fastdo  0.6.8
threadtask.hpp
浏览该文件的文档.
1 #pragma once
2 //
3 // threadtask 提供任务和线程池相关功能
4 //
5 
6 namespace winux
7 {
8 
9 template < typename _Ty >
10 class Task;
11 class ThreadPool;
12 
14 struct TaskCtx
15 {
17  {
21  };
22 
27  bool posted;
28  bool aborted;
29 
32 
35 
38  {
39  TaskCtx * start = this;
40  while ( start->prevTask != nullptr ) start = start->prevTask;
41  return start;
42  }
43 
45  bool wait( double sec = -1 )
46  {
47  ScopeGuard guard(mtxTask);
48  return cdtTask.waitUntil( [this] () { return this->status == TaskCtx::taskStop; }, mtxTask, sec );
49  }
50 
52  void updateStatus( TaskStatus st, bool isNotifyAll = false )
53  {
54  ScopeGuard guard(mtxTask);
55  this->status = st;
56  if ( isNotifyAll ) cdtTask.notifyAll(); // 唤醒所有等待此任务的线程
57  }
58 
60  void post();
61 
63  void tryPostNext();
64 protected:
65  TaskCtx() : mtxTask(true), cdtTask(true), status(taskPending), pool(nullptr), posted(false), aborted(false), prevTask(nullptr) { }
66  virtual ~TaskCtx()
67  {
68  //auto start = getStartTask();
69  //cout << "~TaskCtx(" << ( start == this ? "start-task" : "then-task" ) << ") " << this << "\n";
70  }
71 };
72 
73 template < typename _Ty >
74 struct TaskCtxT : public TaskCtx
75 {
76  _Ty val;
77 
78  template < typename... _ArgType >
79  static SharedPointer<TaskCtxT> Create( _ArgType&& ... arg )
80  {
81  SharedPointer<TaskCtxT> p( new TaskCtxT( std::forward<_ArgType>(arg)... ) );
82  p->weakThis = p;
83  return p;
84  }
85 
86  _Ty get()
87  {
88  this->wait();
89  return val;
90  }
91 
92  void exec( RunableInvoker<_Ty> * ivk ) noexcept
93  {
94  try
95  {
96  this->val = ivk->invoke();
97  }
98  catch ( winux::Error const & e )
99  {
100  std::cout << e.what() << std::endl;
101  this->aborted = true;
102  }
103  catch ( ... )
104  {
105  std::cout << "unknown" << std::endl;
106  this->aborted = true;
107  }
108  }
109 protected:
111  {
112  this->pool = pool;
113  this->status = status;
114  }
115 };
116 
117 template <>
118 struct TaskCtxT<void> : public TaskCtx
119 {
120  template < typename... _ArgType >
121  static SharedPointer<TaskCtxT> Create( _ArgType&& ... arg )
122  {
123  SharedPointer<TaskCtxT> p( new TaskCtxT( std::forward<_ArgType>(arg)... ) );
124  p->weakThis = p;
125  return p;
126  }
127 
128  void get()
129  {
130  this->wait();
131  }
132 
133  void exec( RunableInvoker<void> * ivk ) noexcept
134  {
135  try
136  {
137  ivk->invoke();
138  }
139  catch ( winux::Error const & e )
140  {
141  std::cout << e.what() << std::endl;
142  this->aborted = true;
143  }
144  catch ( ... )
145  {
146  std::cout << "unknown" << std::endl;
147  this->aborted = true;
148  }
149  }
150 protected:
152  {
153  this->pool = pool;
154  this->status = status;
155  }
156 };
157 
160 {
161 public:
163  explicit ThreadPool() : _mtxPool(true), _cdtPool(true), _poolStop(false), _taskChainCount(0)
164  {
165  }
166 
170  explicit ThreadPool( int threadCount ) : _mtxPool(true), _cdtPool(true), _poolStop(false), _taskChainCount(0)
171  {
172  this->startup(threadCount);
173  }
174 
175  virtual ~ThreadPool()
176  {
177  this->whenEmptyStopAndWait();
178  }
179 
181  ThreadPool & startup( int threadCount )
182  {
183  _group.create( threadCount, [this] () {
184  while ( !_poolStop )
185  {
186  SharedPointer<TaskCtx> taskCtx;
187  {
188  ScopeGuard guard(_mtxPool);
189  while ( _queueTask.empty() && !_poolStop )
190  {
191  _cdtPool.wait(_mtxPool);
192  }
193  if ( !_queueTask.empty() ) // 如果队列不空
194  {
195  taskCtx = _queueTask.front();
196  _queueTask.pop();
197 
198  _cdtPool.notifyAll(); // 唤醒所有线程池线程
199  }
200  }
201 
202  if ( taskCtx )
203  {
204  taskCtx->updateStatus( TaskCtx::taskRunning, false );
205  taskCtx->routineForPool->run();
206 
207  taskCtx->posted = false;
208  }
209  }
210  } );
211  _group.startup(); // 启动线程组的线程
212  return *this;
213  }
214 
216  template < typename _Fx, typename... _ArgType >
217  Task<typename FuncTraits<_Fx>::ReturnType> task( _Fx fn, _ArgType&&... arg )
218  {
219  return Task<typename FuncTraits<_Fx>::ReturnType>( this, fn, std::forward<_ArgType>(arg)... );
220  }
221 
223  void stop()
224  {
225  ScopeGuard guard(_mtxPool);
226  _poolStop = true;
227  _cdtPool.notifyAll(); // 唤醒所有线程池线程
228  }
229 
231  bool wait( double sec = -1 )
232  {
233  return _group.wait(sec); // 等待全部线程退出
234  }
235 
238  {
239  while ( true )
240  {
241  ScopeGuard guard(_mtxPool);
242  // 判断任务队列为空
243  _cdtPool.waitUntil( [this] () { return _queueTask.empty(); }, _mtxPool );
244  // 判断任务链数量为0
245  if ( this->_taskChainCount <= 0 ) break;
246  }
247 
248  this->stop();
249  this->wait();
250  }
251 
253  size_t getTaskCount() const
254  {
255  ScopeGuard guard( const_cast<Mutex &>(_mtxPool) );
256  return _queueTask.size();
257  }
258 
260  {
261  ScopeGuard guard(_mtxPool);
262  return ++_taskChainCount;
263  }
264 
266  {
267  ScopeGuard guard(_mtxPool);
268  return --_taskChainCount;
269  }
270 
271 private:
272  // 投递一个任务
273  void _postTask( SharedPointer<TaskCtx> taskCtx )
274  {
275  ScopeGuard guard(_mtxPool);
276  _queueTask.push(taskCtx);
277  _cdtPool.notify(); // 通知一个等待中的线程
278  }
279 
280  Mutex _mtxPool; // 互斥锁
281  Condition _cdtPool; // 用于判断队列是否有数据
282  bool _poolStop; // 线程池停止
283  ThreadGroup _group; // 线程组
284  std::queue< SharedPointer<TaskCtx> > _queueTask; // 任务队列
285  int _taskChainCount; // 执行中的任务链数量
286 
287  friend struct TaskCtx;
288 
290 };
291 
292 // partial TaskCtx
293 inline void TaskCtx::post()
294 {
295  if ( !this->weakThis.expired() )
296  {
297  SharedPointer<TaskCtx> p = this->weakThis.lock();
298  p->status = taskPending;
299  this->pool->_postTask(p);
300  }
301 }
302 
303 inline void TaskCtx::tryPostNext()
304 {
305  // 检测是否有下个任务且任务不中止,则投递到线程池
306  if ( this->nextTask && !this->aborted )
307  {
308  this->nextTask->post();
309  }
310  else
311  {
312  this->pool->decTaskChainCount();
313  }
314 }
315 
317 template < typename _Ty >
318 class Task
319 {
320 public:
321  using ReturnType = _Ty;
322 
324  template < typename _Fx, typename... _ArgType >
325  Task( ThreadPool * pool, _Fx fnRoutine, _ArgType&& ... argRoutine )
326  {
327  static_assert( std::is_same< ReturnType, typename FuncTraits<_Fx>::ReturnType >::value , "FuncTraits<_Fx>::ReturnType is not match Task<_Ty>." );
329  //cout << "start-task: " << _taskCtx.get() << endl;
330 
331  auto routine = MakeSimple( NewRunable( fnRoutine, std::forward<_ArgType>(argRoutine)... ) );
332  _taskCtx->routineForPool.attachNew( NewRunable( [routine] ( TaskCtxT<ReturnType> * taskCtx ) {
333  // 执行任务例程
334  taskCtx->exec( routine.get() );
335 
336  // 检测是否有下个任务,投递到线程池
337  taskCtx->tryPostNext();
338 
339  // 更新运行状态并通知唤醒等待在此任务的线程
340  taskCtx->updateStatus( TaskCtx::taskStop, true );
341 
342  }, _taskCtx.get() ) );
343  }
344 
346  template < typename _Fx, typename... _ArgType >
347  Task( SharedPointer< TaskCtxT<void> > prevTaskCtx, _Fx fnRoutine, _ArgType&& ... argRoutine )
348  {
349  static_assert( std::is_same< ReturnType, typename FuncTraits<_Fx>::ReturnType >::value , "FuncTraits<_Fx>::ReturnType is not match Task<_Ty>." );
350  _taskCtx = TaskCtxT<ReturnType>::Create(prevTaskCtx->pool, TaskCtx::taskPending);
351  _taskCtx->prevTask = prevTaskCtx.get();
352  //cout << "then 2-1 " << endl;
353 
354  auto routine = MakeSimple( NewRunable( fnRoutine, std::forward<_ArgType>(argRoutine)... ) );
355  _taskCtx->routineForPool.attachNew( NewRunable( [routine] ( SharedPointer< TaskCtxT<void> > prevTaskCtx, TaskCtxT<ReturnType> * taskCtx ) {
356  // 执行任务例程
357  taskCtx->exec( routine.get() );
358 
359  // 后续任务的例程执行完成后没有必要还留有上一个任务的nextTask,必须清空,否则内存泄露。
360  prevTaskCtx->nextTask.reset();
361 
362  // 检测是否有下个任务,投递到线程池
363  taskCtx->tryPostNext();
364 
365  // 更新运行状态并通知唤醒等待在此任务的线程
366  taskCtx->updateStatus( TaskCtx::taskStop, true );
367 
368  }, prevTaskCtx, _taskCtx.get() ) );
369 
370  }
371 
373  template < typename _Fx, typename... _ArgType >
374  Task( SharedPointer< TaskCtxT<void> > prevTaskCtx, _Fx fnRoutine, typename FuncTraits<_Fx>::ClassType * obj, _ArgType&& ... argRoutine )
375  {
376  static_assert( std::is_same< ReturnType, typename FuncTraits<_Fx>::ReturnType >::value , "FuncTraits<_Fx>::ReturnType is not match Task<_Ty>." );
377  _taskCtx = TaskCtxT<ReturnType>::Create(prevTaskCtx->pool, TaskCtx::taskPending);
378  _taskCtx->prevTask = prevTaskCtx.get();
379  //cout << "then 2-2 " << endl;
380 
381  auto routine = MakeSimple( NewRunable( fnRoutine, obj, std::forward<_ArgType>(argRoutine)... ) );
382  _taskCtx->routineForPool.attachNew( NewRunable( [routine] ( SharedPointer< TaskCtxT<void> > prevTaskCtx, TaskCtxT<ReturnType> * taskCtx ) {
383  // 执行任务例程
384  taskCtx->exec( routine.get() );
385 
386  // 后续任务的例程执行完成后没有必要还留有上一个任务的nextTask,必须清空,否则内存泄露。
387  prevTaskCtx->nextTask.reset();
388 
389  // 检测是否有下个任务,投递到线程池
390  taskCtx->tryPostNext();
391 
392  // 更新运行状态并通知唤醒等待在此任务的线程
393  taskCtx->updateStatus( TaskCtx::taskStop, true );
394 
395  }, prevTaskCtx, _taskCtx.get() ) );
396 
397  }
398 
400  template < typename _Ty2, typename _Fx, typename... _ArgType >
401  Task( SharedPointer< TaskCtxT<_Ty2> > prevTaskCtx, _Fx fnRoutine, _ArgType&& ... argRoutine )
402  {
403  static_assert( std::is_same< ReturnType, typename FuncTraits<_Fx>::ReturnType >::value , "FuncTraits<_Fx>::ReturnType is not match Task<_Ty>." );
404  _taskCtx = TaskCtxT<ReturnType>::Create(prevTaskCtx->pool, TaskCtx::taskPending);
405  _taskCtx->prevTask = prevTaskCtx.get();
406  //cout << "then 3-1 " << endl;
407 
408  auto routine = MakeSimple( NewRunable( fnRoutine, _Ty2(), std::forward<_ArgType>(argRoutine)... ) );
409  _taskCtx->routineForPool.attachNew( NewRunable( [routine] ( SharedPointer< TaskCtxT<_Ty2> > prevTaskCtx, TaskCtxT<ReturnType> * taskCtx ) {
410  // 执行任务例程
411  std::get<0>(routine->_tuple) = std::move(prevTaskCtx->val);
412  taskCtx->exec( routine.get() );
413 
414  // 后续任务的例程执行完成后没有必要还留有上一个任务的nextTask,必须清空,否则内存泄露。
415  prevTaskCtx->nextTask.reset();
416 
417  // 检测是否有下个任务,投递到线程池
418  taskCtx->tryPostNext();
419 
420  // 更新运行状态并通知唤醒等待在此任务的线程
421  taskCtx->updateStatus( TaskCtx::taskStop, true );
422 
423  }, prevTaskCtx, _taskCtx.get() ) );
424 
425  }
426 
428  template < typename _Ty2, typename _Fx, typename... _ArgType >
429  Task( SharedPointer< TaskCtxT<_Ty2> > prevTaskCtx, _Fx fnRoutine, typename FuncTraits<_Fx>::ClassType * obj, _ArgType&& ... argRoutine )
430  {
431  static_assert( std::is_same< ReturnType, typename FuncTraits<_Fx>::ReturnType >::value , "FuncTraits<_Fx>::ReturnType is not match Task<_Ty>." );
432  _taskCtx = TaskCtxT<ReturnType>::Create(prevTaskCtx->pool, TaskCtx::taskPending);
433  _taskCtx->prevTask = prevTaskCtx.get();
434  //cout << "then 3-2 " << endl;
435 
436  auto routine = MakeSimple( NewRunable( fnRoutine, obj, _Ty2(), std::forward<_ArgType>(argRoutine)... ) );
437  _taskCtx->routineForPool.attachNew( NewRunable( [routine] ( SharedPointer< TaskCtxT<_Ty2> > prevTaskCtx, TaskCtxT<ReturnType> * taskCtx ) {
438  // 执行任务例程
439  std::get<1>(routine->_tuple) = std::move(prevTaskCtx->val);
440  taskCtx->exec( routine.get() );
441 
442  // 后续任务的例程执行完成后没有必要还留有上一个任务的nextTask,必须清空,否则内存泄露。
443  prevTaskCtx->nextTask.reset();
444 
445  // 检测是否有下个任务,投递到线程池
446  taskCtx->tryPostNext();
447 
448  // 更新运行状态并通知唤醒等待在此任务的线程
449  taskCtx->updateStatus( TaskCtx::taskStop, true );
450 
451  }, prevTaskCtx, _taskCtx.get() ) );
452 
453  }
454 
455  virtual ~Task()
456  {
457  }
458 
460  template < typename _Fx, typename... _ArgType >
461  Task<typename FuncTraits<_Fx>::ReturnType> then( _Fx fn, _ArgType&& ... arg )
462  {
463  return Task<typename FuncTraits<_Fx>::ReturnType>( _taskCtx, fn, std::forward<_ArgType>(arg)... );
464  }
465 
467  Task & post()
468  {
469  // 寻到起始任务
470  TaskCtx * startTask = _taskCtx->getStartTask();
471  if ( !startTask->posted )
472  {
473  TaskCtx * cur = nullptr;
474 
475  // 修改任务状态为taskPending
476  cur = _taskCtx.get();
477  while ( cur != nullptr )
478  {
479  ScopeGuard guard(cur->mtxTask);
481  cur = cur->prevTask;
482  }
483 
484  // 用prev链构建next链
485  cur = _taskCtx.get();
486  while ( cur->prevTask != nullptr )
487  {
488  auto p = cur;
489  cur = cur->prevTask;
490  cur->nextTask = p->weakThis.lock();
491  }
492 
493  // 投递起始任务
494  startTask->post();
495 
496  startTask->posted = true;
497 
498  // 增加任务链数目
499  this->_taskCtx->pool->incTaskChainCount();
500  }
501 
502  return *this;
503  }
504 
506  void wait( double sec = -1 )
507  {
508  _taskCtx->wait(sec);
509  }
510 
513  {
514  return _taskCtx->get();
515  }
516 
517 private:
519 
520  template < typename _Ty0 >
521  friend class Task;
522 };
523 
524 } // namespace winux
Mutex mtxTask
互斥量保护数据
Definition: threadtask.hpp:23
Task & post()
任务必须投递,否则不会被执行
Definition: threadtask.hpp:467
RunableT< _Fx, std::tuple< typename std::decay< _ArgType >::type... > > * NewRunable(_Fx fn, _ArgType &&...arg)
创建一个Runable对象
Definition: utilities.hpp:119
void stop()
主动停止线程池运行
Definition: threadtask.hpp:223
void whenEmptyStopAndWait()
当任务队列为空,任务链为0,就停止线程池运行,并等待线程组线程正常退出
Definition: threadtask.hpp:237
Condition cdtTask
用于判断运行状态
Definition: threadtask.hpp:24
size_t getTaskCount() const
队列里的任务数
Definition: threadtask.hpp:253
void exec(RunableInvoker< void > *ivk) noexcept
Definition: threadtask.hpp:133
ThreadPool * pool
相关线程池
Definition: threadtask.hpp:26
static SharedPointer< TaskCtxT > Create(_ArgType &&...arg)
Definition: threadtask.hpp:121
virtual ~TaskCtx()
Definition: threadtask.hpp:66
Task(ThreadPool *pool, _Fx fnRoutine, _ArgType &&...argRoutine)
Ctor1 创建一个起始任务,需要提供一个线程池
Definition: threadtask.hpp:325
bool wait(double sec=-1)
wait(sec>0)等待一定的时间长用于等待任务运行。当调用stop()后,wait(sec<0)等待线程组线程全部正常退出 ...
Definition: threadtask.hpp:231
Task< typename FuncTraits< _Fx >::ReturnType > task(_Fx fn, _ArgType &&...arg)
创建一个新任务
Definition: threadtask.hpp:217
SharedPointer< TaskCtx > nextTask
下一个任务,执行完本任务后应该投递到任务池中
Definition: threadtask.hpp:34
任务数据场景
Definition: threadtask.hpp:14
条件变量
Definition: threads.hpp:337
void updateStatus(TaskStatus st, bool isNotifyAll=false)
更新运行状态
Definition: threadtask.hpp:52
#define DISABLE_OBJECT_COPY(clsname)
Definition: utilities.hpp:81
Task< typename FuncTraits< _Fx >::ReturnType > then(_Fx fn, _ArgType &&...arg)
创建一个后续任务
Definition: threadtask.hpp:461
函数特征
Definition: utilities.hpp:8
TaskCtx * prevTask
上一个任务
Definition: threadtask.hpp:33
Task(SharedPointer< TaskCtxT< void > > prevTaskCtx, _Fx fnRoutine, typename FuncTraits< _Fx >::ClassType *obj, _ArgType &&...argRoutine)
Ctor2-2 给一个任务创建一个后续任务 - 类方法执行
Definition: threadtask.hpp:374
互斥量
Definition: threads.hpp:272
bool wait(double sec=-1)
等待任务结束
Definition: threadtask.hpp:45
作用域范围保护
Definition: system.hpp:207
virtual ~ThreadPool()
Definition: threadtask.hpp:175
线程组
Definition: threads.hpp:449
bool aborted
是否中止,任务中止则不投递nextTask
Definition: threadtask.hpp:28
virtual ~Task()
Definition: threadtask.hpp:455
TaskStatus status
运行状态
Definition: threadtask.hpp:25
bool waitUntil(_Predicate pred, Mutex &mutex, double sec=-1)
等待谓词条件达成
Definition: threads.hpp:363
Task(SharedPointer< TaskCtxT< _Ty2 > > prevTaskCtx, _Fx fnRoutine, typename FuncTraits< _Fx >::ClassType *obj, _ArgType &&...argRoutine)
Ctor3-2 给一个任务创建一个后续任务,并把上一个任务返回值移动给后续任务 - 类方法执行 ...
Definition: threadtask.hpp:429
TaskCtx * getStartTask()
获取起始任务
Definition: threadtask.hpp:37
void post()
投入线程池队列中
Definition: threadtask.hpp:293
TaskCtxT(ThreadPool *pool, TaskCtx::TaskStatus status=TaskCtx::taskPending)
Definition: threadtask.hpp:110
线程池,创建一组线程等待着从任务队列中获取任务执行
Definition: threadtask.hpp:159
void tryPostNext()
尝试投递后续任务,如果有的话
Definition: threadtask.hpp:303
Task(SharedPointer< TaskCtxT< void > > prevTaskCtx, _Fx fnRoutine, _ArgType &&...argRoutine)
Ctor2-1 给一个任务创建一个后续任务
Definition: threadtask.hpp:347
virtual char const * what() const
Definition: utilities.hpp:515
代表投递到线程池的一个任务,用于等待执行完毕获取返回值或者接着投递下一个任务
Definition: threadtask.hpp:10
ThreadPool()
构造函数0
Definition: threadtask.hpp:163
bool posted
是否已被投递,只有起始任务才能被投递
Definition: threadtask.hpp:27
void exec(RunableInvoker< _Ty > *ivk) noexcept
Definition: threadtask.hpp:92
static SharedPointer< TaskCtxT > Create(_ArgType &&...arg)
Definition: threadtask.hpp:79
ThreadPool(int threadCount)
构造函数1
Definition: threadtask.hpp:170
简单指针
Definition: smartptr.hpp:235
void wait(double sec=-1)
等待任务执行完毕
Definition: threadtask.hpp:506
错误类
Definition: utilities.hpp:505
SimplePointer< _Ty > MakeSimple(_Ty *newObj)
Definition: smartptr.hpp:911
int notifyAll()
通知所有正在wait()中的线程醒来
WeakPointer< TaskCtx > weakThis
自己的弱引用
Definition: threadtask.hpp:30
TaskCtxT(ThreadPool *pool, TaskCtx::TaskStatus status=TaskCtx::taskPending)
Definition: threadtask.hpp:151
ThreadPool & startup(int threadCount)
启动指定数量的线程
Definition: threadtask.hpp:181
SimplePointer< Runable > routineForPool
投递到线程池的例程
Definition: threadtask.hpp:31
Task(SharedPointer< TaskCtxT< _Ty2 > > prevTaskCtx, _Fx fnRoutine, _ArgType &&...argRoutine)
Ctor3-1 给一个任务创建一个后续任务,并把上一个任务返回值移动给后续任务
Definition: threadtask.hpp:401
跨平台基础功能库
Definition: archives.hpp:7