From cbbdf7d027ab05d3bac2a7c2be9e94e1664e4370 Mon Sep 17 00:00:00 2001 From: Joshua Moerman Date: Sun, 22 Sep 2013 12:06:10 +0200 Subject: [PATCH] Added a simple threadsafe queue, and on top of that a simple thread pool --- include/lock_queue.hpp | 67 +++++++++++++++++++++++++++++++++++++++++ include/thread_pool.hpp | 39 ++++++++++++++++++++++++ 2 files changed, 106 insertions(+) create mode 100644 include/lock_queue.hpp create mode 100644 include/thread_pool.hpp diff --git a/include/lock_queue.hpp b/include/lock_queue.hpp new file mode 100644 index 0000000..1cc0b40 --- /dev/null +++ b/include/lock_queue.hpp @@ -0,0 +1,67 @@ +#pragma once + +/* Thread safe queue (with fine grain locks). + Based on the book by Williams. + No wait_and_pop. +*/ + +#include +#include + +template +class queue { +private: + struct node { + std::shared_ptr data; + std::unique_ptr next; + }; + + std::mutex head_mutex; + std::unique_ptr head; + + std::mutex tail_mutex; + node* tail; + + node* get_tail(){ + std::lock_guard lock(tail_mutex); + return tail; + } + + std::unique_ptr pop_head(){ + std::lock_guard lock(head_mutex); + + if(head.get() == get_tail()){ + return nullptr; + } + + std::unique_ptr old_head = std::move(head); + head = std::move(old_head->next); + return old_head; + } + +public: + queue() + : head(new node) + , tail(head.get()) + {} + + queue(const queue &) = delete; + queue& operator=(const queue &) = delete; + + std::shared_ptr try_pop(){ + std::unique_ptr old_head = pop_head(); + return old_head ? old_head->data : nullptr; + } + + void push(T value){ + auto new_data = std::make_shared(std::move(value)); + std::unique_ptr p(new node); + + node* const new_tail = p.get(); + std::lock_guard lock(tail_mutex); + + tail->data = new_data; + tail->next = std::move(p); + tail = new_tail; + } +}; diff --git a/include/thread_pool.hpp b/include/thread_pool.hpp new file mode 100644 index 0000000..3364cae --- /dev/null +++ b/include/thread_pool.hpp @@ -0,0 +1,39 @@ +#pragma once + +/* Simple thread pool, no fancy stuff. + It is thread safe (i.e. concurrent adds). + Threads terminate if there is no work (we need wait_and_pop for that). + Works best if all work is added, and then run. + Doesn't care about exceptions. +*/ + +#include +#include +#include +#include "lock_queue.hpp" + +struct thread_pool { + void add(std::function f){ + work.push(f); + } + + void run(int number_of_threads){ + for(int i = 0; i < number_of_threads; ++i){ + threads.emplace_back([this]{ + auto x = work.try_pop(); + while(x){ + (*x)(); + x = work.try_pop(); + } + }); + } + } + + ~thread_pool(){ + for(auto & t : threads) t.join(); + } + +private: + queue> work; + std::vector threads; +};