Coding Memo

Asynchronous Socket IO - Overlapped (event, callback) 본문

Game Server (C++)

Asynchronous Socket IO - Overlapped (event, callback)

minttea25 2022. 11. 26. 22:46

본 포스팅은 인프런에 등록되어 있는 Rockiss 님의 강의를 보고 간단하게 정리한 글입니다.


지난 번에는 WSASelect 모델을 이용해 Socket IO를 처리했었다.

Select와 WSASelect는 결국 모두 Select를 활용하는 방법으로 동기 I/O 느낌이 강하게 든다.

 

이번에는 Overlapped에서 Event와 Callback 함수를 활용한 진짜 소켓 비동기 I/O 모델을 사용해볼 것이다.

 

Overlapped I/O는 Asynchronous(비동기)처리의 Non-blocking 방식의 모델이다. 비동기+논블로킹의 방식은 callback 방식으로 호출을 하면서 callback의 형태로 다시 결과를 받는 방식이다. (Asynchronous) 물론, 이 때 다른 일을 처리할 수 있다.(non-blocking)

 

일단 Overlapped 구조체는 다음과 같이 정의 되어 있다.

typedef struct _OVERLAPPED {
    ULONG_PTR Internal;
    ULONG_PTR InternalHigh;
    union {
        struct {
            DWORD Offset;
            DWORD OffsetHigh;
        } DUMMYSTRUCTNAME;
        PVOID Pointer;
    } DUMMYUNIONNAME;

    HANDLE  hEvent;
} OVERLAPPED, *LPOVERLAPPED;

이 부분에서 확인해야 할 부분은 hEvent 뿐이다. 사용할 Event를 넣어줄 수 있다.

 

Overlapped를 이용하여 2가지 방식으로 비동기 I/O를 처리할 수 있다.

첫 번째는 Event를 이용하는 방법이고 두 번째는 callback을 이용하는 방법이다.

 

기본적인 Overlapped에 대한 2가지 방법의 공통된 사용방법은 다음과 같다.

1. 비동기 함수(WSARecv, WSASend...) 호출

2. 1번에서 호출했던 함수가 성공했는지 확인 (pending 여부 확인 포함)

 

(자세한 내용은 아래에...)

 

이번에는 에코서버가 아니고 클라이언트에서는 send, 서버에서는 recv만 한다.

 

먼저 사용할 비동기 입출력 함수 (WSARecv, WSASend)를 확인하고 가자.


WSARecv

int WSAAPI WSARecv(
  [in]      SOCKET                             s,
  [in, out] LPWSABUF                           lpBuffers,
  [in]      DWORD                              dwBufferCount,
  [out]     LPDWORD                            lpNumberOfBytesRecvd,
  [in, out] LPDWORD                            lpFlags,
  [in]      LPWSAOVERLAPPED                    lpOverlapped,
  [in]      LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine
);
SOCKET s Asynchronous IO socket
LPWSABUF lpBuffers WSABuf 배열의 시작 주소
DWORD dwBufferCount lpBuffer의 개수 (WSABuf 배열 길이)
LPDWORD lpNumberOfBytesRecvd Recv한 바이트 수 [Out]
LPDWORD lpFlags 상세 옵션 (0)
LPWSAOVERLAPPED lpOverlapped WSAOVERLAPPED 구조체
LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine Recv 완료시 OS가 호출할 callback 함수

 

 

WSASend

int WSAAPI WSASend(
  [in]  SOCKET                             s,
  [in]  LPWSABUF                           lpBuffers,
  [in]  DWORD                              dwBufferCount,
  [out] LPDWORD                            lpNumberOfBytesSent,
  [in]  DWORD                              dwFlags,
  [in]  LPWSAOVERLAPPED                    lpOverlapped,
  [in]  LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine
);
SOCKET s Asynchronous IO socket
LPWSABUF lpBuffers WSABuf 배열의 시작 주소
DWORD dwBufferCount lpBuffer의 개수 (WSABuf 배열 길이)
LPDWORD lpNumberOfBytesSent Send한 바이트 수 [Out]
LPDWORD lpFlags 상세 옵션 (0)
LPWSAOVERLAPPED lpOverlapped WSAOVERLAPPED 구조체
LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine Send완료시 OS가 호출할 callback 함수

 

참고: 단순 Buffer(char[])가 아닌 *LPWSABUF(=WSABUF)를 사용하는 이유?

WSABUF 구조체를 보면 다음과 같이 되어있다.

typedef struct _WSABUF {
  ULONG len;
  CHAR  *buf;
} WSABUF, *LPWSABUF;

Buf와 길이를 포함한 구조체인데 왜 길이까지 필요할까?

-> Scatter-Gather를 위해서이다. Scatter-Gather는 따로 떨어져 있는 버퍼들을 한번에 모으는 기능이다. 버퍼의 주소값과 그 버퍼의 길이를 가지고 있으면 이 WSABUF가 여러개로 배열처럼 되어도 주소값과 버퍼의 길이를 알고 있기 때문에 하나로 쭉 나열해 합칠 수있다. 이 때문에 dwBufferCount를 인자로 넣어주는 것이다.

 

Overlapped의 hEvent를 활용하면 Event기반의 Overlapped고, callback 함수쪽을 활용하면 callback 기반의 Overlapped이다. (따라서 Event기반의 Overlapped는 callback 함수에 nullptr을 넣어주면 된다.)

 

주의사항으로, 비동기 함수들이 바로 실행이 되지 않을 수도 있다는 점을 감안하여, 결과값을 받아올 주소에 절대로 새로운 값을 할당하거나 값을 바꾸지 말자!

 

참고: WSASend, WSARecv외에도 AcceptEx와 ConnectEx로 비동기 처리를 할 수 있는데 이는 사전 작업이 필요하다고 한다. 여기서는 WSASend와 WSARecv만을 사용해서 데이터 송수신만 비동기로 구현하겠다. 


Event 기반 Overlapped

 

1. 비동기 모드의 소켓과 Signal을 받기위한 WSAEvent 객체 생성

2. 비동기 함수 호출 (위에서의 1번)

3. 바로 완료가 되지 않았다면 (pending 상태일 경우) signaled 상태 확인 (WSAWaitForMultipleEvents 사용)

4. WSAGetOverlappedResult로 비동기 입출력 함수 결과 확인 및 데이터 처리

 

비동기 입출력 함수가 바로 실행되었는지 확인한다.

그렇지 않다면 비동기 함수의 return 값을 확인하게 되는데, 에러가 발생했을 경우, SOCKET_ERROR를 내밷는다. 그러나 이 에러는 진짜 에러일 수도 있고 가짜 에러일 수도 있다. 무슨 뜻이냐면, 만약 어떤 이유로 (send또는 recv할 버퍼의 공간이 없다거나, 큐가 많이 밀려있거나 등등...) 바로 실행되지 않았을 경우에도 SOCKET_ERROR를 내밷을 수 있다는 뜻이다. 이를 pending 상태라고 하는데, 바로 실행이 되지 않아서 에러가 났다는 뜻일 뿐, 이후에 실행이 될 수 있다. pending 상태 확인은 WSAGetLastError() 함수를 이용해 WSA_IO_PENDING의 경우를 확인한다. (당연하지만 WSA_IO_PENDING 에러가 아닐 경우 진짜 에러다!)

pending 상태였을 경우 WSAWaitForMultipleEvents 함수로 해당 이벤트가 signaled 상태인지 확인을 하고 (timeout동안) 완료가 되었다면 WSAGetOverlappedResult로 결과를 얻어온다.

 

DWORD WSAAPI WSAWaitForMultipleEvents(
  [in] DWORD          cEvents,
  [in] const WSAEVENT *lphEvents,
  [in] BOOL           fWaitAll,
  [in] DWORD          dwTimeout,
  [in] BOOL           fAlertable
);

WSAWaitForMultipleEvents는 저번 글에 있으니까 간단하게만 언급한다. 이벤트(WSAEvent)의 signal 상태를 확인하고 signaled가 된 이벤트의 index를 반환하는 함수이다. timeout만큼 blocking 되는 함수이다. 두 번째 인자로 lphEvent를 넘겨줄 것인데 이번에는 Overlapped 구조체에 있는 hEvent를 넘겨주면 된다.

 

BOOL WSAAPI WSAGetOverlappedResult(
  [in]  SOCKET          s,
  [in]  LPWSAOVERLAPPED lpOverlapped,
  [out] LPDWORD         lpcbTransfer,
  [in]  BOOL            fWait,
  [out] LPDWORD         lpdwFlags
);

WSAGetOverlappedResult는 말 그대로 Overlapped에 대한 결과를 가져오는 함수이다.

SOCKET s 비동기 모드의 I/O Socket
LPWASOVERLAPPED lpOverlapped Overlapped 구조체
LPDWORD lpcbTransfer 결과에 대한 바이트 수 (송수신 성공 바이트 수) [Out]
BOOL fWait 비동기 입출력 작업이 끝날대까지 대기할지 여부
LPEWORD lpdwFlags 비동기 입출력 관련 부가 정보 (0)

제대로 실행이 성공했다면 TRUE를 반환한다.


Full Codes

Server (Receiver)

더보기
const int BUFSIZE = 1000;

struct Session
{
	SOCKET socket;
	char recvBuffer[BUFSIZE] = {};
	int recvBytes = 0;
	WSAOVERLAPPED overlapped = {};
};

int main()
{
	WSAData wsaData;
	if (::WSAStartup(MAKEWORD(2, 2), &wsaData) != 0)
		return 0;
        
	SOCKET listenSocket = ::socket(AF_INET, SOCK_STREAM, 0);
	if (listenSocket == INVALID_SOCKET)
	{
		return 0;
	}

	// non-blocking 사용
	u_long on = 1;
	if (::ioctlsocket(listenSocket, FIONBIO, &on) == INVALID_SOCKET)
	{
		return 0;
	}

	SOCKADDR_IN serverAddr;
	::memset(&serverAddr, 0, sizeof(serverAddr));
	serverAddr.sin_family = AF_INET;
	serverAddr.sin_addr.s_addr = ::htonl(INADDR_ANY);
	serverAddr.sin_port = ::htons(7777);

	if (::bind(listenSocket, (SOCKADDR*)&serverAddr, sizeof(serverAddr)) == SOCKET_ERROR)
	{
		return 0;
	}

	if (::listen(listenSocket, SOMAXCONN) == SOCKET_ERROR)
	{
		return 0;
	}

	cout << "Accept" << endl;

	while (true)
	{
		SOCKET clientSocket;
		SOCKADDR_IN clientAddr;
		int32 addrLen = sizeof(clientAddr);
		while (true)
		{
			clientSocket = ::accept(listenSocket, (SOCKADDR*)&clientAddr, &addrLen); // non-blocking
			if (clientSocket != INVALID_SOCKET)
			{
				break;
			}
			if (::WSAGetLastError() == WSAEWOULDBLOCK)
			{
				continue;
			}

			// error
			return 0;
		}

		Session session = Session{ clientSocket };
		WSAEVENT wsaEvent = ::WSACreateEvent();
		session.overlapped.hEvent = wsaEvent;

		cout << "Client Connected !" << endl;

		while (true)
		{
			WSABUF wsaBuf;
			wsaBuf.buf = session.recvBuffer;
			wsaBuf.len = BUFSIZE;

			DWORD recvLen = 0;
			DWORD flags = 0;
            
			// 비동기 호출
			if(::WSARecv(clientSocket, &wsaBuf, 1, &recvLen, &flags, &session.overlapped, nullptr) == SOCKET_ERROR)
			{
				// 진짜 에러인지 확인 필요
				// pending 된 상태였을 경우 -> 실제 에러는 아님
				if (::WSAGetLastError() == WSA_IO_PENDING)
				{
					// 완료될 때 까지 대기 (blocking)
					::WSAWaitForMultipleEvents(1, &wsaEvent, TRUE, WSA_INFINITE, FALSE);
					// 결과 얻어오기
					::WSAGetOverlappedResult(session.socket, &session.overlapped, &recvLen, FALSE, &flags);
				}
				else
				{
					// TODO : Error
					break;
				}
				
			}

			cout << "Data Recv Len = " << recvLen << endl;
		}

		::closesocket(session.socket);
		::WSACloseEvent(wsaEvent);
	}

	// 윈속 종료
	::WSACleanup();
}

 

Client (Sender)

더보기
int main()
{
	this_thread::sleep_for(1s);

	WSAData wsaData;
	if (::WSAStartup(MAKEWORD(2, 2), &wsaData) != 0)
	{
		return 0;
	}

	SOCKET clientSocket = ::socket(AF_INET, SOCK_STREAM, 0);
	if (clientSocket == INVALID_SOCKET)
	{
		return 0;
	}

	// non-blocking 사용
	u_long on = 1;
	if (::ioctlsocket(clientSocket, FIONBIO, &on) == INVALID_SOCKET)
	{
		return 0;
	}

	SOCKADDR_IN serverAddr;
	::memset(&serverAddr, 0, sizeof(serverAddr));
	serverAddr.sin_family = AF_INET;
	::inet_pton(AF_INET, "127.0.0.1", &serverAddr.sin_addr);
	serverAddr.sin_port = ::htons(7777);

	// Connect
	while (true)
	{
		if (::connect(clientSocket, (SOCKADDR*)&serverAddr, sizeof(serverAddr)) == SOCKET_ERROR)
		{
			// non-blocking
			if (WSAGetLastError() == SOCKET_ERROR)
			{
				return 0;
			}

			// 이미 연결된 상태라면 break
			if (::WSAGetLastError() == WSAEISCONN)
			{
				break;
			}

			// Error
			break;
		}
	}

	cout << "Connected to Server!" << endl;

	char sendBuffer[100] = "Hello World";
	WSAEVENT wsaEvent = ::WSACreateEvent();
	WSAOVERLAPPED overlapped = {};
	overlapped.hEvent = wsaEvent;

	// send
	while (true)
	{
		WSABUF wsaBuf;
		wsaBuf.buf = sendBuffer;
		wsaBuf.len = 100;

		DWORD sendLen = 0;
		DWORD flags = 0;

		if (::WSASend(clientSocket, &wsaBuf, 1, &sendLen, flags, &overlapped, nullptr) == SOCKET_ERROR)
		{
			// 진짜 에러인지 확인 필요
			// pending 된 상태였을 경우 -> 실제 에러는 아님
			if (::WSAGetLastError() == WSA_IO_PENDING)
			{
				// 완료될 때 까지 대기 (blocking)
				::WSAWaitForMultipleEvents(1, &wsaEvent, TRUE, WSA_INFINITE, FALSE);
				// 결과 얻어오기
				::WSAGetOverlappedResult(clientSocket, &overlapped, &sendLen, FALSE, &flags);
			}
			else
			{
				// TODO : Error
				break;
			}
		}

		cout << "Send Data ! Len = " << sizeof(sendBuffer) << endl;

		this_thread::sleep_for(1s);
	}

	// 소켓 리소스 반환
	::closesocket(clientSocket);

	// 윈속 종료
	::WSACleanup();
}

Callback (Completion Routine) 기반 

(Event 기반의 방법보다 좀 더 알아야 할게 많을지도?)

 

1. 비동기 모드의 I/O 소켓 생성 및 Completion Routine 함수 준비

2. 비동기 함수(WSARecv, WSASend...) 호출

3. 바로 완료가 되지 않고, pending된 상태라면 비동기 입출력 함수를 호출한 쓰레드를 Alertable Wait 상태로 만들기 (이후 callback 함수 호출됨)

3-1. 비동기 I/O 완료 후, OS가 완료 루틴을 호출하여 비동기 함수 호출 시 지정한 함수 실행

3-2. 완료 루틴 호출이 모두 끝나면, 그 쓰레드는 Aleartable 해제

 

비동기 함수 호출 시에 overlapped 구조체를 넣고, 바로 실행이 완료되었는지 확인을 한다.

여기서 중요한 것은 비동기 함수가 바로 실행되어 결과값을 가지고 있을 수도 있고 아직 실행이 끝나지 않아서 결과값이 없을 수도 있다는 것이다.

바로 실행이 완료되었다면 크게 상관없다.

그렇지 않다면 비동기 함수의 return 값을 확인하게 되는데, 에러가 발생했을 경우, SOCKET_ERROR를 내밷는다. 그러나 이 에러는 진짜 에러일 수도 있고 가짜 에러일 수도 있다. 무슨 뜻이냐면, 만약 어떤 이유로 (send또는 recv할 버퍼의 공간이 없다거나, 큐가 많이 밀려있거나 등등...) 바로 실행되지 않았을 경우에도 SOCKET_ERROR를 내밷을 수 있다는 뜻이다. 이를 pending 상태라고 하는데, 바로 실행이 되지 않아서 에러가 났다는 뜻일 뿐, 이후에 실행이 될 수 있다. pending 상태 확인은 WSAGetLastError() 함수를 이용해 WSA_IO_PENDING의 경우를 확인한다. (당연하지만 WSA_IO_PENDING 에러가 아닐 경우 진짜 에러다!) (Event 내용과 동일하다...)

pending 상태가 확인될 경우 Alertable Wait 상태로 만들고 APC 큐에있는 함수들을 실행 시키게 한다. 이 때 비동기 함수 호출 시 걸어두었던 함수가 비동기로 호출이 되는 것이다.

 

 

여기서 잠깐!

 

* APC (Asynchronous Procedure Call), 비동기 프로시저 호출 이란?

특정 스레드의 context에서 비동기적으로 실행되는 함수로, 이 함수들(Completion Routine)은 APC 큐에서 관리되고 있으며, 스레드마다 각각 가지고 있다.

또한 시스템에서 생성된 APC는 kernel-mode APC라고 하고 어플리케이션에서 생성된 APC는 user-mode APC라고 한다. user-mode APC를 실행하기 위해서는 그 스레드가 Alertable 상태여야 한다.

(참고로 queue에 있는 함수들이 모두 호출된 이후에 Alertable 상태를 벗어난다고 한다.)

 

즉, Alertable Wait 상태로 만든다는 것은 Alertable 상태를 만들기 위한 기다림 정도로 생각하면 된다. 기다리고 있다가 비동기 함수가 완료되어 Wait가 끝났을때, Alertable 상태가 되어 APC큐에 있는 함수들을 실행시키는 것이다.

 

그러면 어떻게 Alertable Wait 상태로 만들까?

 

SleepEx, SignalObjectAndWait...등의 함수와 지난번에 잠깐 언급되었던 WaitForMultipleEvents 등으로 할수 있다.

그 중 SleepEx를 살펴보자면, (WaitForMultipleEvents는 마지막인자(fAlertable)를 TRUE로 하면 된다.)

WaitForMultipleEvents에서의 인자와 마찬가지로 timeout 시간과 fAlertable 여부를 인자로 넣어주면 된다. (Alertable 상태로 할것이므로 TRUE를 넣어주면 된다.)

DWORD SleepEx(
  [in] DWORD dwMilliseconds,
  [in] BOOL  bAlertable
);

 

 

마지막으로, APC에 넣어줄 함수는 어떻게 작성해야 할까?

WSARecv나 WSASend 함수의 마지막인자로 callback 함수를 넣어주게 되는데 이 함수의 타입이 LPWSAOVERLAPPED_COMPLETION_ROUTINE 이라는 타입이다.

LPWSAOVERLAPPED_COMPLETION_ROUTINE LpwsaoverlappedCompletionRoutine;

void LpwsaoverlappedCompletionRoutine(
  DWORD dwError,
  DWORD cbTransferred,
  LPWSAOVERLAPPED lpOverlapped,
  DWORD dwFlags
)
{...}

 

이 형식 그대로 함수를 만들어주면 된다.

예시)

void CALLBACK RecvCallback(DWORD error, DWORD recvLen, LPWSAOVERLAPPED overlapped, DWORD flags)
{
	// TODO
}

(Client는 Event 기반의 Sender로 그대로 실행)

 

Server

더보기
const int BUFSIZE = 1000;

struct Session
{
	WSAOVERLAPPED overlapped = {}; // Session 과 overlapped 주소가 같음!! -> casting 가능
	SOCKET socket;
	char recvBuffer[BUFSIZE] = {};
	int recvBytes = 0;
};

void CALLBACK RecvCallback(DWORD error, DWORD recvLen, LPWSAOVERLAPPED overlapped, DWORD flags)
{
	cout << "Data Recv Len Callback = " << recvLen << endl;
	// If it is for echo-server, WSASend()
	Session* session = (Session*)overlapped; // 값 확인해보기
}

int main()
{
	WSAData wsaData;
	if (::WSAStartup(MAKEWORD(2, 2), &wsaData) != 0)
		return 0;

	SOCKET listenSocket = ::socket(AF_INET, SOCK_STREAM, 0);
	if (listenSocket == INVALID_SOCKET)
	{
		return 0;
	}

	u_long on = 1;
	if (::ioctlsocket(listenSocket, FIONBIO, &on) == INVALID_SOCKET)
	{
		return 0;
	}

	SOCKADDR_IN serverAddr;
	::memset(&serverAddr, 0, sizeof(serverAddr));
	serverAddr.sin_family = AF_INET;
	serverAddr.sin_addr.s_addr = ::htonl(INADDR_ANY);
	serverAddr.sin_port = ::htons(7777);

	if (::bind(listenSocket, (SOCKADDR*)&serverAddr, sizeof(serverAddr)) == SOCKET_ERROR)
	{
		return 0;
	}

	if (::listen(listenSocket, SOMAXCONN) == SOCKET_ERROR)
	{
		return 0;
	}

	cout << "Accept" << endl;

	while (true)
	{
		SOCKET clientSocket;
		SOCKADDR_IN clientAddr;
		int32 addrLen = sizeof(clientAddr);
		while (true)
		{
			clientSocket = ::accept(listenSocket, (SOCKADDR*)&clientAddr, &addrLen); // non-blocking
			if (clientSocket != INVALID_SOCKET)
			{
				break;
			}
			if (::WSAGetLastError() == WSAEWOULDBLOCK)
			{
				continue;
			}

			// error
			return 0;
		}

		Session session = Session{ clientSocket };
		WSAEVENT wsaEvent = ::WSACreateEvent();

		cout << "Client Connected !" << endl;

		while (true)
		{
			WSABUF wsaBuf;
			wsaBuf.buf = session.recvBuffer;
			wsaBuf.len = BUFSIZE;

			DWORD recvLen = 0;
			DWORD flags = 0;

			if(::WSARecv(clientSocket, &wsaBuf, 1, &recvLen, &flags, &session.overlapped, RecvCallback) == SOCKET_ERROR)
			{
				if (::WSAGetLastError() == WSA_IO_PENDING)
				{

					::SleepEx(INFINITE, TRUE); // 아래도 가능
					//::WSAWaitForMultipleEvents(1, &wsaEvent, TRUE, WSA_INFINITE, TRUE);
				}
				else
				{
					// TODO : Error
					break;
				}
				
			}
			else
			{
				cout << "Data Recv Len = " << recvLen << endl;
			}


		}

		::closesocket(session.socket);
	}

	// 윈속 종료
	::WSACleanup();
}

결과

Callback 결과

결과는 그때그때마다 상황에 따라 달라질 수는 있지만, 위 결과에서는 처음엔 WSARecv가 바로 실행되어 "Data Recv Len = 100"이 출력이 되었지만 그 이후로는 Callback함수에서 "Data Recv Len Callback = 100"이 출력되는 것을 알 수 있었다.


APC와 비동기 입출력 등 많은 것을 공부할 수 있었다...신기신기

'Game Server (C++)' 카테고리의 다른 글

Atomic, Lock, Critical Section  (1) 2023.05.28
Asynchronous Socket IO - IOCP  (0) 2022.11.29
WSAEventSelect  (0) 2022.11.23
Socket IO - Select  (0) 2022.11.18
Non-blocking Socket  (0) 2022.11.18