Threadpool
Loading...
Searching...
No Matches
threadpool.hpp
Go to the documentation of this file.
1/*
2** Copyright 2024 Maxtek Consulting
3**
4** Permission is hereby granted, free of charge, to any person obtaining a copy
5** of this software and associated documentation files (the "Software"), to deal
6** in the Software without restriction, including without limitation the rights
7** to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8** copies of the Software, and to permit persons to whom the Software is
9** furnished to do so, subject to the following conditions:
10**
11** The above copyright notice and this permission notice shall be included in all
12** copies or substantial portions of the Software.
13**
14** THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15** IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16** FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17** AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18** LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19** OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20** SOFTWARE.
21*/
22
23#ifndef THREADPOOL_HPP
24#define THREADPOOL_HPP
25
32#include <algorithm>
33#include <functional>
34#include <future>
35#include <memory>
36#include <queue>
37#include <thread>
38#include <typeinfo>
39#include <vector>
40
41namespace maxtek
42{
49 {
50 public:
51
58 threadpool(size_t threads = std::thread::hardware_concurrency())
59 {
60 if (threads == 0)
61 {
62 throw std::runtime_error("failed to construct threadpool with zero threads");
63 }
64
65 _active = true;
66
67 _workers.reserve(threads);
68
69 while (_workers.size() < _workers.capacity())
70 {
71 _workers.push_back(std::thread([&]()
72 {
73 std::function<void()> task;
74 while (pop_task(task))
75 {
76 task();
77 } }));
78 }
79 }
84 {
85 if (_active)
86 {
87 shutdown();
88 }
89 }
90
100 template <class F, class... Args>
101 std::future<std::result_of_t<F(Args...)>> submit(F &&function, Args &&...args)
102 {
103 std::shared_ptr<std::packaged_task<std::result_of_t<F(Args...)>()>> packaged_task;
104 std::function<void()> work;
105 std::future<std::result_of_t<F(Args...)>> result;
106
107 packaged_task = std::make_shared<std::packaged_task<std::result_of_t<F(Args...)>()>>(std::bind(std::forward<F>(function), std::forward<Args>(args)...));
108 result = packaged_task->get_future();
109
110 work = [packaged_task]()
111 {
112 (*packaged_task)();
113 };
114
115 push_task(std::move(work));
116
117 return result;
118 }
119
124 bool active() const
125 {
126 return _active;
127 }
128
133 void shutdown()
134 {
135 if (!_active)
136 {
137 throw std::runtime_error("failed to shut down inactive threadpool");
138 }
139 _active = false;
140 _condition.notify_all();
141 for (std::thread &worker : _workers)
142 {
143 worker.join();
144 }
145 }
146
147 private:
148 void push_task(std::function<void()> &&task)
149 {
150 std::unique_lock<std::mutex> lock(_mutex);
151 if (!_active)
152 {
153 throw std::runtime_error("failed to submit to inactive threadpool");
154 }
155 _tasks.push(std::move(task));
156 lock.unlock();
157 _condition.notify_one();
158 }
159
160 bool pop_task(std::function<void()> &task)
161 {
162 std::unique_lock<std::mutex> lock(_mutex);
163 bool result(false);
164 _condition.wait(
165 lock,
166 [&]()
167 {
168 return (!_active || !_tasks.empty());
169 });
170 if (_active)
171 {
172 task = _tasks.front();
173 _tasks.pop();
174 result = true;
175 }
176 return result;
177 }
178
179
180 size_t num_threads;
181 bool _active;
182 std::vector<std::thread> _workers;
183 std::queue<std::function<void()>> _tasks;
184 std::mutex _mutex;
185 std::condition_variable _condition;
186 };
187}
188#endif
Definition threadpool.hpp:49
~threadpool()
destroys threadpool after calling shutdown if necessary
Definition threadpool.hpp:83
threadpool(size_t threads=std::thread::hardware_concurrency())
constructs a new threadpool
Definition threadpool.hpp:58
bool active() const
check if the threadpool is active
Definition threadpool.hpp:124
void shutdown()
shut down threadpool by joining threads and rejecting submissions
Definition threadpool.hpp:133
std::future< std::result_of_t< F(Args...)> > submit(F &&function, Args &&...args)
submits a function with its arguments to the threadpool
Definition threadpool.hpp:101
Definition threadpool.hpp:42