Coding Memo

Asynchronous Socket IO - IOCP 본문

Game Server (C++)

Asynchronous Socket IO - IOCP

minttea25 2022. 11. 29. 14:52

본 포스팅은 인프런에 등록되어 있는 Rockiss 님의 강의를 보고 간단하게 정리한 글입니다.


이전에 나왔던 Select, Overlapped(event), Overlapped(callback)들은 장점도 있었지만 단점도 많은 기능이었다.

 

일반 비동기 호출: 매우 보기 불편해보이는 중첩 무한 반복문

Select: FD_SET 당 최대 64개 한정, 매번 set을 만들어줘야함, 완전한 비동기 방식이 아님, 성능 좋지 않음

WSAEventSelect: 한번에 소켓 64개 제한

Overlapped(event): 소켓과 이벤트를 1:1로만 대응시킴, 한번에 소켓 64개 한정(overlapped)

Overlapped(callback): 매번 Alertable Wait 상태를 만들어야되는 시스템적 부담(OS에 의해 호출), 한번에 소켓 64개 한정(overlapped), APC가 쓰레드마다 있어 하나의 쓰레드에서 모든 루틴 처리 필요

 

(이렇게 보니까 단점들이 무수히 많다 ㅋㅋ)

 

 

위의 단점들을 해결할 수있는 IOCP 방식의 Multiplexing 방법이 있다!!!


IOCP 방식은 지금까지 나온 모델 중 가장 좋은 성능을 가지고 있으며 APC처럼 쓰레드마다 관리하는 것이 아닌 비동기 함수 호출을 하나의 쓰레드에서 전부 관리할 수 있는 방법이다. (APC 대신 Completion Port로 처리한다.)

비동기 함수(WSARecv 등)가 실행 완료가 되면 completion port에 알림 패킷(completion notification packet)을 보낸다.

여러개의 세션(연결)을 CP로 관리할 수 있다.

 

IOCP의 특징은 다음과 같다.

1. Overlapped 구조체 사용

2. IOCP 객체를 통해 HANDLE을 받아 사용 (GetQueuedCompletionStatus 함수)

3. 별도의 쓰레드(Worker Thread)에서 callback 함수 동작

4. key 값을 이용해 데이터에 접근

 

 

사용 방법

1. completion port (HANDLE, CP) 생성

2. callback을 처리할 쓰레드(WorkerThread) 생성

3. 소켓을 CP에 등록

4. 비동기 함수 호출

----------

5. WorkerThread에서 비동기 호출 실행


사용할 함수들

 

 

CreateIoCompletionPort

HANDLE WINAPI CreateIoCompletionPort(
  _In_     HANDLE    FileHandle,
  _In_opt_ HANDLE    ExistingCompletionPort,
  _In_     ULONG_PTR CompletionKey,
  _In_     DWORD     NumberOfConcurrentThreads
);

이게 뭔가 싶지만 사용방법만 알고 가자.

이 함수는 한가지 기능만 하는 함수가 아니다(...)

1. CP만 생성

2. 생성되어 있는 CP와 file handle(여기서는 socket) 연결

3. 1, 2 둘다 한번에

이렇게 3가지 기능을 할 수있는 함수이다.

 

그러면 CP 생성과, 소켓 등록할 시에 이 함수를 사용하면 된다.

 

CP 생성

핸들을 INVALID로 해주고, 연결 포트를 NULL, 이후 값은 0으로 해주면 된다.

HANDLE iocpHandle = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);

 

file handle (소켓) 등록

handle 넣는 첫 번째 인자에 연결할 소켓을,

두번째로는 앞서 생성했던 cphandle을,

세번째로는 나중에 GetQueuedCompletionStatus로 가져올 때 사용할 key 값을 넣는다.

(마지막 인자는 동시에 실행할 쓰레드 개수인데, 0으로 설정하면 시스템이 가능한 쓰레드 개수를 알아서 넣어준다.)

(추가적으로, CP 생성에서와 같이 ExistingCompletionPort인자가 NULL로 들어오면 이 값은 무시된다.)

::CreateIoCompletionPort((HANDLE)clientSocket, iocpHandle, (ULONG_PTR)/*key*/, 0);

 

 

GetQueuedCompletionStatus

BOOL GetQueuedCompletionStatus(
  [in]  HANDLE       CompletionPort,
        LPDWORD      lpNumberOfBytesTransferred,
  [out] PULONG_PTR   lpCompletionKey,
  [out] LPOVERLAPPED *lpOverlapped,
  [in]  DWORD        dwMilliseconds
);

CP에서 overlapped 정보를 받아오는 함수이다.

HANDLE CompletionPort CP
LPDWORD lpNumberOfBytesTransferred 받거나(WSARecv) 보낸(WSAWend) 바이트 수
PULONG_PTR lpCompletionKey CP에서 가져올 값(overlapped)에 대한 Key
LPOVERLAPPED *lpOverlapped 가져올 값에 대한 포인터
DWORD dwMilliseconds timeout 시간

반환 값은 성공시 TRUE, 실패 시 FALSE이다.

 

특별한 거는 없어 보인다.

다만, 한가지 짚고 넘어가야 할 점은 우리는 이 함수를 메인 쓰레드에서 실행 시킬 것이 아니라는 점이다.

값을 가져오기 위해서는 WorkerThread에서 CP 값과 사전에 정해놓은 Key 값이 필요하다.

(Key 값에 대해 생각해보면, session에 대한 특정 값으로 생각될 수 있는데, 이는 session에 대한 고유값이나 연결된 클라이언트으 정보에 포함되어있는 식별 가능한 ID 정도로 생각하면 될 것 같다.) 


먼저 구조체와 enum 정의를 한다.

IO_TYPE를 따로 둔 이유는 비동기 함수의 종류를 CP를 통해 확인할 수가 없기 때문이다. (IOCP의 불편한 점이기도 하다.)

WSAOVERLAPPED가 있는데도 OverlappedEx 구조체를 따로 선언하여 활용하는 이유는 IO_TYPE과 같이 사용자 정의 값을 넣어주기 위함이다. (WSAOVERLAPPED를 첫번째 맴버로 두어서 캐스팅 가능하도록 하는 것이 중요 포인트이다!)

const int BUFSIZE = 1000;

enum IO_TYPE
{
	READ,
	WRTIE,
	ACCEPT,
	CONNECT
};

struct Session
{
	SOCKET socket;
	char recvBuffer[BUFSIZE] = {};
	int recvBytes = 0;
};

struct OverlappedEx
{
	WSAOVERLAPPED overlapped = {};
	int type = 0; // iotype
};

 

다음으로는 callback들을 비동기로 호출하기 위한 쓰레드에서 실행시킬 함수 정의이다.

void WorkerThreadMain(HANDLE iocpHandle)
{
	while (true)
	{
		DWORD bytesTransferred = 0;
		Session* session = nullptr;
		OverlappedEx* overlappedEx = nullptr;
		BOOL ret = ::GetQueuedCompletionStatus(iocpHandle, &bytesTransferred,
			(ULONG_PTR*)&session, (LPOVERLAPPED*)&overlappedEx, INFINITE);

		if (ret == FALSE || bytesTransferred == 0)
		{
			// TODO: 연결 끊김
			continue;
		}

		// 값이 일치하지 않는 경우 CRASH 발생 시키는 사용자 정의 매크로 함수
		// ASSERT_CRASH(overlappedEx->type == IO_TYPE::READ);

		cout << "Recv Data IOCP = " << bytesTransferred << endl;

		WSABUF wsaBuf;
		wsaBuf.buf = session->recvBuffer;
		wsaBuf.len = BUFSIZE;

		DWORD recvLen = 0;
		DWORD flags = 0;
		::WSARecv(session->socket, &wsaBuf, 1, &recvLen, &flags, &overlappedEx->overlapped, NULL);	
	}
}

 

Main

(accept는 동기방식으로 처리하였다.)

 

WSARecv를 WorkerThread에서 한번 더 호출함으로써 깨어난 WorkerThread에서 같은 WSAResv를 계속 처리하도록 하기 위함이다. (메인에서가 아닌 WorkerThread에서 계속 처리)

int main()
{
	WSAData wsaData;
	if (::WSAStartup(MAKEWORD(2, 2), &wsaData) != 0)
		return 0;


	SOCKET listenSocket = ::socket(AF_INET, SOCK_STREAM, 0);
	if (listenSocket == INVALID_SOCKET)
	{
		return 0;
	}

	SOCKADDR_IN serverAddr;
	::memset(&serverAddr, 0, sizeof(serverAddr));
	serverAddr.sin_family = AF_INET;
	serverAddr.sin_addr.s_addr = ::htonl(INADDR_ANY);
	serverAddr.sin_port = ::htons(7777);

	if (::bind(listenSocket, (SOCKADDR*)&serverAddr, sizeof(serverAddr)) == SOCKET_ERROR)
	{
		return 0;
	}

	if (::listen(listenSocket, SOMAXCONN) == SOCKET_ERROR)
	{
		return 0;
	}

	cout << "Accept" << endl;

	vector<Session*> sessionManager;

	// CP 생성
	HANDLE iocpHandle = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0); // initialize

	// WorkerThreads
	vector<thread> threads;
	for (int i = 0; i < 5; i++)
	{
		threads.push_back(thread([=]() { WorkerThreadMain(iocpHandle); }));
	}

	// accept는 main thread에서 처리
	while (true)
	{
		SOCKADDR_IN clientAddr;
		int addrLen = sizeof(clientAddr);

		SOCKET clientSocket = ::accept(listenSocket, (SOCKADDR*)&clientAddr, &addrLen);
		if (clientSocket == INVALID_SOCKET)
		{
			return 0;
		}


		Session* session = new Session();
		session->socket = clientSocket;
		sessionManager.push_back(session);

		cout << "Client Connected !" << endl;

		::CreateIoCompletionPort((HANDLE)clientSocket, iocpHandle, /*key*/(ULONG_PTR)session, 0);

		WSABUF wsaBuf;
		wsaBuf.buf = session->recvBuffer;
		wsaBuf.len = BUFSIZE;

		OverlappedEx* overlappedEx = new OverlappedEx();

		DWORD recvLen = 0;
		DWORD flags = 0;
		// 최초 한번은 여기서 실행 (WorkerThreadMain 깨우기)
		::WSARecv(clientSocket, &wsaBuf, 1, &recvLen, &flags, &overlappedEx->overlapped, NULL);

	}

	for (int i = 0; i < 5; i++)
	{
		threads[i].join();
	}

	// 윈속 종료
	::WSACleanup();
}

위에서 한 가지 문제점이 발생할 수 있다!

 

메인쓰레드에서 WSARecv가 최초로 호출 되었고 만약 어떤 이유로 호출 시 넘겨주었던 session이 삭제되었을 때 (delete session) 깨어난 Worker쓰레드는 이 session 가지고 작업을 제대로 실행 할 수 있을까?

 

-> 당장 에러는 안날 수도 있지만 session이 delete 되었기 때문에 Worker쓰레드에서 session에 있는 socket 값이나 그외 값들을 확인해보면 이상한 값이 들어가 있는 것을 확인할 수 있다. 이후 실행에 문제가 생길 것은 당연하다.

 

-> Worker쓰레드에서 세션을 삭제하는 것은 상관 없지만, Worker쓰레드로 해당 비동기 작업이 실행되기 전에 세션이 삭제되면 안된다!

 

-> 이전에 사용하였던 xxnew와 xxdelete 함수를 사용하여 우리가 직접 정의한 allocator로 reference 카운트를 이용해 CRASH를 확인 할 수 있다. (메인 쓰레드에서 세션이 xxdelete 되었으면, Worker에서 이 값에 접근하려는 순간 CRASH가 날 것이다.)

 

-> 따라서 session이 Worker쓰레드에서 작업이 진행되기 전에 삭제되는 문제는 사전에 막을 필요가 있다!


https://learn.microsoft.com/en-us/windows/win32/fileio/createiocompletionport

 

CreateIoCompletionPort function (IoAPI.h) - Win32 apps

Creates an input/output (I/O) completion port and associates it with a specified file handle, or creates an I/O completion port that is not yet associated with a file handle, allowing association at a later time.

learn.microsoft.com

https://learn.microsoft.com/en-us/windows/win32/api/ioapiset/nf-ioapiset-getqueuedcompletionstatus

 

GetQueuedCompletionStatus function (ioapiset.h) - Win32 apps

Attempts to dequeue an I/O completion packet from the specified I/O completion port.

learn.microsoft.com

 

'Game Server (C++)' 카테고리의 다른 글

Atomic, Lock, Critical Section  (1) 2023.05.28
Asynchronous Socket IO - Overlapped (event, callback)  (0) 2022.11.26
WSAEventSelect  (0) 2022.11.23
Socket IO - Select  (0) 2022.11.18
Non-blocking Socket  (0) 2022.11.18