Browse Source

Added a simple threadsafe queue, and on top of that a simple thread pool

master
Joshua Moerman 11 years ago
parent
commit
cbbdf7d027
  1. 67
      include/lock_queue.hpp
  2. 39
      include/thread_pool.hpp

67
include/lock_queue.hpp

@ -0,0 +1,67 @@
#pragma once
/* Thread safe queue (with fine grain locks).
Based on the book by Williams.
No wait_and_pop.
*/
#include <memory>
#include <thread>
template <typename T>
class queue {
private:
struct node {
std::shared_ptr<T> data;
std::unique_ptr<node> next;
};
std::mutex head_mutex;
std::unique_ptr<node> head;
std::mutex tail_mutex;
node* tail;
node* get_tail(){
std::lock_guard<std::mutex> lock(tail_mutex);
return tail;
}
std::unique_ptr<node> pop_head(){
std::lock_guard<std::mutex> lock(head_mutex);
if(head.get() == get_tail()){
return nullptr;
}
std::unique_ptr<node> old_head = std::move(head);
head = std::move(old_head->next);
return old_head;
}
public:
queue()
: head(new node)
, tail(head.get())
{}
queue(const queue &) = delete;
queue& operator=(const queue &) = delete;
std::shared_ptr<T> try_pop(){
std::unique_ptr<node> old_head = pop_head();
return old_head ? old_head->data : nullptr;
}
void push(T value){
auto new_data = std::make_shared<T>(std::move(value));
std::unique_ptr<node> p(new node);
node* const new_tail = p.get();
std::lock_guard<std::mutex> lock(tail_mutex);
tail->data = new_data;
tail->next = std::move(p);
tail = new_tail;
}
};

39
include/thread_pool.hpp

@ -0,0 +1,39 @@
#pragma once
/* Simple thread pool, no fancy stuff.
It is thread safe (i.e. concurrent adds).
Threads terminate if there is no work (we need wait_and_pop for that).
Works best if all work is added, and then run.
Doesn't care about exceptions.
*/
#include <vector>
#include <functional>
#include <thread>
#include "lock_queue.hpp"
struct thread_pool {
void add(std::function<void()> f){
work.push(f);
}
void run(int number_of_threads){
for(int i = 0; i < number_of_threads; ++i){
threads.emplace_back([this]{
auto x = work.try_pop();
while(x){
(*x)();
x = work.try_pop();
}
});
}
}
~thread_pool(){
for(auto & t : threads) t.join();
}
private:
queue<std::function<void()>> work;
std::vector<std::thread> threads;
};