19 #include <external/inplace_function.h> 22 #include <containers/Vector.hpp> 23 #include <debug/MinimalCrash.hpp> 24 #include <ios/MinimalLogger.hpp> 25 #include <threads/Barrier.hpp> 26 #include <threads/Mutex.hpp> 27 #include <threads/Thread.hpp> 37 static constexpr
int MAX_POOL_FUNCTION_SIZE=
41 bool isWaitingForWork{
false};
47 const pthread_t masterThreadTag{getThreadTag()};
51 stdext::inplace_function<
void(
int),MAX_POOL_FUNCTION_SIZE>;
69 static pthread_t getThreadTag()
85 Tuple<ThreadPool*,
int>;
96 static void* threadPoolSwim(
void* _ptr)
99 ThreadPool::ThreadPars* ptr=
100 static_cast<ThreadPool::ThreadPars*>(_ptr);
103 ThreadPool::ThreadPars& pars=
122 pool.tellTheMasterThreadIsCreated(threadId);
126 pool.waitForWorkToBeAssigned(threadId);
137 pool.tellTheMasterWorkIsFinished(threadId);
148 void fill(
const pthread_attr_t* attr=nullptr);
151 void doNotworkAnymore()
175 for(
int threadId=1;threadId<nThreads;threadId++)
177 if(pthread_join(pool[threadId],nullptr)!=0)
190 void assertPoolOnly(
const int& threadId)
198 void assertMasterOnly(
const int& threadId)
202 MINIMAL_CRASH(
"Only master thread is allowed, but thread",threadId
,"is trying to act");
210 const pthread_t threadTag=
215 pool.findFirst(threadTag);
218 if(threadId==nActiveThreads())
220 fprintf(stdout,
"%d %d\n",threadId,nActiveThreads());
222 fprintf(stdout,
"%d\n",(
int)p);
239 #define THREADS_SCOPE_SEQUENTIAL() 240 ScopeMutexLocker sequentializer ## __LINE__ (mutex); 255 bool isMasterThread()
259 getThreadTag()==masterThreadTag;
271 static constexpr
char workAssignmentTag[]=
272 "WorkAssOrNoMoreWork";
275 static constexpr
auto& workNoMoreTag=
279 void tellThePoolWorkIsAssigned(
const int& threadId)
281 assertMasterOnly(threadId);
283 minimalLogger(runLog,
"Telling the pool that work has been assigned (tag: %s)",workAssignmentTag);
290 barrier.sync(workAssignmentTag,threadId);
294 static constexpr
char threadHasBeenCreated[]=
295 "ThreadHasBeenCreated";
298 void tellTheMasterThreadIsCreated(
const int& threadId)
300 assertPoolOnly(threadId);
302 minimalLogger(runLog,
"Telling that thread has been created and is ready to swim (tag: %s)",threadHasBeenCreated);
305 barrier.sync(threadHasBeenCreated,threadId);
309 void waitPoolToBeFilled(
const int& threadId)
311 assertMasterOnly(threadId);
313 minimalLogger(runLog,
"waiting for threads in the pool to be ready to ready to swim (tag: %s)",threadHasBeenCreated);
316 barrier.sync(threadHasBeenCreated,threadId);
320 void waitForWorkToBeAssigned(
const int& threadId)
322 assertPoolOnly(threadId);
329 barrier.sync(workAssignmentTag,threadId);
333 void tellThePoolNotToWorkAnyLonger(
const int& threadId)
335 assertMasterOnly(threadId);
337 if(
not isWaitingForWork)
340 minimalLogger(runLog,
"Telling the pool not to work any longer (tag: %s)",workNoMoreTag);
348 barrier.sync(workNoMoreTag,threadId);
352 static constexpr
char workFinishedTag[]=
356 void tellTheMasterWorkIsFinished(
const int& threadId)
358 assertPoolOnly(threadId);
360 minimalLogger(runLog,
"finished working (tag: %s)",workFinishedTag);
362 barrier.sync(workFinishedTag,threadId);
366 void waitForPoolToFinishAssignedWork(
const int& threadId)
368 assertMasterOnly(threadId);
370 if constexpr(DEBUG_THREADS)
375 minimalLogger(runLog,
"waiting for pool to finish the work (tag: %s)",workFinishedTag);
381 barrier.sync(workFinishedTag,threadId);
389 const bool& getIfWaitingForWork()
const 399 template <
typename F>
403 if(
not isWaitingForWork)
420 template <
typename Size,
422 void loopSplit(
const Size& beg,
426 workOn([beg,end,nPieces=
this->nActiveThreads(),&f](
const int& threadId)
429 const Size threadLoad=
430 (end-beg+nPieces-1)/nPieces;
433 const Size threadBeg=
437 const Size threadEnd=
438 std::min(end,threadBeg+threadLoad);
440 for(Size i=threadBeg;i<threadEnd;i++)
446 ThreadPool(
int nThreads=std::thread::hardware_concurrency()) :
447 pool(1,getThreadTag()),
471 void assertPoolOnly(
const int& threadId)
477 void assertMasterOnly(
const int& threadId)
501 bool isMasterThread()
518 bool getIfWaitingForWork()
529 template <
typename F>
536 template <
typename Size,
538 void loopSplit(
const Size& beg,
542 for(Size i=beg;i<end;i++)
547 ThreadPool(
int nThreads=1)
#define MINIMAL_CRASH(...)
Initialize the minimal crasher.
void minimalCrash(const char *path, const int line, const char *funcName, const char *format,...)
void minimalLogger(Logger &logger, const char *format,...)
#define THREADS_SCOPE_SEQUENTIAL()
Dummy set a scope mutex locker.
void divWithMod(Vector< TOut > "ient, Vector< TOut > &remainder, const Vector &divisor) const
Returns the result and remainder of the division.
#define MINIMAL_CRASH_STDLIBERR(STRING)
Minimal crash with stdlib error.
constexpr int masterThreadId
Thread id of master thread.
Logger runLog("/dev/stdout")
Global logger.
ThreadPool threads
Global thread pool.