スポンサーリンク

スレッドプールのコード

以前、スレッドプールのサンプルのリンクを張った。それを加工したものを作成。

#include <deque>
#include <vector>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <cassert>

//! @brief スレッドに行わせる仕事のインターフェース
//! @details run()メソッドを実装し、処理内容を記述する \n
//! 使用例:
//! @code
//! class MyWork : public JobBase {
//! public:
//!     void run() override {}
//! };
//! @endcode
class JobBase
{
public:
    virtual void run() = 0;
    virtual ~JobBase() {}
};

//! @brief ジョブをためるキュー
class Queue
{
public:
    Queue() {}
    void push(std::unique_ptr<JobBase>&& data) {
        _deque.emplace_back(std::move(data));
    }
    std::unique_ptr<JobBase> pop() {
		if (_deque.empty()) {
			return nullptr;
		}
		auto data = std::move(_deque.front());
		_deque.pop_front();
		return data;
	}

    bool empty() const {
        return _deque.empty();
    }
private:
    // ジョブがたまっているキュー
    // ThreadPoolで作成したN個のThreadWorkerスレッドが、このキューからジョブを取り出して実行する
    std::deque<std::unique_ptr<JobBase>> _deque;
};

//! スレッドプールで動いているスレッドの実装
class ThreadWorker
{
public:
    ThreadWorker(bool& isTerminationRequested, Queue& queue,
        std::mutex& mutex, std::condition_variable& cv,
        int& activeJobCount, std::condition_variable& activeJobCv)
        : _isTerminationRequested(isTerminationRequested),
        _queue(queue),
        _mutex(mutex),
        _cv(cv), 
        _activeJobCount(activeJobCount), 
        _activeJobCv(activeJobCv) 
    {}

    void operator()() {
        while (1) {
            std::unique_ptr<JobBase> jobinstance;
            {
                std::unique_lock<std::mutex> ul(_mutex);
                while (_queue.empty()) {
                    if (_isTerminationRequested) { return; }
                    _cv.wait(ul);
                }

                jobinstance = _queue.pop();
                assert(jobinstance != nullptr);
                _activeJobCount++;
            }

            jobinstance->run(); // ジョブを実行

            {
                std::unique_lock<std::mutex> ul(_mutex);
                _activeJobCount--; // 実行中ジョブ数を減らす
                if (_activeJobCount == 0 && _queue.empty()) {
                    _activeJobCv.notify_all(); // すべてのジョブが完了したことを通知
                }
            }
        }
    }

private:
    bool& _isTerminationRequested;
    Queue& _queue;
    std::mutex& _mutex;
    std::condition_variable& _cv;
    int& _activeJobCount; // 実行中ジョブ数
    std::condition_variable& _activeJobCv; // waitForCompletion() へ通知
};

//! @brief スレッドプールクラス
//! @details N個のスレッドで、M個のジョブを実行する。\n
//! waitForCompletion()で、現在キューを入れたすべてのジョブが完了するまで待機できる \n
//! 使用例:
//! @code
//! ThreadPool pool(4);
//! pool.add(std::make_unique<MyWork>());
//! pool.add(std::make_unique<MyWork>());
//! pool.waitForCompletion();
//! pool.add(std::make_unique<MyWork>());
//! pool.add(std::make_unique<MyWork>());
//! @endcode
class ThreadPool
{
public:
    ThreadPool(int threadCount)
        : _isTerminationRequested(false), _activeJobCount(0)
    {
        for (int n = 0; n < threadCount; n++) {
            auto worker = std::make_shared<ThreadWorker>(_isTerminationRequested, _queue, _mutex, _cv, _activeJobCount, _activeJobCv);
            _workers.push_back(worker);
            _threads.emplace_back(std::thread(std::ref(*worker)));
        }
    }


    ~ThreadPool() {
        {
            std::unique_lock<std::mutex> ul(_mutex);
            _isTerminationRequested = true;
        }
        _cv.notify_all();
        for (auto& thread : _threads) {
            thread.join();
        }
    }

    //! @brief すべてのジョブが完了するまで待機する
    void waitForCompletion() {
        std::unique_lock<std::mutex> ul(_mutex);
        _activeJobCv.wait(ul, [this]() { return _activeJobCount == 0 && _queue.empty(); });
    }

    //! @brief ジョブをキューに追加
    //! @param jobinstance ジョブ
    void add(std::unique_ptr<JobBase>&& jobinstance) {
        {
            std::unique_lock<std::mutex> ul(_mutex);
            _queue.push(std::move(jobinstance));
        }
        _cv.notify_all();
    }

private:
    bool _isTerminationRequested;
    Queue _queue;
    std::mutex _mutex;
    std::condition_variable _cv;
    std::vector<std::thread> _threads;
    std::vector<std::shared_ptr<ThreadWorker>> _workers;

    int _activeJobCount; // 実行中ジョブ数
    std::condition_variable _activeJobCv; // ジョブの完了を待機するための条件変数
};

コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です

日本語が含まれない投稿は無視されますのでご注意ください。(スパム対策)


この記事のトラックバックURL: