이어서 소켓 통신에 대해 알아보기 전에, 우선 I/O 하면 자주 나오는
Synchronous/Asynchronous 와 Blocking/NonBlocking을 먼저 정리하고 넘어가도록 하자.
문서는 네이버 클라우드 플랫폼에서 작성한 게시글을 참고하였다!
Synchronous/Asynchronous 와 Blocking/NonBlocking
#Synchronous, 동기
I/O 요청 - 응답 작업이 일련의 순서를 따름. 작업의 순서가 보장됨.
작업 완료를 user space에서 판단.
#Asynchronous, 비동기
kernal에 I/O 작업을 요청해두고 다른 작업 처리가 가능하나, 작업의 순서는 보장되지 않음.
작업 완료를 kernal space에서 통보.
#Blocking, 블로킹
요청한 작업이 모두 완료될 때까지 기다림. 제어권이 요청한 작업에 있음.
#Non-Blocking, 논블로킹
작업 요청 이후 결과는 나중에 필요할 때 전달받음. 제어권은 여전히 작업을 요청한쪽에서 가지고 있음.
쉽게 생각하면, Sync/Async는 작업 순서에 관련된 사항이고, Blocking/Non-Blocking은 현재 실행에 대한 제어권과 관련된 사항이다.
I/O 모델 종류
위의 Sync/Async와 Blocking/Non-Blocking을 조합하면 아래 4개의 조합이 나타나게 된다.
기본적으로 앞서 알아본 일반적인 socket 통신은 Sync + Blocking이다. 서버 관점에서 알아보면 listen, accept 등은 모두 Blocking 함수이며, 위의 작업들은 listen -> accept의 순서로 동기적으로 이루어져야 한다.
또 앞서 Non-Blocking 소켓을 이용하였던 통신은 Sync + NonBlocking이다. 기본적으로 accept 함수가 논블로킹으로 작동하여 accept 함수의 결과값을 분석하여, 만약 아직 연결이 준비되지 않았다면 다른 작업을 하거나 다른 프로세스로 context switching이 일어나게 된다. 하지만 소켓 통신 자체는 accept가 일어난 뒤에야 recv, send를 할 수 있는 동기적인 작업이므로, 결국 accept가 일어나야 그 다음 작업들이 실행되므로 Synchronous 하다고 할 수 있다.
중요한 것은 이제 Async + Blocking으로 되어있는 I/O 멀티플렉싱(Multiplexing)이다.
I/O 멀티플렉싱(Multiplexing)
멀티플렉싱 - 다중화란, 말 그대로 하나를 여러 개처럼 보이게 동작한다는 뜻으로, I/O 관점에서 바라보면 한 프로세스가 여러 파일을 관리하는 기법이라고 볼 수 있다. server-client 관점에서 보면, 여러 클라이언트가 보낸 데이터를 하나의 서버에서 처리할 수 있도록 하는 방식을 의미한다.
프로세스에서 특정 파일(소켓)에 접근할 때는 파일 디스크럽터(FD)라는 추상적인 값을 통해 접근하는데, 이 FD들을 어떻게 감시하느냐에 따라 I/O Multiplexing의 방식이 달라진다. 우선 여기서는 Select, IOCP 등을 이용하여 알아보았다.
아래 그림은 Asynchronous Blocking I/O로 제시된 Select 방식의 구조를 나타낸 그림이다.
그림만 보면 이게 왜 Blocking인지 의아할 것이다. user가 kernel에게 Read 요청을 보내고 동시에 user는 kernel로 부터 현재 요청이 미완료(EWOULDBLOCK)라는 응답을 받고, Read 요청의 응답을 받을 때까지 기다린다.
실제로 이처럼 Read,Write 요청에 대한 시스템 콜이 Blocking되는 것이 아니라, Select와 같은 멀티플렉싱관련 시스템 콜에 대한 응답에 대해 Blocking된다. 구체적인 흐름은 아래에 설명을 보면서 이해해보도록 하자..
사설이지만, 사실 멀티플렉싱은 네트워크에서 Transport Layer(전송계층)을 공부하면서 들어볼 수 있는 단어인데, UDP, TCP에서 여러 프로세스들이 전송 계층으로 데이터를 보내면 헤더를 붙여서 알맞은 네트워크 계층으로 보내는 작업이었다...
Select 모델
Select는 NonBlocking 문제를 해결하기 위해 나온 모델로, 소켓 함수 호출이 성공할 수 있는 시점을 미리 알 수 있다.
Select 모델에서는 Socket set을 사용하는데 Socket set은 말그대로 소켓(FD)들의 집합(set)이다. 따라서 읽기/쓰기 하고자 하는 소켓들을 미리 Socket set에 등록해주어야 한다.
Socket set의 종류는 읽기, 쓰기, 예외이다.
소켓 셋 | 함수 호출 시점 |
읽기 셋(read-set) | - 접속한 클라이언트가 있으므로 accept() 함수를 호출할 수 있다. - 소켓 수신 버퍼에 도착한 데이터가 있으므로 recv(), recvfrom() 등의 함수를 호출해 데이터를 읽을 수 있다. - TCP 연결이 종료되었으므로 recv(), recvfrom() 등의 함수를 호출해 연결 종료를 감지할 수 있다. |
쓰기 셋(write-set) | - 소켓 송신 버퍼의 여유 공간이 충분하므로 send(), sendto() 등의 함수를 호출하여 데이터를 보낼 수 있다. |
예외 셋(exception-set) | - OOB(Out-Of-Band) 데이터가 도착했으므로 recv(), recvfrom() 등의 함수를 호출하여 OOB데이터를 받을 수 있다. |
<참고> OOB 데이터란 데이터 + 우선순위가 부여된 데이터로 긴급한 메신저나 프로그램에 제어 신호를 보낼 때 사용하는 방식.
소켓 셋을 사용하기 위한 매크로 함수
매크로 함수 | 기능 |
FD_ZERO(fd_set *set) | 셋을 비운다(초기화) |
FD_SET(SOCKET s, fd_set *set) | 셋에 소켓 s를 넣는다. |
FD_CLR(SOCKET s, fd_set *set) | 셋에 소켓 s를 제거한다. |
FD_ISSET(SOCKET s, fd_set *set) | 소켓 s가 셋에 들어 있으면 0이 아닌 값을 리턴한다. 그렇지 않으면 0을 리턴한다. |
select 모델에서는 소켓 셋에 등록한 소켓들 중에 적어도 하나의 소켓이 준비되면 함수가 리턴된다 → 이때 소켓 셋에는 입출력이 가능한 소켓만 남고, 나머지는 모두 알아서 제거된다.
Select의 흐름을 정리하면
- 소켓 셋을 비운다(초기화)
- 소켓 셋에 소켓을 넣는다. 이때 넣을 수 있는 최대 소켓의 갯수는 FD_SETSIZE(64)로 정의한다.
- select()함수가 호출되고 조건을 만족하거나 타임아웃이 되면 함수가 리턴된다(블로킹)
- 소켓 셋에 남아있는 모든 소켓(살아남은)에 대해 적절한 소켓 함수를 호출하여 처리해준다.
- 1~4번을 반복한다.
Select 모델 서버 예제 코드
아래는 간단하게 Select를 이용하여 Client로부터 데이터를 받는 서버를 구현한 것이다. 서버에서는 읽기 셋을, 클라이언트에서는 쓰기 셋만을 이용하여 구현해보았다.
// Select 모델 = (select 함수)
//
// socket set
// 1) 읽기[] 쓰기[] 예외[] -> 관찰 대상
// 2) select(readSet, writeSet, exceptSet); -> 관찰 시작
// 3) 적어도 하나의 소켓 준비되면 리턴 -> 낙오자는 알아서 제거
// 4) 남은 소켓 체크해서 진행
const int BUF_SIZE = 1000;
struct Session // 커스텀 구조체 선언
{
SOCKET socket = INVALID_SOCKET;
char recvBuffer[BUF_SIZE] = {};
int recvBytes = 0;
};
// listen 소켓 선언 + bind + listen 완료
vector<Session> sessions;
sessions.reserve(100);
// select 용 set 데이터
fd_set reads;
while (true)
{
//소켓 셋 초기화
FD_ZERO(&reads);
// reads 셋에 listenSocket 등록
FD_SET(listenSocket, &reads);
// 소켓 등록 +알파
for (Session& s : sessions)
{
FD_SET(s.socket, &reads);
}
// [옵션] 마지막 timeout 인자 설정 가능
int retVal = ::select(0, &reads, nullptr, nullptr, nullptr); // read_set만 사용. 나머지는 NULL로 처리
if (retVal == SOCKET_ERROR) break;
if (FD_ISSET(listenSocket, &reads))
{
SOCKADDR_IN clientAddr;
int addrLen = sizeof(clientAddr);
SOCKET clientSocket = ::accept(listenSocket, (SOCKADDR*)&clientAddr, &addrLen);
if (clientSocket != INVALID_SOCKET)
{
if (::WSAGetLastError() == WSAEWOULDBLOCK)
continue;
cout << "Client Connected!" << endl;
sessions.push_back(Session{ clientSocket });
}
}
// 나머지 소켓 체크
for (Session& s : sessions)
{
if (FD_ISSET(s.socket, &reads))
{
int recvLen = ::recv(s.socket, s.recvBuffer, BUF_SIZE, 0);
if (recvLen <= 0)
{
continue;
}
s.recvBytes = recvLen;
cout << "RecvData = " << s.recvBuffer << endl;
cout << "RecvData Len = " << s.recvBytes << endl;
}
}
this_thread::sleep_for(1s);
}
Select 모델 클라이언트 예제 코드
fd_set writes;
while (true)
{
FD_ZERO(&writes);
FD_SET(clientSocket, &writes);
int retVal = ::select(0, nullptr, &writes, nullptr, nullptr);
if (retVal == SOCKET_ERROR) break;
if (FD_ISSET(clientSocket, &writes))
{
char sendBuffer[50] = "Hello world!";
int sendSize = ::send(clientSocket, sendBuffer, sizeof(sendBuffer), 0);
// 패킷송신시 에러처리
if (sendSize == SOCKET_ERROR)
return 0;
}
this_thread::sleep_for(1s);
}
WSAEventSelect
Select에서는 결국 Select 함수를 호출할때 Blocking이 되는 것이 단점이었다. 이를 보완하기 위해 WSAEventSelect가 등장하였는데, WSAEventSelect 방식은 소켓과 관련된 네트워크 이벤트를 이벤트 객체를 이용하여 감지하고 이벤트와 소켓을 매칭하여 이벤트를 감지하면 소켓을 처리해주는 방법이다. 즉, 이벤트 등록을 비동기로 진행하고
이벤트 객체의 신호 상태를 통해 이벤트의 발생유뮤를 알 수 있다. 하지만 어떤 종류의 이벤트가 발생했는지 이것만으로는 알 수 없으므로 추가적인 처리를 해주어야 함.
관련 함수
필요 기능 관련 함수
이벤트 객체 생성과 제거 | WSACreateEvent(), WSACloseEvent() |
소켓과 이벤트 객체 짝짓기 | WSAEventSelect() |
이벤트 객체의 신호 상태 감지하기 | WSAWaitForMultipleEvents() |
구체적인 네트워크 이벤트 알아내기 | WSAEnumNetworkEvents() |
여기서 WSACreateEvent()를 통하여 만들어진 이벤트 객체는 항상 수동 리셋(manual-reset) 이벤트로 신호가 꺼진 비신호 상태로 시작한다.
네트워크 이벤트
네트워크 이벤트 | 의미 |
FD_ACCEPT | 접속한 클라이언트가 있다. |
FD_READ | 데이터 수신이 가능하다. |
FD_WRITE | 데이터 송신이 가능하다. |
FD_CLOSE | 상대가 접속을 종료했다. |
FD_CONNECT | 통신을 위한 연결 절차가 끝났다. |
FD_OOB | OOB(Out-Of-Band) 데이터가 도착했다. |
WSAEventSelect()함수의 유의할 점은
- WSAEventSelect()함수 호출시 해당 소켓은 자동으로 논블로킹 모드로 전환됨.
- 네트워크 이벤트 발생 시 적절한 소켓 함수를 호출하지 않으면, 다음 번에는 같은 네트워크 이벤트가 발생하지 않는다. 예를 들어 FD_READ 이벤트에 대응해 recv() 함수를 호출하지 않으면, 동일 소켓에 대한 FD_READ 이벤트는 다시 발생하지 않는다. 따라서 네트워크 이벤트가 발생하면 대응 함수를 호출해야 하며, 그렇지 않을 경우 응용 프로그램이 네트워크 이벤트 발생 사실을 기록해두고 나중에 대응 함수를 호출해야한다.
- accept() 함수가 리턴하는 소켓은 연결 대기 소켓과 동일한 속성을 갖게 된다. 연결 대기 소켓은 직접 데이터 송수신을 하지 않으므로 FD_READ, FD_WRITE 이벤트를 처리하지 않는다. 반면 accept() 함수가 리턴하는 소켓은 FD_READ, FD_WRITE 이벤트를 처리해야 하므로, 다시 WSAEventSelect() 함수를 호출해 관심 있는 이벤트를 등록해야 한다.
이벤트 대응함수
FD_ACCEPT | accept() |
FD_READ | recv(), recvfrom() |
FD_WRITE | send(), sendto() |
FD_CLOSE | 없음 |
FD_CONNECT | 없음 |
FD_OOB | recv(), recvfrom() |
WSAEventSelect 모델 서버 예제
모델의 실행흐름은 Event를 만들고 - 소켓과 이벤트를 등록하고 - 네트워크 이벤트를 확인하고 - 해당 네트워크 이벤트에 적절한 대응함수를 호출해주는 것이다.
vector<WSAEVENT> wsaEvents;
WSAEVENT listenEvent = ::WSACreateEvent();
wsaEvents.push_back(listenEvent);
sessions.push_back(Session{ listenSocket });
// 이벤트와 소켓 등록
if (::WSAEventSelect(listenSocket, listenEvent, FD_ACCEPT | FD_CLOSE) == SOCKET_ERROR) // ACCEPT, CLOSE 관심있는 이벤트 등록
{
return 0;
}
while (true)
{
// 이벤트 대기
int index = ::WSAWaitForMultipleEvents(wsaEvents.size(), &wsaEvents[0], FALSE, WSA_INFINITE, FALSE); // 하나의 이벤트라도 완료되면 index 리턴
if (index == WSA_WAIT_FAILED)
continue;
index -= WSA_WAIT_EVENT_0; // index에 시작 위치 반환
// 이벤트 조사 : WSAEnumNetworkEvents
WSANETWORKEVENTS networkEvents;
if (::WSAEnumNetworkEvents(sessions[index].socket, wsaEvents[index], &networkEvents) == SOCKET_ERROR)
continue;
if (networkEvents.lNetworkEvents & FD_ACCEPT) // accept 이벤트 일 시
{
//Error-Check
if (networkEvents.iErrorCode[FD_ACCEPT_BIT] != 0)
continue;
SOCKADDR_IN clientAddr;
int addrLen = sizeof(clientAddr);
SOCKET clientSocket = ::accept(listenSocket, (SOCKADDR*)&clientAddr, &addrLen);
if (clientSocket != INVALID_SOCKET)
{
cout << " Client Connected" << endl;
WSAEVENT clientEvent = ::WSACreateEvent();
wsaEvents.push_back(clientEvent);
sessions.push_back(Session{ clientSocket });
if (::WSAEventSelect(clientSocket, clientEvent, FD_READ | FD_WRITE | FD_CLOSE) == SOCKET_ERROR) return 0;
}
}
// Client Session 소켓 체크
if (networkEvents.lNetworkEvents & FD_READ)
{
if (networkEvents.iErrorCode[FD_READ_BIT] != 0)
continue;
Session& s = sessions[index];
int recvLen = ::recv(s.socket, s.recvBuffer, BUF_SIZE, 0);
if (recvLen == SOCKET_ERROR && ::WSAGetLastError() != WSAEWOULDBLOCK)
{
if (recvLen <= 0) continue;
}
s.recvBytes = recvLen;
cout << "RecvData = " << s.recvBuffer << endl;
cout << "RecvData Len = " << s.recvBytes << endl;
}
}
Overlapped I/O
WSAEventSelect 모델은 결국 socket 하나하나마다 I/O 처리를 해주어야 한다는 단점이 존재한다.
Overlapped I/O는 이름 그대로 I/O를 작업을 중첩하여 처리하는 비동기 입출력(Asynchronous NonBlock I/O)의 일종이다.
이름에서 중첩이란 의미는, 위에 타 방식들은 결국 비동기로 요청을 하고 요청이 완료되었음을 알게 되면 이를 다시 recv, send 함수를 통해 소켓 버퍼에 저장된 내용을 유저 버퍼로 받아오거나 유저 버퍼의 내용을 소켓 버퍼에 등록하는 과정이 필요했다. 하지만 Overlapped I/O는 입출력 함수(WSARecv, WSASend 등)를 통해 요청이 완료될 때 이미 등록된 유저 버퍼에 데이터가 들어있게 되거나 유저 버퍼의 데이터가 소켓 버퍼에 들어가 있게 된다. 이렇게 여러 I/O 요청들을 중첩해서 처리할 수 있다는 장점이 있다.
어플리케이션은 입출력 함수(WSARecv, WSASend 등)를 등록하고 완료 여부와 무관하게 다른 작업을 진행하고(Async + NonBlocking) 입출력 작업이 끝나면 운영체제가 어플리케이션에 이벤트나 콜백함수를 통해 알려주고 어플리케이션은 이때 다른 작업을 중단하고 입출력 결과를 처리해주는 방식이다.
Overlapped 처리순서
- Overlapped에 입출력 함수를 걸어놓음 → 언제 완료/실행될지는 모름
- 바로 Overlapped 함수가 성공했는지 확인 → 대부분의 경우 실패
- 성공했으면 결과 얻어서 처리
- 실패했으면 사유를 확인
Overlapped에서는 이벤트 객체 기반으로 처리하거나 콜백 함수 기반으로 처리해줄 수 있다.
Overlapped 서버 예제코드 - 이벤트 방식
아래 코드는 이벤트 방식으로 구현한 방법인데, 여전히 이벤트 발생 자체를 체크해야 되기 때문에 Async하게 동작하지만 성능이 확실히 좋은가? 에 대해서는 애매한 것 같다.
Overlapped I/O를 사용할 수 있는 함수인 WSARecv, WSASend 함수를 사용했다.
여기서 재밌는 방법이 쓰였는데 바로 Session 구조체를 선언할 때,
또 이벤트 핸들과 소켓을 등록할 때 WSAEventSelect에서는 WSAEventSelect 함수를 호출하여 연결해줬지만, overlapped 객체에 직접 등록해주면 된다.
struct Session
{
// 캐스팅을 위해 overlappeed을 가장 맨 앞으로 조정
WSAOVERLAPPED overlapped = {};
// ***
SOCKET socket = INVALID_SOCKET;
char recvBuffer[BUF_SIZE] = {};
int recvBytes = 0;
};
// 소켓 생성, bind, listen 완료
while (true)
{
SOCKADDR_IN clientAddr;
int addrLen = sizeof(clientAddr);
SOCKET clientSocket;
while (true)
{
clientSocket = ::accept(listenSocket, (SOCKADDR*)&clientAddr, &addrLen);
if (clientSocket != INVALID_SOCKET)
{
break;
}
if(::WSAGetLastError() == WSAEWOULDBLOCK)
continue;
return 0;
}
Session session = Session{ clientSocket };
WSAEVENT wsaEvent = WSACreateEvent();
session.overlapped.hEvent = wsaEvent; // overlapped에 event 핸들을 등록
while (true)
{
WSABUF wsaBuf; // 버퍼 + length를 가지고 있는 구조체
wsaBuf.buf = session.recvBuffer; // 데이터를 받고자 하는 버퍼등록
// 하지만 버퍼가 Overlapped 중에는 사라지지 않도록 관리해주어야함!
wsaBuf.len = BUF_SIZE; // 데이터의 최대 크기
DWORD recvLen = 0;
DWORD flags = 0;
// session.overlapped에 이벤트 핸들이 등록된 상태
if (::WSARecv(clientSocket, &wsaBuf, 1, &recvLen, &flags, &session.overlapped, nullptr) == SOCKET_ERROR)
{
// Recv 하자마자 대부분 SOCKET을 받지 못하므로 PENDING 체크
if (::WSAGetLastError() == WSA_IO_PENDING)
{
// TODO --- 이벤트 방식
// 다른일을 하다가
// 이벤트 신호를 체크
::WSAWaitForMultipleEvents(1, &wsaEvent, TRUE, WSA_INFINITE, FALSE);
// Overlapped 결과를 받아오기
::WSAGetOverlappedResult(session.socket, &session.overlapped, &recvLen, FALSE, &flags);
}
else
{
break;
}
}
cout << "Data Recv = " << session.recvBuffer << endl;
}
}
Overlapped 서버 예제코드 - 콜백함수처리
아래 코드는 요청의 결과를 콜백함수 방식으로 처리하는 방법이다. 요청이 완료되면 결과가 콜백함수를 통해 넘어오기 때문에 이벤트 발생 여부를 전부 확인해야 했던 이벤트 처리 방식보다 코드가 비교적 깔끔하다..
여기서 재밌는 방식이 사용되었는데, 커스텀 구조체인 Session에 맨 앞 멤버변수로 WSAOVERLAPPED 구조체를 넣어주고, 콜백함수에서 인자로 넘어온 overlapped를 다시 커스텀 구조체인 Session으로 캐스팅하여 사용하고 있다. 메모리 주소자체가 WSAOVERLAPPED가 맨 앞에 있기 때문에, Session 구조체의 메모리 주소가, 멤버 변수인 overlapped의 주소와 같게 되어 이런 방식을 사용할 수 있게 된 것이다.
이렇게 되면 Session 구조체 하나로 소켓과 해당 소켓에 대한 정보(버퍼, 크기 등..)을 저장하고 이를 Overlapped 요청에 대한 콜백함수에서 이런 정보들을 사용할 수 있다는 장점이 있다!
struct Session
{
// 캐스팅을 위해 overlappeed을 가장 맨 앞으로 조정
WSAOVERLAPPED overlapped = {};
// ***
SOCKET socket = INVALID_SOCKET;
char recvBuffer[BUF_SIZE] = {};
int recvBytes = 0;
};
// Overlapped 용 콜백함수
// overlapped - 결과가 담겨있음
void CALLBACK RecvCallback(DWORD error, DWORD recvLen, LPWSAOVERLAPPED overlapped, DWORD flags)
{
// TODO
cout << "Data Recv Len Callback = " << recvLen << endl;
// 데이터를 받기 위해 캐스팅
Session* session = (Session*)(overlapped); // 커스텀 구조체 Session으로 캐스팅가능 -> WSAOVERLAPPED를 맨 앞으로 넘겨주었기 때문에
cout << session->recvBuffer << endl;
}
while (true)
{
SOCKADDR_IN clientAddr;
int addrLen = sizeof(clientAddr);
SOCKET clientSocket;
while (true)
{
clientSocket = ::accept(listenSocket, (SOCKADDR*)&clientAddr, &addrLen);
if (clientSocket != INVALID_SOCKET)
{
break;
}
if(::WSAGetLastError() == WSAEWOULDBLOCK)
continue;
return 0;
}
// Overlapped 콜백 함수 방식
cout << "Client Connected" << endl;
Session session = Session{ clientSocket };
while (true)
{
WSABUF wsaBuf; // 주소 + length
wsaBuf.buf = session.recvBuffer;
wsaBuf.len = BUF_SIZE;
DWORD recvLen = 0;
DWORD flags = 0;
if (::WSARecv(clientSocket, &wsaBuf, 1, &recvLen, &flags, &session.overlapped, RecvCallback) == SOCKET_ERROR)
{
// Recv 하자마자 대부분 SOCKET을 받지 못하므로 PENDING 체크
if (::WSAGetLastError() == WSA_IO_PENDING)
{
// TODO -- 콜백 함수 방식
// 예시로 Alertable Wait를 사용
// Alertable Wait -> 코드가 sleep하긴 하지만 이벤트 발생하면 깨어나서 callback함수 호출 / 스레드 APC큐에 등록되어 있음
::SleepEx(INFINITE, TRUE);
//SleepEx가 끝나면(입출력이 완료되면) 바로 콜백함수로 넘어가게 됨
//하나의 스레드(메인스레드)만 현재 작동하고 있기 때문임
}
else
{
break;
}
}
cout << "Data Recv = " << session.recvBuffer << endl;
}
}
하지만 하나의 스레드마다 Overlapped를 처리해주기 때문에 일을 분배할때 적합하지 않고 WSARecv, WSASend 등을 통해 반복적으로 I/O 요청을 호출하는데 이런 요청에 대한 Context Switching이 빈번하게 발생하는 문제점이 있다.
IOCP(Input/Output Completion Port)
IOCP는 Overlapped가 확장된 버전으로 Completion Routine(완료 루틴)으로 I/O의 완료 통지를 받아보게 설계되었다. Completion Routine은 흔히 말하는 콜백함수의 역할로 WSASend, WSARecv 함수를 호출하여 I/O 작업이 완료된 시점에 호출이 된다. 또 이름에서 알 수 있듯이 입출력 완료 포트, 입출력의 완료를 담당할 포트를 지정해서 처리하겠다는 의미이다. 포트는 IP 포트의 의미라기 보다는 데이터 입출력을 담당하는 객체라고 생각하면 이해하기 쉽다.
입출력 함수가 완료된 통지를 큐에 등록하여 정해진 스레드(Worker Thread)에서 처리하도록 만들어줄 수 있다.
소켓의 수와 무관한 수의 thread와 IOCP 객체로 동작하여 접속량이 많아도 성능이 많이 떨어지지 않게되는 장점이 있다.
IOCP 처리순서
- CP 생성 → IOCP object를 생성하여 입출력 결과를 담을 큐를 생성(CreateIOCompletionPort)
- 소켓(핸들)을 해당 CP에 등록 (CreateIOCompletionPort, 단 인자가 다름)
- Overlapped를 통해 입출력 함수를 호출(WSARecv, WSASend, WSAAccept 등)
- Overlapped 완료시 해당 CP 큐에 담기게 되고
- 정해진 스레드에서 큐에서 결과를 받아 처리해줌(GetQueuedCompletionStatus)
IOCP 서버 예제 코드
조금 집중해서 봐야 될것은 소캣을 CP에 등록할 때, key값을 넘겨 주게 되는데 이때 key값으로 등록된 소켓에 대한 정보를 넣어주어야 결과를 받아 처리할때 GetQueuedCompletionStatus에서 소켓을 찾아 해당 소켓에 맞게 처리해줄 수가 있다. 해당 코드에서는 Session 구조체를 만들어서 Session 구조체의 주소값을 key값으로 넘겨주고 있다. 물론 key가 아니라 Overlapped 객체에 담아서 전달할 수도 있다. 어떻게 구현하냐에 따라 달라질 것이다..
struct Session
{
// 캐스팅을 위해 overlappeed을 가장 맨 앞으로 조정
WSAOVERLAPPED overlapped = {};
// ***
SOCKET socket = INVALID_SOCKET;
char recvBuffer[BUF_SIZE] = {};
int recvBytes = 0;
};
enum IO_TYPE
{
READ,
WRITE,
ACCEPT,
CONNECT,
};
struct OverlappedEx
{
WSAOVERLAPPED overlapped = {};
int32 type = 0;
// TODO
};
// WorkerThread에서 실행할 메인함수
void WorkerThreadMain(HANDLE iocpHandle)
{
while (true)
{
//TODO
DWORD bytesTransfered = 0;
Session* session = nullptr;
OverlappedEx* overlappedEx = nullptr;
// 가장 핵심이 되는 함수 - Queue에서 완료된 정보를 가져옴
// session -> 처음 CreateIoCompletionPort 호출에 등록한 key값이 저장됨 -> session의 주소값을 저장했기 때문에 session 포인터에 저장하면 session 이 저장됨
bool ret = ::GetQueuedCompletionStatus(iocpHandle, &bytesTransfered, (ULONG_PTR*)&session, (LPOVERLAPPED*)&overlappedEx, INFINITE);
if (ret == false || bytesTransfered == 0)
continue;
switch (overlappedEx->type)
{
case IO_TYPE::READ:
break;
}
cout << "Recv Data Len = " << bytesTransfered << endl;
cout << "Recv Data IOCP = " << session->recvBuffer << endl;
WSABUF wsaBuf;
wsaBuf.buf = session->recvBuffer;
wsaBuf.len = BUF_SIZE;
DWORD recvLen = 0;
DWORD flags = 0;
// 다음 통신을 위해 WSARecv에 다시 등록
::WSARecv(clientSocket, &wsaBuf, 1, &recvLen, &flags, &overlappedEx->overlapped, NULL); // 메인에서 WSARecv 한번 호출해줌
}
}
// 소켓 생성, bind, listen 성공
// IOCP
// 스레드들의 WSARecv, WSASend의 결과를 큐에 넣어 저장 -> worker 스레드를 두어 이벤트가 완료된 큐에 있는 데이터들을 처리해줌
HANDLE iocpHandle = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0); // CP(Completion Port)라는 큐를 생성
vector<Session*> sessionManager;
vector<thread> threads;
// Worker Thread 생성
for (int i = 0; i < 5; i++)
{
threads.push_back(thread([iocpHandle]()
{
WorkerThreadMain(iocpHandle);
}));
}
while (true)
{
SOCKADDR_IN clientAddr;
int addrLen = sizeof(clientAddr);
SOCKET clientSocket = ::accept(listenSocket, (SOCKADDR*)&clientAddr, &addrLen);
if (clientSocket == INVALID_SOCKET)
return 0;
// Main Thread에서 accept까지 처리
Session* session = new Session();
session->socket = clientSocket;
sessionManager.push_back(session);
cout << "Client Connected" << endl;
// 함수는 똑같지만 인자가 다름
// clientSocket을 CP에 등록 - iocpHandle 큐에 clientSocket을 등록
::CreateIoCompletionPort((HANDLE)clientSocket, iocpHandle, /*key*/(ULONG_PTR)session, 0); // key값으로 session의 주소값을 넣어줌
WSABUF wsaBuf;
wsaBuf.buf = session->recvBuffer;
wsaBuf.len = BUF_SIZE;
OverlappedEx* overlappedEx = new OverlappedEx(); // 기본 overlapped에 type을 추가함
overlappedEx->type = IO_TYPE::READ;
DWORD recvLen = 0;
DWORD flags = 0;
::WSARecv(clientSocket, &wsaBuf, 1, &recvLen, &flags, &overlappedEx->overlapped, NULL); // 메인에서 WSARecv 한번 호출해줌
}
코드를 작성할 때 조심해야 하는 것은 overlapped 구조체나 session을 등록해주는데 포인터 관리를 제대로 해주어야 한다. 만약 스마트 포인터등을 사용한다면 iocp에 들어있을 때 해당 포인터들이 삭제되지 않도록 관리해주어야 한다.
정확한 함수에 대한 설명보다는 전체적인 흐름을 보기 위해 작성되었다 보니 코드 자체가 중요한 부분과 중복되지 않는 부분만 캡쳐해서 올려 수도코드 형식이 되었다..
전체적인 코드는 아래에 정리되어 있다.
https://github.com/ajdxjdrnfl/SocketServer/tree/main
GitHub - ajdxjdrnfl/SocketServer: Windows Socket Server programming using winsock/Select/WSAEventSelect/OverlappedIO/IOCP
Windows Socket Server programming using winsock/Select/WSAEventSelect/OverlappedIO/IOCP - ajdxjdrnfl/SocketServer
github.com
'서버' 카테고리의 다른 글
[C++] 메모리 풀(Memory Pool) 구현 (1) | 2024.09.24 |
---|---|
[서버구조분석] NodeJS 프레임워크 기본구조 분석( 부제 : 멀티스레드 vs 싱글스레드 ) (0) | 2024.04.11 |
[서버구조분석] Spring 프레임워크 기본구조 분석 (0) | 2024.03.26 |
락 프리(Lock-Free) 알고리즘 (4) | 2024.03.13 |
소켓 통신 프로그래밍 정리(1) (1) | 2024.03.05 |