C++에서 메모리 동적할당에 사용하는 malloc 함수나 new 연산자 같은 경우, 몇가지 문제가 존재한다. 다양한 메모리 사이즈들이 할당, 해제되면서 연속적인 메모리 공간이 부족해지는 메모리 파편화 문제나, 반복되는 할당+해제에 의한 성능저하를 생각해 볼 수 있다.
메모리 풀(Memory Pool)을 이용하면 이런 문제들을 어느정도 해결할 수 있다.
메모리 풀은 큰 사이즈의 메모리 블록을 미리 할당하여 malloc, new 연산자와 유사한 메모리 동적 할당을 가능하게 해준다. 큰 사이즈의, 연속적인 메모리 블록을 미리 할당하기 때문에 파편화를 어느 정도 해결할 수 있고, 사용한 메모리 공간을 해제하는 것이 아닌 재사용하는 방식이기 때문에 퍼포먼스 향상을 기대할 수 있다
고려사항
구현 방법은 정말 다양하다.
우선 메모리 풀의 목적을 생각해보면, 크게 3가지 정도로 나눠볼 수 있을 것 같다.
- 내부 파편화 문제 해결 - 요청한 메모리의 크기에 맞는(적어도 비슷한) 메모리를 반환해야 한다
- 외부 파편화 문제 해결 - 한번에 큰 크기의 연속적인 메모리를 할당, 해당 메모리를 나눠서 사용해야 함.
- 잦은 new, delete로 인한 퍼포먼스 저하 해결 - 메모리를 한번에 할당, 한번에 해제해야 함.
위 목적에 맞게 기능을 구현해야 한다.
1번은 미리 특정 사이즈의 메모리만을 생성하는 메모리풀을 여러개 생성해놓고, 해당 메모리풀안에서는 각 사이즈의 맞는 메모리들을 여러개 생성해놓으면 된다.
3번은 큰 메모리 블럭 하나를 먼저 할당받고, 해당 메모리 블럭을 메모리 풀의 블록 사이즈에 맞게 자르면 된다.
그런데 2번을 충족할려면 생각보다 까다로워진다.
메모리를 연속적으로 자르는 것은 어려운 일이 아니다. 하지만 처음 할당 후에는 연속된 순서로 할당을 하게 되지만, 반환은 순서대로 된다는 보장이 없다. 따라서 메모리간의 할당 순서, 즉, 메모리의 순서에 대해 정렬을 하고 있어야 한다는 것이다. 그래야 다음에 할당해 줄 메모리를 고를 수 있기 때문이다.
위 상황은 초기화 후 할당 전 상태라면, 아래 상황은 할당/해제를 반복하면서 메모리 순서가 정렬되지 않은 상태이다. Memory#1을 할당하고 Memory#2가 아닌, 다음 메모리인 Memory#3를 할당해야 한다. 따라서 다음에 할당할 메모리를 알기 위해서는 메모리 순서를 정렬하고 있어야 한다.
구현 + 성능분석
싱글스레드
우선 간단하게 먼저 구현을 해보기 위해 싱글쓰레드 상황에서 생각해보도록 하겠다.
순서 정렬에 방법은 여러가지가 있을 수 있을 것 같다.
- vector, queue와 같은 선형 자료구조에 저장할 수도 있고
- list 방식으로 각 메모리에 다음에 올 메모리를 저장하여 관리할 수도 있다.
기본적인 코드 구성은 다음과 같다.
constexpr int MAX_LOOP_NUM = 10000000;
void ProductByMemoryPool()
{
for (int i = 0; i < MAX_LOOP_NUM; i++)
{
A* a = reinterpret_cast<A*>(pool->Allocate());
pool->Delocate(a);
}
}
void ProductByMalloc()
{
for (int i = 0; i < MAX_LOOP_NUM; i++)
{
A* a = new A();
delete a;
}
}
int main(void)
{
vector<thread> v;
vector<thread> v2;
clock_t start = clock();
ProductByMemoryPool();
clock_t end = clock();
printf("pool : %lfms\n", (double)(end - start));
clock_t start2 = clock();
ProductByMalloc();
clock_t end2 = clock();
printf("malloc : %lfms\n", (double)(end2 - start2));
}
반복적으로 메모리를 할당/해제하는 구조이다. 물론 실제 상황에서는 반복적으로 할당/해제를 할 일은 많이 없어 실제 성능하고는 완전 똑같지 않겠지만, 전체적인 메모리 할당/해제의 성능차이를 알 수는 있을 것이다.
Vector, Queue를 사용
vector , queue에 대한 push와 pop에 대한 성능 저하가 심하다.. vector, queue에 넣고 빼는(push_back, pop_back) 시간이 너무 오래걸려서 오히려 그냥 malloc, free 하는 것보다 느리다..
Linked List 방식
메모리의 앞부분에 다음 메모리의 offset을 저장한다(단, 이 경우에는 메모리가 offset 데이터 크기보다는 커야한다)
MemoryPoolList::MemoryPoolList(uint32 blockSize, int32 blockCount) : _blockSize(blockSize), _maxBlockCount(blockCount)
{
_header = static_cast<BYTE*>(malloc(_blockSize * _maxBlockCount));
BYTE* p = _header;
for (int32 i = 0; i < _maxBlockCount; i++)
{
// 메모리 앞부분에 다음 메모리 offset을 저장
*reinterpret_cast<int*>(p) = i + 1;
p += _blockSize;
}
*reinterpret_cast<int*>(p) = -1;
_blockCount = _maxBlockCount;
}
MemoryPoolList::~MemoryPoolList()
{
free(_header);
}
void* MemoryPoolList::Allocate()
{
{
if (_blockCount.load() == 0)
return nullptr;
}
BYTE* blockToAllocate = nullptr;
{
// _currentBlockID - 할당해야 하는 메모리의 Offset
blockToAllocate = _header + _currentBlockID * _blockSize;
_currentBlockID = *reinterpret_cast<int*>(blockToAllocate);
_blockCount.fetch_add(-1);
}
return (void*)blockToAllocate;
}
void MemoryPoolList::Delocate(void* blockToDelocate)
{
// 잘못된 메모리 할당
if (blockToDelocate < _header || blockToDelocate >= _header + _maxBlockCount * _blockSize)
return;
if (blockToDelocate == nullptr)
return;
{
if (_blockCount.load() >= _maxBlockCount)
{
return;
}
}
{
// 현재 메모리 위치의 offset 구하기
int offset = static_cast<int>((reinterpret_cast<BYTE*>(blockToDelocate) - _header)) / static_cast<int>(_blockSize);
// 반납하는 메모리 앞에 다음 메모리 offset 저장
*reinterpret_cast<int*>(blockToDelocate) = _currentBlockID;
_currentBlockID = offset;
_blockCount.fetch_add(1);
}
}
확실히 vector,queue를 이용하는 방식보다 매우 빠르다.
멀티쓰레드
그러면 멀티쓰레드 환경에서는 어떤 식으로 구현 해야할까? 멀티쓰레드 환경에서 여러개의 쓰레드에서 메모리 할당/반환을 시도했을때, 메모리 정렬을 어떤 식으로 할지 고민해보아야 한다.
- mutex Lock으로 관리한다.
우선 구현이 매우 간편하다는 장점은 있다. 하지만 Lock은 기본적으로 atomic 연산을 요구한다. atomic 연산은 연산 자체가 무겁기도 하므로 다른 방법을 사용해보도록 하자.
- LockFree 방식으로 구현한다.
메모리 풀에서 가장 먼저 할당할 메모리 offset(_currentBlockID)을 atomic 연산으로 관리한다. 대신, _currentBlockID를 CAS(Compare And Swap) 연산을 이용하여 LockFree로 관리한다.
할당 시에는 _currentBlockID에 현재 할당할 메모리의 다음 메모리 offset을 저장하고, 해제 시에는 _currentBlockID에 현재 해제할 메모리의 index를 저장하도록 한다.
MemoryPoolListMT::MemoryPoolListMT(uint32 blockSize, int32 blockCount) : _blockSize(blockSize), _maxBlockCount(blockCount)
{
_header = static_cast<BYTE*>(malloc(_blockSize * _maxBlockCount));
BYTE* p = _header;
for (int32 i = 0; i < _maxBlockCount; i++)
{
*reinterpret_cast<int*>(p) = i + 1;
p += _blockSize;
}
*reinterpret_cast<int*>(p) = -1;
_blockCount = _maxBlockCount;
}
MemoryPoolListMT::~MemoryPoolListMT()
{
free(_header);
delete _root;
}
void* MemoryPoolListMT::Allocate()
{
{
if (_blockCount.load() == 0)
return nullptr;
}
BYTE* blockToAllocate = nullptr;
int idToAllocate;
int next;
// CAS 연산을 이용하여 _currentBlockID에 다음 메모리 offset 저장
do
{
idToAllocate = _currentBlockID.load();
if (idToAllocate == -1)
{
return nullptr;
}
next = *reinterpret_cast<int*>(_header + _blockSize * idToAllocate);
} while (!_currentBlockID.compare_exchange_strong(idToAllocate, next));
return reinterpret_cast<void*>(_header + idToAllocate * _blockSize);
}
void MemoryPoolListMT::Delocate(void* blockToDelocate)
{
// 잘못된 메모리 할당
if (blockToDelocate < _header || blockToDelocate >= _header + _maxBlockCount * _blockSize)
return;
int blockIndex = (static_cast<int>(reinterpret_cast<BYTE*>(blockToDelocate) - _header)) / _blockSize;
int next;
// CAS 연산을 이용하여 _currentBlockID에 해제할 메모리의 offset을 저장
do
{
next = _currentBlockID.load();
*reinterpret_cast<int*>(blockToDelocate) = next;
} while (!_currentBlockID.compare_exchange_strong(next, blockIndex));
}
LockFree 방식이 확실히 Lock에 비해 빠르다. 물론 실전에서는 메모리풀을 사용하면 맞는 크기의 메모리풀을 찾는 등의 오버헤드가 발생할 것이다. 그런 것을 생각해도 훨씬 빠르다는 걸 알 수 있다.
멀티쓰레드 방식의 단일 메모리 풀을 구현해 보았는데, 다음에는 메모리풀을 관리하는 클래스를 만들어 다양한 블록 사이즈를 할당하도록 구현해보아야겠다.
'서버' 카테고리의 다른 글
[서버구조분석] NodeJS 프레임워크 기본구조 분석( 부제 : 멀티스레드 vs 싱글스레드 ) (0) | 2024.04.11 |
---|---|
[서버구조분석] Spring 프레임워크 기본구조 분석 (0) | 2024.03.26 |
락 프리(Lock-Free) 알고리즘 (4) | 2024.03.13 |
소켓 통신 프로그래밍(2) (1) | 2024.03.06 |
소켓 통신 프로그래밍 정리(1) (1) | 2024.03.05 |