スポンサーリンク

スレッドプールを使用してデータ読み込みを行う

画像ビューア的なものを作っているのだが、2000件くらいの画像を一括読み込みすると読み込みまでフリーズするような状態になったので、スレッドプールを使って操作中に読み込みをするようにした。

// https://docs.wxwidgets.org/3.0/overview_helloworld.html

// プリプロセッサに以下二つを追加
// __WXMSW__
// WXUSINGDLL

// サブシステムをWindowsに設定(WinMainで呼び出すので)
// Windows (/SUBSYSTEM:WINDOWS)

#ifndef WX_PRECOMP
#include <wx/wx.h>
#endif

#include <wx/gdicmn.h> // wxPointに必要
#include <wx/frame.h>  // wxFrameに必要




#include <string>
#include <wx/statbmp.h>
#include <vector>
#include <memory>
#include <filesystem>

#include "ThreadWorker.hpp"
 
//! @brief データをスレッドで読み込むクラス
//! @details データの読み込みをスレッドプールで行う
//! @tparam Source データの入手元 std::filesystem::pathなど
//! @tparam Reference データの格納先 std::shared_ptr<DATA> など
//! @tparam Job データの読み込みを行うスレッドプールのジョブ型
template<typename Source, typename Reference, typename Job>
class MultiDataLoader {

    std::unique_ptr<ThreadPool> _pool; // スレッドプール

public:
    MultiDataLoader(const int _threadcount_) : _pool( new ThreadPool(_threadcount_) ) {}

    bool isFinished() {
        return _pool->isAllJobCompleted();
    }
    ~MultiDataLoader() {
        _pool.reset();
    }

    //! @brief データの読み込みを開始する
    //! @param _source_ データの入手元一覧
    //! @param ploaded データの格納先
    //! @param unloadedsign 未読み込みの場合の値 nullptrなど
    void load(const std::vector<Source>& _source_, std::vector<Reference>* _ploaded_, Reference _unloadedsign_=nullptr) {

        // データの格納先を用意
        _ploaded_->clear();
        _ploaded_->resize(_source_.size(), _unloadedsign_);

        for (size_t index = 0; index < _source_.size(); index++) {
            _pool->add(std::make_unique<Job>(_ploaded_, _source_[index], index));
        }

    }

};

//! @brief データの読み込みを行うジョブ
class JobLoadData : public JobBase {
    using Reference = std::shared_ptr<wxBitmap>;
    std::filesystem::path path;
    size_t index;
    std::vector<Reference>* ref;
public:

    //! @brief コンストラクタ
    //! @param _pref_ データの格納先
    //! @param _path_ データの入手元
    //! @param _index_ データの格納先のインデックス
    JobLoadData(std::vector<Reference>* _pref_, const std::filesystem::path& _path_, size_t _index_) {
        ref = _pref_;
        path = _path_;
        index = _index_;
    }
    virtual void run() override {

        wxString fname = wxString::FromUTF8((const char*)path.u8string().c_str());

        wxLogNull logNo;  // 警告を抑制
        // 拡張子チェック
        (*ref)[index] = std::make_shared<wxBitmap>(fname, wxBITMAP_TYPE_ANY);      

    }

};

//! @brief 指定したディレクトリ以下の、指定した拡張子のファイル一覧を取得する
//! @param _dir_path_ 検索するディレクトリのパス //! @param extensions 検索対象の拡張子 //! @return 検索結果のファイルパス一覧 std::vector<std::filesystem::path> get_files_with_extensions(const std::filesystem::path& _dir_path_, const std::vector<std::string>& _extensions_) { std::vector<std::filesystem::path> file_list; if (!std::filesystem::exists(_dir_path_) || !std::filesystem::is_directory(_dir_path_)) { std::cerr << "Error: The provided path is not a valid directory.\n"; return file_list; } for (const auto& entry : std::filesystem::recursive_directory_iterator(_dir_path_)) { if (entry.is_regular_file()) { std::string ext = entry.path().extension().string(); if (std::find(_extensions_.begin(), _extensions_.end(), ext) != _extensions_.end()) { file_list.push_back(entry.path()); } } } return file_list; }
 
class MyFrame : public wxFrame {
    using DataLoader = MultiDataLoader<std::filesystem::path, std::shared_ptr<wxBitmap>, JobLoadData>;
    std::unique_ptr< DataLoader > ploader;
public:
    MyFrame(const wxString& title, const wxPoint& pos, const wxSize& size)
        : wxFrame(NULL, wxID_ANY, title, pos, size), currentIndex(0) {

        // 画像ファイルの読み込み
        // wxBITMAP_TYPE_JPEG 等
        wxInitAllImageHandlers();

        // ファイル一覧の取得
        auto sourcepath = LR"(C:\test\images)";
        std::vector<std::filesystem::path> paths
            = get_files_with_extensions(sourcepath, { ".png",".jpg"});

        // データ読み込みの準備(8スレッド)
        ploader = std::make_unique< DataLoader >(8);

        // データのロード開始
        ploader->load(paths, &bitmaps);

        // 初期状態 空の画像を表示
        imageCtrl = new wxStaticBitmap(this, wxID_ANY, wxNullBitmap, wxPoint(10, 10), wxSize(400, 300));

        // キーボードイベントを処理
        Bind(wxEVT_KEY_DOWN, &MyFrame::OnKeyDown, this);
    }

    ~MyFrame() {
        ploader.reset();
    }

private:
    std::vector<std::shared_ptr<wxBitmap>> bitmaps;
    wxStaticBitmap* imageCtrl;
    size_t currentIndex;

    void OnKeyDown(wxKeyEvent& event) {
        if (event.GetKeyCode() == WXK_LEFT) {
            if (currentIndex > 0) {
                currentIndex--;
                UpdateImage();
            }
        }
        else if (event.GetKeyCode() == WXK_RIGHT) {
            if (currentIndex < bitmaps.size() - 1) {
                currentIndex++;
                UpdateImage();
            }
        }
    }

    void UpdateImage() {

        if(bitmaps[currentIndex] == nullptr){
            //imageCtrlに表示されている画像を削除
            imageCtrl->SetBitmap(wxNullBitmap);
        }
        else {
            imageCtrl->SetBitmap(*bitmaps[currentIndex]);
        }
        Layout();  // レイアウトを更新
    }
};
class MyApp : public wxApp {
public:
    virtual bool OnInit() {

        // windowsのコンソールでUTF-8を表示するために必要
        // ロケール設定
        std::locale::global(std::locale("ja_JP.UTF-8"));
        std::cerr.imbue(std::locale("ja_JP.UTF-8"));
        std::cout.imbue(std::locale("ja_JP.UTF-8"));
        SetConsoleOutputCP(CP_UTF8);

        MyFrame* frame = new MyFrame("Bitmap Viewer", wxPoint(50, 50), wxSize(450, 340));
        frame->Show(true);
        return true;
    }
};

wxIMPLEMENT_APP(MyApp);

画像フォルダ内のデータが読み込まれるので、左右キーで表示切替ができる。

ThreadWorker.hpp

スレッドプールのコードで、スレッドプールそのものを破棄するときに、未処理のキューをすべて削除して即終了できるように修正。

#pragma once

#include <deque>
#include <vector>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <cassert>



//! @brief スレッドに行わせる仕事のインターフェース
//! @details run()メソッドを実装し、処理内容を記述する \n
//! 使用例:
//! @code
//! class MyWork : public JobBase {
//! public:
//!     void run() override {}
//! };
//! @endcode
class JobBase
{
public:
    virtual void run() = 0;
    virtual ~JobBase() {}
};



//! @brief ジョブをためるキュー
class Queue
{
public:
    Queue() {}
    void push(std::unique_ptr<JobBase>&& data) {
        _deque.emplace_back(std::move(data));
    }
    std::unique_ptr<JobBase> pop() {
        if (_deque.empty()) {
            return nullptr;
        }
        auto data = std::move(_deque.front());
        _deque.pop_front();
        return data;
    }

    bool empty() const {
        return _deque.empty();
    }
    void clear() {
        _deque.clear();  // 未処理のジョブを全削除
    }

private:
    // ジョブがたまっているキュー
    // ThreadPoolで作成したN個のThreadWorkerスレッドが、このキューからジョブを取り出して実行する
    std::deque<std::unique_ptr<JobBase>> _deque;
};



//! スレッドプールで動いているスレッドの実装
class ThreadWorker
{
public:
    ThreadWorker(bool& isTerminationRequested, Queue& queue,
        std::mutex& mutex, std::condition_variable& cv,
        int& activeJobCount, std::condition_variable& activeJobCv)
        : _isTerminationRequested(isTerminationRequested),
        _queue(queue),
        _mutex(mutex),
        _cv(cv),
        _activeJobCount(activeJobCount),
        _activeJobCv(activeJobCv)
    {}

    void operator()() {
        while (1) {
            std::unique_ptr<JobBase> jobinstance;
            {
                std::unique_lock<std::mutex> ul(_mutex);
                while (_queue.empty()) {
                    if (_isTerminationRequested) { return; }
                    _cv.wait(ul);
                }

                jobinstance = _queue.pop();
                assert(jobinstance != nullptr);
                _activeJobCount++;
            }

            jobinstance->run(); // ジョブを実行

            {
                std::unique_lock<std::mutex> ul(_mutex);
                _activeJobCount--; // 実行中ジョブ数を減らす
                if (_activeJobCount == 0 && _queue.empty()) {
                    _activeJobCv.notify_all(); // すべてのジョブが完了したことを通知
                }
            }
        }
    }

private:
    bool& _isTerminationRequested;
    Queue& _queue;
    std::mutex& _mutex;
    std::condition_variable& _cv;
    int& _activeJobCount; // 実行中ジョブ数
    std::condition_variable& _activeJobCv; // waitForCompletion() へ通知
};



//! @brief スレッドプールクラス
//! @details N個のスレッドで、M個のジョブを実行する。\n
//! waitForCompletion()で、現在キューを入れたすべてのジョブが完了するまで待機できる \n
//! 使用例:
//! @code
//! ThreadPool pool(4);
//! pool.add(std::make_unique<MyWork>());
//! pool.add(std::make_unique<MyWork>());
//! pool.waitForCompletion();
//! pool.add(std::make_unique<MyWork>());
//! pool.add(std::make_unique<MyWork>());
//! @endcode
class ThreadPool
{
public:
    ThreadPool(int threadCount)
        : _isTerminationRequested(false), _activeJobCount(0)
    {
        for (int n = 0; n < threadCount; n++) {
            auto worker = std::make_shared<ThreadWorker>(_isTerminationRequested, _queue, _mutex, _cv, _activeJobCount, _activeJobCv);
            _workers.push_back(worker);
            _threads.emplace_back(std::thread(std::ref(*worker)));
        }
    }


    ~ThreadPool() {
        {
            std::unique_lock<std::mutex> ul(_mutex);
            _isTerminationRequested = true;
            _queue.clear(); // 未処理のジョブを全削除し中断
        }
        _cv.notify_all();
        for (auto& thread : _threads) {
            if (thread.joinable()) {
                thread.join();  // スレッドを安全に終了
            }
        }
    }

    //! @brief すべてのジョブが完了するまで待機する
    void waitForCompletion() {
        std::unique_lock<std::mutex> ul(_mutex);
        _activeJobCv.wait(ul, [this]() { return _activeJobCount == 0 && _queue.empty(); });
    }
    //! @brief すべてのジョブが完了したかを確認する関数
    bool isAllJobCompleted() {
        std::unique_lock<std::mutex> ul(_mutex);
        return _activeJobCount == 0 && _queue.empty();
    }

    //! @brief ジョブをキューに追加
    //! @param jobinstance ジョブ
    void add(std::unique_ptr<JobBase>&& jobinstance) {
        {
            std::unique_lock<std::mutex> ul(_mutex);
            _queue.push(std::move(jobinstance));
        }
        _cv.notify_all();
    }

private:
    bool _isTerminationRequested;
    Queue _queue;
    std::mutex _mutex;
    std::condition_variable _cv;
    std::vector<std::thread> _threads;
    std::vector<std::shared_ptr<ThreadWorker>> _workers;

    int _activeJobCount; // 実行中ジョブ数
    std::condition_variable _activeJobCv; // ジョブの完了を待機するための条件変数
};

コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です

日本語が含まれない投稿は無視されますのでご注意ください。(スパム対策)


この記事のトラックバックURL: