Coding Memo

[C++] Concurrent 우선순위 큐 (with lock) 본문

Language/C++

[C++] Concurrent 우선순위 큐 (with lock)

minttea25 2024. 5. 13. 18:24

 

윈도우에서 지원하는 concurrent에 대한 자료구조 헤더가 있는데, 이들은 lock-free로 구현되어 있으니 이 쪽을 사용하는 것이 훨씬 더 도움이 될 수도 있겠다 ㅋㅋ

 

std::mutex를 이용한 lock과 std::priority_queue를 사용하여 간단한 concurrent 우선순위 큐를 만들어보았다.

기본적인 push 및 pop 외에, 지정된 조건에 따라 결과값을 반환하는 TryPop등의 함수도 포함했다.

상황에 따라 필요할 수도 있기 때문이다. 
(예를 들어, top의 객체를 확인하고 이 객체가 어떤 조건이 맞을 경우에만 pop을 해야할 필요가 있을 때, top과 pop을 따로하면 atomic하게 실행시켜야되는 데, 이 과정에서 외부 lock을 걸어야 되기 때문이다.)

 

추가적으로, 이동연산자를 제외하고 lock을 얻어야하는 복사, 대입 연산자를 삭제했다.

(이동연산자를 사용하는 시나리오가 있나 곰곰히 생각해보았지만, 아직까지 떠오르는 경우가 없는 것 같아서 해당 연산자도 삭제해도 될 것 같다.)

 

이 코드는 STL의 기본 std::priority_queue에 단순히 lock을 이용하고, 몇몇 추가적인 함수를 더한 간단한 클래스이다.

#pragma once

#include <functional>
#include <mutex>
#include <queue>
#include <vector>

#define USE_LOCK(name) std::mutex name
#define LOCK(name) std::lock_guard<std::mutex> lock(name)

template<typename T, typename Less = less<typename std::vector<T>::value_type>>
class LockPriorityQueue
{
public:
	LockPriorityQueue() {}
	~LockPriorityQueue() {}

	LockPriorityQueue(const LockPriorityQueue&) = delete;
	LockPriorityQueue(LockPriorityQueue&& other) noexcept
	{
		std::lock_guard<std::mutex> lock(other.mtx);

		_pq = std::move(other._pq);
		other._pq = std::priority_queue<T, std::vector<T>, Less>();
	}
	LockPriorityQueue& operator=(const LockPriorityQueue&) = delete;
	LockPriorityQueue& operator=(LockPriorityQueue&&) noexcept = delete;

	void Push(T item)
	{
		LOCK(mtx);
		_pq.push(item);
	}

	T Pop()
	{
		LOCK(mtx);

		if (_pq.empty())
		{
			if constexpr (std::is_pointer_v<T>) return nullptr;
			else return T();
		}

		T top = _pq.top();
		_pq.pop();
		return top;
	}

	bool TryPop(T& item)
	{
		LOCK(mtx);

		if (_pq.empty()) return false;
		else
		{
			item = _pq.front();
			_pq.pop();
			return true;
		}
	}

	bool TryPop(T& item, std::function<bool(const T&)> condition)
	{
		LOCK(mtx);

		if (_pq.empty()) return false;

		const T& top = _pq.top();

		if (condition(top))
		{
			item = top;
			_pq.pop();
			return true;
		}

		return false;
	}

	T Peek()
	{
		LOCK(mtx);

		if (_pq.empty())
		{
			if constexpr (std::is_pointer_v<T>) return nullptr;
			else return T();
		}
		else return _pq.front();
	}

	bool TryPeek(T& top)
	{
		LOCK(mtx);

		if (_pq.empty()) return false;
		else
		{
			top = _pq.top();
			return true;
		}
	}

	void Clear()
	{
		LOCK(mtx);
		while (_pq.empty() == false)
		{
			_pq.pop();
		}
	}
private:
	USE_LOCK(mtx);
	std::priority_queue<T, std::vector<T>, Less> _pq;
};

 


 

왜 만들어 보았나요?

 

단순히 기본 pq에는 없는 함수를 사용할 필요가 있어서가 첫번째 이유이고, 두 번째는 T에 기본 데이터 타입이나, 객체 외에도, 포인터, 공유포인터 등을 사용했을 때, 어떻게 작동하는지 확인하고 싶어서이다.

 

따라서 아래 테스트 코드까지 작성해두겠다.

아래 4가지 타입을 넣어서 실행해보았고, 이동 생성자에 대한 결과도 확인했다.

 

1. 기본 데이터 타입 (int)
2. 객체 (동적 할당이 아닌 단순 스택에 존재하는 객체)
3. 포인터
4. std::shared_ptr (공유 포인터)

 

더보기
#pragma once

#include "LockPriorityQueue.h"

using namespace std;

class A : public enable_shared_from_this<A>
{
public:
    A() : _a(-1) {}
    A(const int a) : _a(a) {}
    ~A() {}

    bool operator<(const A& other)
    {
        return other._a < _a;
    }
    
    bool operator==(const A& other)
    {
        return other._a == _a;
    }

    bool operator!=(const A& other)
    {
        return other._a != _a;
    }

    static friend std::ostream& operator<<(std::ostream& os, const A& a)
    {
        os << a._a;
        return os;
    }

    static friend std::ostream& operator<<(std::ostream& os, const std::shared_ptr<A>& a) {
        os << a->_a;
        return os;
    }

private:
    int _a;

    friend struct AComp;
    friend struct APointerComp;
    friend struct ASharedComp;
};

struct AComp
{
    bool operator()(const A& a, const A& b)
    {
        return a._a > b._a;
    }
};

struct APointerComp
{
    bool operator()(const A* a, const A* b)
    {
        return a->_a > b->_a;
    }
};

struct ASharedComp
{
    bool operator()(const std::shared_ptr<A>& a, const std::shared_ptr<A>& b)
    {
        return a->_a > b->_a;
    }
};

int main()
{
    {
        cout << "Primitive Data Type (int)" << endl;

        LockPriorityQueue<int> q;
        q.Push(50);
        q.Push(10);
        q.Push(30);
        q.Push(20);
        q.Push(10);

        for (int i = 0; i < 10; ++i)
        {
            auto pop = q.Pop();
            if (pop != int()) cout << "Poped: " << pop << endl;
            else cout << "Queue is empty." << endl;
        }

        cout << endl;
    }
    
    {
        cout << "A (stack memory)" << endl;

        LockPriorityQueue<A, AComp> q;
        q.Push(A(50));
        q.Push(A(10));
        q.Push(A(30));
        q.Push(A(20));
        q.Push(A(10));

        for (int i = 0; i < 10; ++i)
        {
            auto pop = q.Pop();
            if (pop != A()) cout << "Poped: " << pop << endl;
            else cout << "Queue is empty." << endl;
        }
        cout << endl;
    }
    
    {
        cout << "A* (pointer type)" << endl;

        LockPriorityQueue<A*, APointerComp> q;
        q.Push(new A(30));
        q.Push(new A(50));
        q.Push(new A(10));
        q.Push(new A(10));
        q.Push(new A(20));

        for (int i = 0; i < 10; ++i)
        {
            auto pop = q.Pop();
            if (pop != nullptr)
            {
                cout << "Poped: " << *pop << endl;
                delete pop;
            }
            else cout << "Queue is empty." << endl;
        }
        cout << endl;
    }
    
    {
        cout << "std::shared_ptr<A>" << endl;

        LockPriorityQueue<std::shared_ptr<A>, ASharedComp> q;
        q.Push(std::make_shared<A>(30));
        q.Push(std::make_shared<A>(50));
        q.Push(std::make_shared<A>(10));
        q.Push(std::make_shared<A>(10));
        q.Push(std::make_shared<A>(40));
        q.Push(std::make_shared<A>(30));
        q.Push(std::make_shared<A>(20));
        q.Push(std::make_shared<A>(50));
        q.Push(std::make_shared<A>(10));
        q.Push(std::make_shared<A>(30));

        for (int i = 0; i < 5; ++i)
        {
            auto pop = q.Pop();
            if (pop != nullptr) cout << "Poped: " << *pop.get() << endl;
            else cout << "Queue is empty." << endl;
        }

        cout << endl;

        cout << "moved from queue" << endl;

        LockPriorityQueue<std::shared_ptr<A>, ASharedComp> q2(std::move(q));
        for (int i = 0; i < 10; ++i)
        {
            auto pop = q2.Pop();
            if (pop != nullptr) cout << "Poped: " << *pop.get() << endl;
            else cout << "Queue is empty." << endl;
        }
    }

    return 0;
}

 

주목해야 할 점은 다음과 같다.

 

1. LockPriorityQueue가 비교로 해당 구조체의 ()연산자를 제대로 호출하고 있는가?

Push할 때, 비교연산이 제대로 실행되는지 확인해보자. 만약 비교 연산자를 직접 지정하지 않는다면, 메모리 주소 값을 비교하려고 할 것이다.

 

2. 큐가 비어있을 때, 어떻게 동작하는가?

위 코드에서는 기본 생성자 T() 또는 nullptr을 반환하고 있다. std::is_pointer_v<T>를 통해 컴파일 단계에서 해당 템플릿 클래스의 타입이 pointer인지 확인하고, pointer라면 nullptr을 반환하고 그렇지 않으면 기본 생성자의 결과값(T())를 반환한다.

참고로, shared_ptr도 일단은 클래스이기 때문에 T()를 반환한다. 이는 empty를 나타내고 이 값은 nullptr과 비교할 수 있다!

 

 

lock을 이용하기 때문에 조건 경합이 심하면 아무래도 느려질 수 밖에 없다. 윈도우의 concurrent 큐나 스택 헤더를 보고 lock-free가 어떻게 구현되어 있는지 확인해 볼 필요가 있을 것 같다.