[C++14]线程池
2023-7-14 ~ 2023-7-15
(0)
线程池
概述
- 线程池是一种数据处理形式,将一定数量的线程组合起来,用统一的方式调度,达到线程复用的效果,运行时将任务加入任务队列,线程自动调度执行
为什么使用线程池
- 重复利用线程,减小创建和销毁线程带来的开销
- 提高相应速度,任务无需等待线程创建(在线程池内线程不足时等待时间将延长,可用动态线程池解决)
- 提高线程的可管理性
本文概述
本文实现的是一个最低可在 C++14 使用的线程池,其中,线程池的大小在运行时是固定的,不能动态改变,可能出现几个长时间任务阻塞线程池的情况
流程图
程序
头文件 ThreadPool.h
////////////////////////////////////////////
// 程序:线程池
// 作者:侧云<2047615717@qq.com>
// 编译环境:Visual Studio 2022,最低 C++14
// 编写日期:2023-7-15
#pragma once
#include <future>
#include <vector>
#include <set>
struct Thread_Task
{
using TimePointT = typename ::std::chrono::steady_clock::time_point; // 高精度时钟的时间点类型
using TimeT = typename ::std::chrono::steady_clock::duration; // 高精度时钟的时间段类型
using PriorityT = int; // 优先级类型
using FuncionT = ::std::function<void()>; // 任务函数类型
TimePointT expireTime; // 超过该时间点为超时
PriorityT priority = 0; // 优先级,在本文实现中,越大代表越先
bool bExecuteOnTimeout = false; // 为 true 时,超时后仍然执行
FuncionT func; // 任务函数
};
constexpr bool operator<(const Thread_Task& task1, const Thread_Task& task2) noexcept { return task1.priority < task2.priority; }
constexpr bool operator>(const Thread_Task& task1, const Thread_Task& task2) noexcept { return task1.priority > task2.priority; }
// 用于比较优先级大小
class ThreadPool
{
public:
using MutexT = ::std::mutex; // 互斥锁
using ConditionVariableT = ::std::condition_variable; // 环境变量
using ThreadT = ::std::thread; // 线程
using ThreadPtrT = ThreadT*; // 线程指针
using ThreadVectorT = ::std::vector<ThreadPtrT>; // 线程指针向量
using TaskT = Thread_Task; // 任务
using TaskSetT = ::std::multiset<TaskT, ::std::greater<TaskT>>; // 任务队列,greater 代表越大越靠前
using ThreadNumT = ::uint8_t; // 线程数量的类型,因为线程一般不超过 256 个
using AtomicThreadNumT = ::std::atomic<ThreadNumT>; // 原子的线程数量的类型
using TaskNumT = ::uint32_t; // 任务数量的类型,因为任务一般不超过 4294967296 个
using UniqueLockT = ::std::unique_lock<MutexT>; // 可解锁的锁,用于保护数据和环境变量的等待
using LockGuardT = ::std::lock_guard<MutexT>; // 不可解锁的锁,用于保护数据
template<class _Ret> using PackagedTaskT = ::std::packaged_task<_Ret()>; // 异步提供程序,使运行与结果分离
using TimePointT = typename TaskT::TimePointT; // 时间点类型
using TimeT = typename TaskT::TimeT; // 时间段类型
using PriorityT = typename TaskT::PriorityT; // 优先度类型
using FuncionT = typename TaskT::FuncionT; // 任务函数类型
public:
ThreadPool() noexcept {}
ThreadPool(const ThreadPool&) = delete; // 不可复制构造
ThreadPool& operator=(const ThreadPool&) = delete;
~ThreadPool() noexcept { exit(); }
protected: // 以下为不检查,提供给内部的程序
void init_unchecked(ThreadNumT num) noexcept // 设置线程数
{
m_threadNum = num;
}
void start_unchecked() noexcept // 线程池开始运行
{
m_bRunning = true; // 设置正在运行
m_bTerminate = false; // 设置没有退出
for (ThreadNumT i = 0; i < m_threadNum; ++i) // 循环将线程加入向量,线程调用任务调度函数
m_threads.push_back(new ThreadT(&ThreadPool::dispatch_task, this));
}
void resume_unchecked() noexcept // 继续运行线程池
{
m_bRunning = true; // 设置正在运行
m_pauseCondition.notify_all(); // 唤醒暂停的线程
}
void pause_no_wait_unchecked() noexcept // 暂停线程池,不等待线程执行完当前任务
{
m_bRunning = false; // 设置不在运行
m_taskCondition.notify_all(); // 唤醒等待任务的线程,此时线程会进入 m_pauseCondition 的等待
}
void pause_unchecked() noexcept // 暂停线程池,等待线程执行完当前任务
{
pause_no_wait_unchecked();
UniqueLockT lock(m_mutex);
if (m_runningNum) // 若有运行中的线程,则等待
m_waitCondition.wait(lock);
}
void clear_unchecked() noexcept // 清除线程和任务数据,用于 exit
{
m_threads.clear();
m_tasks.clear();
m_threadNum = 0;
}
TaskNumT get_task_num_unchecked() const noexcept // 获得剩余任务数量
{
return (TaskNumT)m_tasks.size();
}
bool is_running_unchecked() const noexcept // 判断线程池是否处于运行状态
{
return m_bRunning && !m_bTerminate;
}
bool is_tasking_unchecked() const noexcept // 判断线程池是否有任务没处理
{
return m_tasks.size();
}
bool is_all_done_unchecked() const noexcept // 判断线程池是否已经完成所以任务
{
return m_tasks.empty();
}
public: // 以下为检查,提供给外部的程序
bool init(ThreadNumT num) noexcept
{
LockGuardT lock(m_mutex); // 保护数据,防止多个线程同时运行
if (!m_threads.empty()) // 只有线程池为空时可执行
return false;
init_unchecked(num);
return true;
}
bool start() noexcept
{
LockGuardT lock(m_mutex);
if (!m_threads.empty()) // 只有线程池为空时可执行
return false;
start_unchecked();
return true;
}
void exit_no_wait() noexcept
{
m_bTerminate = true; // 设置已退出
pause_no_wait_unchecked(); // 原为暂停且不等待任务完成,但此时 m_bTerminate 会使线程结束而不是进入暂停状态
clear_unchecked(); // 清除数据
}
void exit() noexcept
{
m_bTerminate = true;
pause_unchecked(); // 原为暂停且等待任务完成,但此时 m_bTerminate 会使线程结束而不是进入暂停状态
m_threadNum = 0;
clear_unchecked();
}
bool resume() noexcept
{
if (m_bRunning || m_bTerminate) // 若正在运行或已经退出,则失败
return false;
resume_unchecked();
return true;
}
bool pause_no_wait() noexcept
{
if (!m_bRunning || m_bTerminate) // 若不在运行或已经退出,则失败
return false;
pause_no_wait_unchecked();
return true;
}
bool pause() noexcept
{
if (!m_bRunning || m_bTerminate)
return false;
pause_unchecked();
return true;
}
ThreadNumT get_running_thread_num() const noexcept
{
return m_runningNum;
}
ThreadNumT get_thread_num() const noexcept
{
return m_threadNum;
}
TaskNumT get_task_num() noexcept
{
LockGuardT lock(m_mutex); // 可能与其他改变 m_tasks 的函数同时运行,要加锁
return get_task_num_unchecked();
}
TaskNumT get_task_num() const noexcept
{
return get_task_num_unchecked();
}
const MutexT& get_mutex() const noexcept
{
return m_mutex;
}
const TaskSetT& get_tasks() const noexcept
{
return m_tasks;
}
const ThreadVectorT& get_threads() const noexcept
{
return m_threads;
}
// 判断是否退出
bool is_terminate() const noexcept
{
return m_bTerminate;
}
bool is_running() noexcept
{
LockGuardT lock(m_mutex);
return is_running_unchecked();
}
bool is_running() const noexcept
{
return is_running_unchecked();
}
bool is_tasking() noexcept
{
LockGuardT lock(m_mutex);
return is_tasking_unchecked();
}
bool is_tasking() const noexcept
{
return is_tasking_unchecked();
}
bool is_all_done() noexcept
{
LockGuardT lock(m_mutex);
return is_all_done_unchecked();
}
bool is_all_done() const noexcept
{
return is_all_done_unchecked();
}
// 以下为提供给用户派发任务的函数
template<class _TFunc, class..._TArgs> auto execute(const TimeT& timeoutMs, PriorityT priority, bool bExecuteOnTimeout, _TFunc&& func, _TArgs&&... args) noexcept
// -> ::std::future<decltype(::std::forward<_TFunc>(func)(::std::forward<_TArgs>(args)...))>
// 在 C++14 以下,auto 需要在添加以上内容
{
using _RetType = decltype(::std::forward<_TFunc>(func)(::std::forward<_TArgs>(args)...));
// 推导函数的返回值类型
auto task = ::std::make_shared<PackagedTaskT<_RetType>>(::std::bind(::std::forward<_TFunc>(func), ::std::forward<_TArgs>(args)...));
// bind 允许函数与其参数绑定,使用该返回值调用函数无需传参(使用 placeholders 是例外)
// package_task 允许将函数与结果异步进行
LockGuardT lock(m_mutex);
m_tasks.insert({ bExecuteOnTimeout ? TimePointT{} : ::std::chrono::steady_clock::now() + timeoutMs, priority, bExecuteOnTimeout, [task]() { (*task)(); } });
// 任务队列中插入新任务,其中
// 超时时间点为 bExecuteOnTimeout ? TimePointT{} : ::std::chrono::steady_clock::now() + timeoutMs,仅在超时后不执行时计算超时时间点
// 优先级为 priority
// 超时是否执行为 bExecuteOnTimeout
// 任务函数为 [task]() { (*task)(); } },一个 lambda 表达式,省去声明函数
m_taskCondition.notify_one();
// 唤醒一个线程来执行任务,也有可能都唤醒过了,但这不重要
return task->get_future();
// 返回储存结果的 future,任务执行完后即可从 future 中获得返回值
}
// 默认在填写超时时间时,超时后不执行
template<class _TFunc, class..._TArgs> auto execute(const TimeT& timeoutMs, PriorityT priority, _TFunc&& func, _TArgs&&... args) noexcept
{
return execute(timeoutMs, priority, false, ::std::forward<_TFunc>(func), ::std::forward<_TArgs>(args)...);
}
// 超时时间变为超时时间点
template<class _TFunc, class..._TArgs> auto execute(const TimePointT& timeout, PriorityT priority, bool bExecuteOnTimeout, _TFunc&& func, _TArgs&&... args) noexcept
{
using _RetType = decltype(::std::forward<_TFunc>(func)(::std::forward<_TArgs>(args)...));
auto task = ::std::make_shared<PackagedTaskT<_RetType>>(::std::bind(::std::forward<_TFunc>(func), ::std::forward<_TArgs>(args)...));
LockGuardT lock(m_mutex);
m_tasks.insert({ timeout, priority, bExecuteOnTimeout, [task]() { (*task)(); } });
m_taskCondition.notify_one();
return task->get_future();
}
// 默认在填写超时时间点时,超时后不执行
template<class _TFunc, class..._TArgs> auto execute(const TimePointT& timeout, PriorityT priority, _TFunc&& func, _TArgs&&... args) noexcept
{
return execute(timeout, priority, false, ::std::forward<_TFunc>(func), ::std::forward<_TArgs>(args)...);
}
// 默认在填写优先级时,超时后执行
template<class _TFunc, class..._TArgs> auto execute(PriorityT priority, _TFunc&& func, _TArgs&&... args) noexcept
{
return execute(TimePointT{}, priority, true, ::std::forward<_TFunc>(func), ::std::forward<_TArgs>(args)...);
}
template<class _TDuration = ::std::chrono::milliseconds> bool wait_for_all_done(const _TDuration& time = _TDuration(-1)) noexcept
// 缺省情况下,认为一直等待
{
UniqueLockT lock(m_mutex);
if (time < _TDuration(0)) // 若为一直等待
return m_waitCondition.wait(lock, [this]() { return m_tasks.empty(); }), true; // 逗号表达式,等待完成后返回 true
else
return !(bool)m_waitCondition.wait_for(lock, time, [this]() { return m_tasks.empty(); }); // 等待一段时间
}
template<class _TTimePoint = ::std::chrono::steady_clock::time_point> bool wait_until_all_done(const _TTimePoint& time = _TTimePoint {}) noexcept
// 缺省情况下,认为一直等待
{
UniqueLockT lock(m_mutex);
if (time <= _TTimePoint{}) // 若为一直等待
return m_waitCondition.wait(lock, [this]() { return m_tasks.empty(); }), true;
else
return !(bool)m_waitCondition.wait_until(lock, time, [this]() { return m_tasks.empty(); }); // 等待至时间点
}
protected:
bool get_task(TaskT& task) noexcept // 调度函数获取任务的函数
{
if (m_bTerminate || !m_bRunning) // 仅在不暂停、不退出时运行,写在这里可以不用 lock
return false;
UniqueLockT lock(m_mutex);
if (m_tasks.empty()) // 若当前没有任务,则进入等待,在派发任务后随机唤醒等待中的一个线程
m_taskCondition.wait(lock, [this]() { return m_bTerminate || !m_bRunning || !m_tasks.empty(); }); // 若因退出或暂停或有任务,都暂停等待
if (m_bTerminate || !m_bRunning) // 若因退出或暂停,则函数失败
return false;
auto ptr = m_tasks.begin(); // 获取任务列表中排列第一的任务
task = ::std::move(const_cast<TaskT&>(*ptr)); // 将任务移动赋值到 task,其中 multiset 中是 const 类型,使用 const_cast 解除
m_tasks.erase(ptr); // 获取后删除
return true;
}
void dispatch_task() noexcept
{
while (!m_bTerminate) // 仅在不退出时运行,设置退出后会退出循环,既销毁线程
{
if (!m_bRunning) // 如果暂停,则进入等待,此时派发任务不会唤醒
{
UniqueLockT lock(m_mutex);
m_pauseCondition.wait(lock);
}
TaskT task;
bool got = get_task(task); // 获取任务
if (got) // 若成功,则执行任务
{
++m_runningNum; // 增加正在运行的线程数
TimePointT expireTime = ::std::chrono::steady_clock::now();
if (task.bExecuteOnTimeout || expireTime <= task.expireTime)
task.func();
--m_runningNum;
if (m_runningNum == 0 && m_tasks.empty() || !m_bRunning) // 若任务已经空,或暂停运行,则唤醒
m_waitCondition.notify_all();
}
}
}
protected:
MutexT m_mutex; // 互斥锁,用于防止多线程同时运行导致数据出错
ConditionVariableT m_taskCondition; // 用于任务的环境变量,在派发任务时唤醒一个
ConditionVariableT m_waitCondition; // 用于等待的环境变量,在任务完成时唤醒所有
ConditionVariableT m_pauseCondition; // 用于暂停的环境变量,在设置运行时唤醒所有
ThreadVectorT m_threads; // 储存线程的向量
TaskSetT m_tasks; // 储存任务的队列
AtomicThreadNumT m_runningNum = 0; // 用于记录正在运行的线程数量的原子变量
ThreadNumT m_threadNum = 0; // 用于记录线程总数的变量
bool m_bTerminate = false; // 用于记录是否退出的变量
bool m_bRunning = false; // 用于记录是否运行的变量
};
测试程序
#include "ThreadPool.h"
#include <iostream>
using namespace std;
void func1(int i)
{
printf("func1 %d\n", i);
}
int func2(int i)
{
return i * i;
}
void func3()
{
this_thread::sleep_for(1ms); // 代替运行的时间
printf("func3 ok\n");
}
int main()
{
ThreadPool pool;
pool.init(1); // 设置线程数量,设置单线程,体现优先级
for (int i = 0; i < 5; ++i)
pool.execute(i, func1, i);
pool.start(); // 开始线程池,若在加入任务前开启,则可能先执行了先加入的任务
pool.wait_for_all_done(); // 等待
printf("\n");
auto res = pool.execute(0, func2, 3); // 体现异步程序
pool.wait_until_all_done(); // 等待异步程序执行完毕,此句可省略,res.get() 会自动等待
printf("result is %d\n\n", res.get()); // 获取结果
pool.exit(); // 退出,为了重设线程数
// 以下程序有随机性,提供可能的结果
pool.init(3); // 设置线程数量,设置 3 线程,体现并发
pool.start();
for (int i = 0; i < 10; ++i)
pool.execute(0, func3);
this_thread::sleep_for(2ms); // 等待两毫秒
pool.pause(); // 暂停
printf("pause here\n");
pool.resume(); // 继续运行
pool.wait_for_all_done(); // 等待运行完毕后退出
pool.exit();
(void)getchar();
return 0;
};
可能的结果
func1 4
func1 3
func1 2
func1 1
func1 0
result is 9
func3 ok
func3 ok
func3 ok
func3 ok
pause here
func3 ok
func3 ok
func3 ok
func3 ok
func3 ok
func3 ok
添加评论
取消回复