SUNphi  1.0
Pool.hpp
Go to the documentation of this file.
1 #ifndef _POOL_HPP
2 #define _POOL_HPP
3 
4 /// \file Pool.hpp
5 ///
6 /// \brief Provides a thread pool
7 ///
8 /// The thread pool is implemented through \c pthread library, so we
9 /// can start and stop the threads in every moment of the execution,
10 /// at variance with the static-behavior of \c Nissa library, where
11 /// the program would have opened a \c OpenMP parallel statement and
12 /// jumped inside it.
13 ///
14 
15 #ifdef HAVE_CONFIG_H
16  #include "config.hpp"
17 #endif
18 
19 #include <external/inplace_function.h>
20 
21 #include <Tuple.hpp>
22 #include <containers/Vector.hpp>
23 #include <debug/MinimalCrash.hpp>
24 #include <ios/MinimalLogger.hpp>
25 #include <threads/Barrier.hpp>
26 #include <threads/Mutex.hpp>
27 #include <threads/Thread.hpp>
28 
29 namespace SUNphi
30 {
31 #ifdef USE_THREADS
32 
33  /// Contains a thread pool
34  class ThreadPool
35  {
36  /// Maximal size of the stack used for the work
 ///
 /// Passed as second template argument to the \c Work type defined
 /// below
37  static constexpr int MAX_POOL_FUNCTION_SIZE=
38  128;
39 
40  /// States if the pool is waiting for work
41  bool isWaitingForWork{false};
42 
43  /// States if the pool is filled
44  bool isFilled{false};
45 
46  /// Thread tag of the master thread, as returned by \c getThreadTag when this member is initialized
47  const pthread_t masterThreadTag{getThreadTag()};
48 
49  /// Type to encapsulate the work to be done
50  using Work=
51  stdext::inplace_function<void(int),MAX_POOL_FUNCTION_SIZE>;
52 
53  /// Work to be done in the pool
54  ///
55  /// This encapsulates a function returning void, and getting an
56  /// integer as an argument, corresponding to the thread id
57  Work work;
58 
59  /// Encapsulates the threads
60  ///
61  /// At the beginning, the pool contains only the main thread, with
62  /// its tag. Then, when it is filled, the pool contains the thread
63  /// identifiers. Such identifier is an opaque number, which cannot
64  /// serve the purpose of getting the thread progressive in the pool:
65  /// for this purpose \c getThreadId searches the tag inside this vector
66  Vector<pthread_t> pool;
67 
/// Return the tag of the calling thread
///
/// The tag is the opaque \c pthread_t handle provided by
/// \c pthread_self: it identifies the thread, but it is not the
/// progressive index of the thread inside the pool.
static pthread_t getThreadTag()
{
  return pthread_self();
}
76 
77  /// Number of threads
 ///
 /// Set by the constructor; \c empty joins the threads of id from 1
 /// up to this number (excluded), the id 0 being the master
78  int nThreads;
79 
80  /// Barrier used by the threads
 ///
 /// Constructed with the number of threads, and used by all the
 /// tell/wait methods of the pool to synchronize master and workers
81  Barrier barrier;
82 
83  /// Pair of parameters containing the threadpool and the thread id
 ///
 /// This is the type that \c threadPoolSwim expects to receive behind
 /// its \c void* argument
84  using ThreadPars=
85  Tuple<ThreadPool*,int>;
86 
87  /// Function called when starting a thread
88  ///
89  /// When called, get the thread pool and the thread id as
90  /// arguments through the function parameter. This is expcted to
91  /// be allocated outside through a \c new call, so it is deleted
92  /// after taking reference to the pool, and checking the thread thread.
93  ///
94  /// All threads but the master one swim in this pool back and forth,
95  /// waiting for job to be done.
96  static void* threadPoolSwim(void* _ptr) ///< Initialization data
97  {
98  /// Cast the \c void pointer to the tuple
99  ThreadPool::ThreadPars* ptr=
100  static_cast<ThreadPool::ThreadPars*>(_ptr);
101 
102  /// Takes a reference to the parameters
103  ThreadPool::ThreadPars& pars=
104  *ptr;
105 
106  /// Takes a reference to the pool
107  ThreadPool& pool=
108  *get<0>(pars);
109 
110  /// Copy the thread id
111  int threadId=
112  get<1>(pars);
113 
114  delete ptr;
115 
116  //runLog<<"entering the pool";
117 
118  /// Work until asked to empty
119  bool keepSwimming=
120  pool.isFilled;
121 
122  pool.tellTheMasterThreadIsCreated(threadId);
123 
124  while(keepSwimming)
125  {
126  pool.waitForWorkToBeAssigned(threadId);
127 
128  keepSwimming=
129  pool.isFilled;
130 
131  minimalLogger(runLog," keep swimming: %d",keepSwimming);
132 
133  if(keepSwimming)
134  {
135  pool.work(threadId);
136 
137  pool.tellTheMasterWorkIsFinished(threadId);
138  }
139  }
140 
141  minimalLogger(runLog,"exiting the pool");
142 
143  return
144  nullptr;
145  }
146 
147  /// Fill the pool with the number of threads assigned
 ///
 /// Only declared here. The constructor calls it with no argument,
 /// hence with a null attribute pointer
148  void fill(const pthread_attr_t* attr=nullptr); ///< Possible attributes of the threads
149 
150  /// Stop the pool
151  void doNotworkAnymore()
152  {
153  // Mark that the pool is not waiting any more for work
154  isWaitingForWork=
155  false;
156 
157  // Temporary name to force the other threads go out of the pool
158  barrier.sync();
159  }
160 
161  /// Empty the thread pool
162  void empty()
163  {
164  // Check that the pool is not empty
165  if(not isFilled)
166  MINIMAL_CRASH("Cannot empty an empty pool!");
167 
168  // Mark that the pool is not filled
169  isFilled=
170  false;
171 
172  /// Stop the pool from working
173  tellThePoolNotToWorkAnyLonger(masterThreadId);
174 
175  for(int threadId=1;threadId<nThreads;threadId++)
176  {
177  if(pthread_join(pool[threadId],nullptr)!=0)
178  MINIMAL_CRASH_STDLIBERR("joining threads");
179 
180  minimalLogger(runLog,"Thread of id %d destroyed",(int)threadId);
181  }
182 
183  // Resize down the pool
184  pool.resize(1);
185  }
186 
187  public:
188 
189  /// Assert that only the pool is accessing
190  void assertPoolOnly(const int& threadId) ///< Calling thread
191  const
192  {
193  if(threadId==masterThreadId)
194  MINIMAL_CRASH("Only pool threads are allowed");
195  }
196 
197  /// Assert that only the master thread is accessing
198  void assertMasterOnly(const int& threadId) ///< Calling thread
199  const
200  {
201  if(threadId!=masterThreadId)
202  MINIMAL_CRASH("Only master thread is allowed, but thread",threadId,"is trying to act");
203  }
204 
205  /// Get the thread of the current thread
206  int getThreadId()
207  const
208  {
209  /// Current pthread
210  const pthread_t threadTag=
211  getThreadTag();
212 
213  /// Position in the pool
214  int threadId=
215  pool.findFirst(threadTag);
216 
217  // Check that the thread is found
218  if(threadId==nActiveThreads())
219  {
220  fprintf(stdout,"%d %d\n",threadId,nActiveThreads());
221  for(auto & p : pool)
222  fprintf(stdout,"%d\n",(int)p);
223  MINIMAL_CRASH("Unable to find thread with tag %d",threadTag);
224  }
225 
226  return
227  threadId;
228  }
229 
230 
231  private:
232 
233  /// Global mutex
 ///
 /// Locked and unlocked by \c mutexLock and \c mutexUnlock
234  Mutex mutex;
235 
236  public:
237 
/// Pastes two tokens together as they are
#define THREADS_SCOPE_SEQUENTIAL_PASTE(A,B) A ## B

/// Pastes two tokens after expanding them
///
/// The additional level of indirection is needed because the
/// operands of ## are not macro-expanded: pasting \c __LINE__
/// directly would always produce the very same identifier
#define THREADS_SCOPE_SEQUENTIAL_JOIN(A,B) THREADS_SCOPE_SEQUENTIAL_PASTE(A,B)

/// Puts a scope mutex locker making the scope sequential
///
/// The locker is named after the line where the macro is used, and
/// is built on whatever \c mutex is visible at that point
#define THREADS_SCOPE_SEQUENTIAL() \
  ScopeMutexLocker THREADS_SCOPE_SEQUENTIAL_JOIN(sequentializer,__LINE__)(mutex);
241 
242  /// Lock the internal mutex
243  void mutexLock()
244  {
245  mutex.lock();
246  }
247 
248  /// Unlock the mutex
249  void mutexUnlock()
250  {
251  mutex.unlock();
252  }
253 
254  /// Compares the thread tag with the master one
255  bool isMasterThread()
256  const
257  {
258  return
259  getThreadTag()==masterThreadTag;
260  }
261 
262  /// Gets the number of allocated threads
263  int nActiveThreads()
264  const
265  {
266  return
267  pool.size();
268  }
269 
270  /// Tag to mark that assignment has been finished
 ///
 /// Passed to the barrier both by the master, when assigning work,
 /// and by the pool threads, when waiting for it
271  static constexpr char workAssignmentTag[]=
272  "WorkAssOrNoMoreWork";
273 
274  /// Tag to mark that there is no more work to do
 ///
 /// Reference to \c workAssignmentTag, so the two tags are the very
 /// same string
275  static constexpr auto& workNoMoreTag=
276  workAssignmentTag;
277 
278  /// Start the work for the other threads
279  void tellThePoolWorkIsAssigned(const int& threadId) ///< Thread id
280  {
281  assertMasterOnly(threadId);
282 
283  minimalLogger(runLog,"Telling the pool that work has been assigned (tag: %s)",workAssignmentTag);
284 
285  // Mark down that the pool is not waiting for work
286  isWaitingForWork=
287  false;
288 
289  // The master signals to the pool to start work by synchronizing with it
290  barrier.sync(workAssignmentTag,threadId);
291  }
292 
293  /// Tag to mark that the thread is ready to swim
 ///
 /// Passed to the barrier by \c tellTheMasterThreadIsCreated and by
 /// \c waitPoolToBeFilled
294  static constexpr char threadHasBeenCreated[]=
295  "ThreadHasBeenCreated";
296 
297  /// Tell the master that the thread is created and ready to swim
298  void tellTheMasterThreadIsCreated(const int& threadId) ///< Thread id
299  {
300  assertPoolOnly(threadId);
301 
302  minimalLogger(runLog,"Telling that thread has been created and is ready to swim (tag: %s)",threadHasBeenCreated);
303 
304  // The thread signals to the master that has been created and ready to swim
305  barrier.sync(threadHasBeenCreated,threadId);
306  }
307 
308  /// Waiting for threads are created and ready to swim
309  void waitPoolToBeFilled(const int& threadId) ///< Thread id
310  {
311  assertMasterOnly(threadId);
312 
313  minimalLogger(runLog,"waiting for threads in the pool to be ready to ready to swim (tag: %s)",threadHasBeenCreated);
314 
315  // The master wait that the threads have been created by syncing with them
316  barrier.sync(threadHasBeenCreated,threadId);
317  }
318 
319  /// Waiting for work to be done means to synchronize with the master
320  void waitForWorkToBeAssigned(const int& threadId) ///< Thread id
321  {
322  assertPoolOnly(threadId);
323 
324  // This printing is messing up, because is occurring in the pool
325  // where the thread is expected to be already waiting for work,
326  // and is not locking the logger correctly
327  //minimalLogger(runLog,"waiting in the pool for work to be assigned (tag %s)",workAssignmentTag);
328 
329  barrier.sync(workAssignmentTag,threadId);
330  }
331 
332  /// Stop the pool from working
333  void tellThePoolNotToWorkAnyLonger(const int& threadId) ///< Thread id
334  {
335  assertMasterOnly(threadId);
336 
337  if(not isWaitingForWork)
338  MINIMAL_CRASH("We cannot stop a working pool");
339 
340  minimalLogger(runLog,"Telling the pool not to work any longer (tag: %s)",workNoMoreTag);
341 
342  // Mark down that the pool is waiting for work
343  isWaitingForWork=
344  false;
345 
346  // The master signals to the pool that he is waiting for the
347  // pool to finish the work
348  barrier.sync(workNoMoreTag,threadId);
349  }
350 
351  /// Tag to mark that the work is finished
 ///
 /// Passed to the barrier by \c tellTheMasterWorkIsFinished and by
 /// \c waitForPoolToFinishAssignedWork
352  static constexpr char workFinishedTag[]=
353  "WorkFinished";
354 
355  /// Waiting for work to be done means to synchronize with the master
356  void tellTheMasterWorkIsFinished(const int& threadId) ///< Thread id
357  {
358  assertPoolOnly(threadId);
359 
360  minimalLogger(runLog,"finished working (tag: %s)",workFinishedTag);
361 
362  barrier.sync(workFinishedTag,threadId);
363  }
364 
365  /// Wait that the work assigned to the pool is finished
 ///
 /// Reserved to the master thread. Synchronizes with the pool on the
 /// work-finished tag, and after that marks the pool as waiting for
 /// work again
366  void waitForPoolToFinishAssignedWork(const int& threadId) ///< Thread id
367  {
368  assertMasterOnly(threadId);
369 
370  if constexpr(DEBUG_THREADS)
371  {
372  /// Makes the print sequential across threads
 // NOTE(review): no statement acquiring the mutex is visible in this
 // branch, yet mutexUnlock() is called below. Confirm that the mutex
 // is really locked before the print, otherwise the unlock has no
 // matching lock
374 
375  minimalLogger(runLog,"waiting for pool to finish the work (tag: %s)",workFinishedTag);
376  mutexUnlock();
377  }
378 
379  // The master signals to the pool that he is waiting for the
380  // pool to finish the work
381  barrier.sync(workFinishedTag,threadId);
382 
383  // Mark down that the pool is waiting for work
384  isWaitingForWork=
385  true;
386  }
387 
388  /// Return whether the pool is waiting for work
389  const bool& getIfWaitingForWork() const
390  {
391  return
392  isWaitingForWork;
393  }
394 
395  /// Gives to all threads some work to be done
396  ///
397  /// The object \c f must be callable, returning void and getting
398  /// an integer as a parameter, representing the thread id
399  template <typename F>
400  void workOn(F f) ///< Function embedding the work
401  {
402  // Check that the pool is waiting for work
403  if(not isWaitingForWork)
404  MINIMAL_CRASH("Trying to give work to not-waiting pool!");
405 
406  // Store the work
407  work=
408  f;
409 
410  // Set off the other threads
411  tellThePoolWorkIsAssigned(masterThreadId);
412 
413  work(0);
414 
415  // Wait that the pool finishes the work
416  waitForPoolToFinishAssignedWork(masterThreadId);
417  }
418 
419  /// Split a loop into \c nTrheads chunks, giving each chunk as a work for a corresponding thread
420  template <typename Size, // Type for the range of the loop
421  typename F> // Type for the work function
422  void loopSplit(const Size& beg, ///< Beginning of the loop
423  const Size& end, ///< End of the loop
424  F f) ///< Function to be called, accepting two integers: the first is the thread id, the second the loop argument
425  {
426  workOn([beg,end,nPieces=this->nActiveThreads(),&f](const int& threadId)
427  {
428  /// Workload for each thread, taking into account the remainder
429  const Size threadLoad=
430  (end-beg+nPieces-1)/nPieces;
431 
432  /// Beginning of the chunk
433  const Size threadBeg=
434  threadLoad*threadId;
435 
436  /// End of the chunk
437  const Size threadEnd=
438  std::min(end,threadBeg+threadLoad);
439 
440  for(Size i=threadBeg;i<threadEnd;i++)
441  f(threadId,i);
442  });
443  }
444 
445  /// Constructor starting the thread pool with a given number of threads
446  ThreadPool(int nThreads=std::thread::hardware_concurrency()) :
447  pool(1,getThreadTag()),
448  nThreads(nThreads),
449  barrier(nThreads)
450  {
451  fill();
452  }
453 
454  /// Destructor emptying the pool
455  ~ThreadPool()
456  {
457  minimalLogger(runLog,"Destroying the pool");
458  empty();
459  }
460  };
461 
462 #else
463 
/// Serial replacement of the thread pool
///
/// Offers the methods of the pool used by the callers, but runs
/// everything in the calling thread, which always has id 0.
class ThreadPool
{
public:

  /// Nothing to assert: there are no pool threads
  void assertPoolOnly(const int& threadId) ///< Calling thread
    const
  {}

  /// Nothing to assert: the only thread is the master
  void assertMasterOnly(const int& threadId) ///< Calling thread
    const
  {}

  /// Id of the current thread, always 0
  int getThreadId() const { return 0; }

  /// There is no mutex to lock
  void mutexLock() {}

  /// There is no mutex to unlock
  void mutexUnlock() {}

  /// The only thread is always the master
  bool isMasterThread() const { return true; }

  /// Number of allocated threads, always 1
  int nActiveThreads() const { return 1; }

  /// The dummy pool never reports to be waiting for work
  bool getIfWaitingForWork() const { return false; }

  /// Runs the work immediately in the calling thread
  ///
  /// The object \c f must be callable with an integer, the thread
  /// id, which is always 0
  template <typename F>
  void workOn(F f) ///< Function embedding the work
  {
    f(0);
  }

  /// Runs the whole loop in the calling thread
  template <typename Size, // Type for the range of the loop
            typename F>    // Type for the work function
  void loopSplit(const Size& beg, ///< Beginning of the loop
                 const Size& end, ///< End of the loop
                 F f)             ///< Function to be called, accepting two integers: the first is the thread id, which will always be 0, the second the loop argument
  {
    /// Running index of the loop
    Size cur=beg;

    while(cur<end)
      {
        f(0,cur);
        cur++;
      }
  }

  /// Constructor ignoring the requested number of threads
  ThreadPool(int nThreads=1)
  {}
};
551 
552 #endif
553 
554  /// Global thread pool
 ///
 /// Declaration only: the object is defined in another translation
 /// unit
555  extern ThreadPool threads;
556 }
557 
558 #endif
#define MINIMAL_CRASH(...)
Initialize the minimal crasher.
void minimalCrash(const char *path, const int line, const char *funcName, const char *format,...)
Definition: SUNphi.cpp:53
void minimalLogger(Logger &logger, const char *format,...)
Definition: SUNphi.cpp:40
#define THREADS_SCOPE_SEQUENTIAL()
Dummy set a scope mutex locker.
Definition: Mutex.hpp:83
void divWithMod(Vector< TOut > &quotient, Vector< TOut > &remainder, const Vector &divisor) const
Returns the result and remainder of the division.
Definition: Vector.hpp:310
#define MINIMAL_CRASH_STDLIBERR(STRING)
Minimal crash with stdlib error.
constexpr int masterThreadId
Thread id of master thread.
Definition: Thread.hpp:24
Logger runLog("/dev/stdout")
Global logger.
ThreadPool threads
Global thread pool.
Definition: SUNphi.cpp:215