/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
/*
* This file is part of the LibreOffice project.
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/
#include <comphelper/threadpool.hxx>
#include <com/sun/star/uno/Exception.hpp>
#include <config_options.h>
#include <o3tl/safeint.hxx>
#include <sal/config.h>
#include <sal/log.hxx>
#include <salhelper/thread.hxx>
#include <algorithm>
#include <memory>
#include <thread>
#include <chrono>
#include <cstddef>
#include <comphelper/debuggerinfo.hxx>
#include <utility>
#if defined HAVE_VALGRIND_HEADERS
#include <valgrind/memcheck.h>
#endif
#if defined(_WIN32)
#define WIN32_LEAN_AND_MEAN
#include <windows.h>
#endif
namespace comphelper {
/**
 * Per-thread flag used to prevent waiting for a task from inside a task.
 * ThreadWorker::execute() sets it to true on pool threads, and
 * ThreadPool::waitUntilDone() asserts that it is false.
 * Only compiled in DBG_UTIL builds on LINUX or _WIN32.
 */
#if defined DBG_UTIL && (defined LINUX || defined _WIN32)
static thread_local bool gbIsWorkerThread;
#endif
/// Groups the tasks that were pushed with the same tag, so that a caller can
/// wait for all of them in ThreadPool::waitUntilDone().
class ThreadTaskTag
{
private:
    /// guards mnTasksWorking
    std::mutex maMutex;
    /// number of tasks pushed with this tag that have not finished yet
    sal_Int32 mnTasksWorking;
    /// notified when mnTasksWorking drops to zero
    std::condition_variable maTasksComplete;

public:
    ThreadTaskTag();

    /// @return true when no task of this tag is outstanding
    bool isDone();

    /// blocks until no task of this tag is outstanding
    void waitUntilDone();

    /// bookkeeping: one task of this tag has finished
    void onTaskWorkerDone();

    /// bookkeeping: one more task of this tag has been queued
    void onTaskPushed();
};
/**
 * Worker thread of a ThreadPool: repeatedly takes a task from the pool's
 * queue and runs it, until the pool's mbTerminate flag is set.
 */
class ThreadPool::ThreadWorker : public salhelper::Thread
{
// the pool whose queue this worker serves
ThreadPool *mpPool;
public:
explicit ThreadWorker( ThreadPool *pPool ) :
salhelper::Thread("thread-pool"),
mpPool( pPool )
{
}
// Thread body. The pool mutex is held for the whole loop except while a
// task is being executed.
virtual void execute() override
{
#if defined DBG_UTIL && (defined LINUX || defined _WIN32)
// mark this thread so that ThreadPool::waitUntilDone() can assert that it
// is not called from inside a task
gbIsWorkerThread = true;
#endif
std::unique_lock< std::mutex > aGuard( mpPool->maMutex );
while( !mpPool->mbTerminate )
{
// bWait = true: blocks until a task is available or the pool terminates;
// a null result means "no task", the loop condition then decides
std::unique_ptr<ThreadTask> pTask = mpPool->popWorkLocked( aGuard, true );
if( pTask )
{
// keep the tag alive beyond pTask.reset() below
std::shared_ptr<ThreadTaskTag> pTag(pTask->mpTag);
mpPool->incBusyWorker();
// run (and destroy) the task without holding the pool mutex
aGuard.unlock();
pTask->exec();
pTask.reset();
aGuard.lock();
mpPool->decBusyWorker();
// signal the tag only after the task object is gone and the busy
// count has been decremented
pTag->onTaskWorkerDone();
}
}
}
};
/**
 * Creates a pool that starts at most nWorkers worker threads.
 * No thread is launched here; pushTask() launches workers on demand.
 * mbTerminate starts out true and is cleared by pushTask().
 *
 * @param nWorkers upper bound for the number of worker threads
 */
ThreadPool::ThreadPool(std::size_t nWorkers)
: mbTerminate(true)
, mnMaxWorkers(nWorkers)
, mnBusyWorkers(0)
{
}
/**
 * Destroys the pool. This does not shut the pool down: the asserts below
 * expect that it has already been terminated, that its queue is empty and
 * that no worker is busy.
 */
ThreadPool::~ThreadPool()
{
// note: calling shutdown from global variable dtor blocks forever on Win7
// note2: there isn't enough MSVCRT left on exit to call assert() properly
// so these asserts just print something to stderr but exit status is
// still 0, but hopefully they will be more helpful on non-WNT platforms
assert(mbTerminate);
assert(maTasks.empty());
assert(mnBusyWorkers == 0);
}
namespace {

/// Returns the process-wide pool, created on first use with
/// ThreadPool::getPreferredConcurrency() as its worker limit.
std::shared_ptr< ThreadPool >& GetStaticThreadPool()
{
    static std::shared_ptr< ThreadPool > s_pSharedPool
        = std::make_shared< ThreadPool >( ThreadPool::getPreferredConcurrency() );
    return s_pSharedPool;
}

}
/// @return the process-wide pool owned by GetStaticThreadPool()
ThreadPool& ThreadPool::getSharedOptimalPool()
{
    const std::shared_ptr< ThreadPool >& rShared = GetStaticThreadPool();
    return *rShared;
}
/**
 * Number of threads a pool should use. The value is computed once and cached.
 *
 * It is the hardware concurrency (at least 1), optionally lowered by the
 * MAX_CONCURRENCY environment variable; the override can never raise the
 * value above the hardware concurrency, nor lower it below 1.
 */
std::size_t ThreadPool::getPreferredConcurrency()
{
    static const std::size_t s_nPreferred = []() -> std::size_t
    {
        const std::size_t nHardware = o3tl::clamp_to_unsigned<std::size_t>(
            std::max(std::thread::hardware_concurrency(), 1U));

        const char* const pOverride = getenv("MAX_CONCURRENCY");
        if (pOverride == nullptr)
            return nHardware; // already >= 1, nothing to clamp

        // User/admin preference, limited to the range [1, nHardware].
        const std::size_t nRequested
            = o3tl::clamp_to_unsigned<std::size_t>(rtl_str_toInt32(pOverride, 10));
        const std::size_t nCapped = std::min(nHardware, nRequested);
        return std::max<std::size_t>(nCapped, 1);
    }();
    return s_nPreferred;
}
/**
 * Shuts the pool down: takes maMutex and hands over to shutdownLocked().
 * Used to order shutdown, and to ensure there are no lingering threads
 * after LibreOfficeKit pre-init.
 */
void ThreadPool::shutdown()
{
    // NOTE(review): the early return below is disabled; the reason is not
    // recorded here.
//  if (mbTerminate)
//      return;

    std::unique_lock< std::mutex > aLock( maMutex );
    shutdownLocked( aLock );
}
/**
 * Drains the task queue, tells all workers to terminate and joins them.
 *
 * @param aGuard lock on maMutex, owned by the caller on entry. It is
 *               unlocked before the workers are joined and is not
 *               re-acquired, so the caller gets it back unlocked.
 */
void ThreadPool::shutdownLocked(std::unique_lock<std::mutex>& aGuard)
{
if( maWorkers.empty() )
{ // no threads at all -> execute the work in-line
std::unique_ptr<ThreadTask> pTask;
// bWait = false: popWorkLocked() returns nullptr as soon as the queue is
// empty. The tasks run on the calling thread with maMutex still held.
while ( ( pTask = popWorkLocked(aGuard, false) ) )
{
std::shared_ptr<ThreadTaskTag> pTag(pTask->mpTag);
pTask->exec();
pTag->onTaskWorkerDone();
}
}
else
{
// Workers exist: wait until they have taken every queued task.
// NOTE(review): popWorkLocked() does not notify maTasksChanged when it
// removes the last task; the only notify_one() besides the one below is in
// pushTask(). Confirm how this wait is expected to be woken when the queue
// is non-empty on entry.
while( !maTasks.empty() )
{
maTasksChanged.wait( aGuard );
// In the (unlikely but possible?) case pushTask() gets called meanwhile,
// its notify_one() call is meant to wake up a thread and process the task.
// But if this code gets woken up instead, it could lead to a deadlock.
// Pass on the notification.
maTasksChanged.notify_one();
}
}
assert( maTasks.empty() );
// coverity[missing_lock] - on purpose
mbTerminate = true;
// wake every worker blocked in popWorkLocked() so that it sees mbTerminate
maTasksChanged.notify_all();
// Detach the worker list from the pool while the mutex is still held ...
decltype(maWorkers) aWorkers;
std::swap(maWorkers, aWorkers);
// ... and join unlocked: a worker needs maMutex to leave its loop.
aGuard.unlock();
while (!aWorkers.empty())
{
rtl::Reference<ThreadWorker> xWorker = aWorkers.back();
aWorkers.pop_back();
// each worker is expected to be in the list only once
assert(std::find(aWorkers.begin(), aWorkers.end(), xWorker)
== aWorkers.end());
{
xWorker->join();
xWorker.clear();
}
}
}
void ThreadPool::pushTask( std::unique_ptr<ThreadTask> pTask )
{
std::scoped_lock< std::mutex > aGuard( maMutex );
mbTerminate = false;
// Worked on tasks are already removed from maTasks, so include the count of busy workers.
if (maWorkers.size() < mnMaxWorkers && maWorkers.size() <= maTasks.size() + mnBusyWorkers)
{
maWorkers.push_back( new ThreadWorker( this ) );
maWorkers.back()->launch();
}
pTask->mpTag->onTaskPushed();
maTasks.insert( maTasks.begin(), std::move(pTask) );
maTasksChanged.notify_one();
}
std::unique_ptr<ThreadTask> ThreadPool::popWorkLocked( std::unique_lock< std::mutex > & rGuard, bool bWait )
{
do
{
if( !maTasks.empty() )
{
std::unique_ptr<ThreadTask> pTask = std::move(maTasks.back());
maTasks.pop_back();
return pTask;
}
else if (!bWait || mbTerminate)
return nullptr;
maTasksChanged.wait( rGuard );
} while (!mbTerminate);
return nullptr;
}
/// Records that one more worker is executing a task.
void ThreadPool::incBusyWorker()
{
    mnBusyWorkers += 1;
}
/// Records that a worker has finished executing a task.
void ThreadPool::decBusyWorker()
{
    // there must be a busy worker to un-count
    assert(mnBusyWorkers >= 1);
    mnBusyWorkers -= 1;
}
/**
 * Waits until every task pushed with rTag has finished.
 *
 * If the pool has no worker threads, queued tasks are executed on the
 * calling thread (with maMutex held) until the tag is done or the queue is
 * empty.
 *
 * @param rTag  the tag whose tasks are waited for
 * @param bJoin when true, joinThreadsIfIdle() is called afterwards
 */
void ThreadPool::waitUntilDone(const std::shared_ptr<ThreadTaskTag>& rTag, bool bJoin)
{
#if defined DBG_UTIL && (defined LINUX || defined _WIN32)
    assert(!gbIsWorkerThread && "cannot wait for tasks from inside a task");
#endif
    {
        std::unique_lock< std::mutex > aLock( maMutex );

        if( maWorkers.empty() )
        {
            // no threads at all -> execute the work in-line
            for (;;)
            {
                if (rTag->isDone())
                    break;

                std::unique_ptr<ThreadTask> pNext = popWorkLocked(aLock, false);
                if (!pNext)
                    break; // queue exhausted

                std::shared_ptr<ThreadTaskTag> pNextTag(pNext->mpTag);
                pNext->exec();
                pNextTag->onTaskWorkerDone();
            }
        }
    }

    rTag->waitUntilDone();

    if (bJoin)
        joinThreadsIfIdle();
}
/**
 * Shuts the worker threads down if the pool is idle.
 *
 * @return true when the pool was idle and shutdownLocked() has been run,
 *         false when the pool was not idle and nothing was done
 */
bool ThreadPool::joinThreadsIfIdle()
{
    std::unique_lock< std::mutex > aLock( maMutex );

    // check if there are still tasks from another tag
    if (!isIdle())
        return false;

    shutdownLocked(aLock);
    return true;
}
std::shared_ptr<ThreadTaskTag> ThreadPool::createThreadTaskTag()
{
return std::make_shared<ThreadTaskTag>();
}
/// @return the tag's isDone() result, i.e. true when no task of pTag is outstanding
bool ThreadPool::isTaskTagDone(const std::shared_ptr<ThreadTaskTag>& pTag)
{
    const bool bDone = pTag->isDone();
    return bDone;
}
ThreadTask::ThreadTask(std::shared_ptr<ThreadTaskTag> xTag)
: mpTag(std::move(xTag))
{
}
/**
 * Runs doWork() and catches every exception it throws, logging a warning
 * instead, so that nothing propagates out of the task to the caller.
 */
void ThreadTask::exec()
{
try {
doWork();
}
// standard exceptions: log what()
catch (const std::exception &e)
{
SAL_WARN("comphelper", "exception in thread worker while calling doWork(): " << e.what());
}
// UNO exceptions: streamed into the log message directly
catch (const css::uno::Exception &e)
{
SAL_WARN("comphelper", "exception in thread worker while calling doWork(): " << e);
}
// anything else: nothing to report beyond the fact itself
catch (...)
{
SAL_WARN("comphelper", "unknown exception in thread worker while calling doWork()");
}
}
ThreadTaskTag::ThreadTaskTag() : mnTasksWorking(0)
{
}
/// Counts one more outstanding task on this tag.
void ThreadTaskTag::onTaskPushed()
{
    std::scoped_lock< std::mutex > aLock( maMutex );
    ++mnTasksWorking;
    assert( mnTasksWorking < 65536 ); // sanity checking
}
/// Counts one outstanding task less; wakes all waiters when none is left.
void ThreadTaskTag::onTaskWorkerDone()
{
    std::scoped_lock< std::mutex > aLock( maMutex );
    --mnTasksWorking;
    assert(mnTasksWorking >= 0);
    if (mnTasksWorking != 0)
        return; // tasks of this tag are still outstanding

    maTasksComplete.notify_all();
}
/// @return true when no task of this tag is outstanding
bool ThreadTaskTag::isDone()
{
    std::scoped_lock< std::mutex > aLock( maMutex );
    const bool bNoneOutstanding = (mnTasksWorking == 0);
    return bNoneOutstanding;
}
void ThreadTaskTag::waitUntilDone()
{
std::unique_lock< std::mutex > aGuard( maMutex );
while( mnTasksWorking > 0 )
{
#if defined DBG_UTIL && !defined NDEBUG
// 10 minute timeout in debug mode, unless the code is built with
// sanitizers or debugged in valgrind or gdb, in which case the threads
// should not time out in the middle of a debugging session
int maxTimeout = 10 * 60;
#if !ENABLE_RUNTIME_OPTIMIZATIONS
maxTimeout = 30 * 60;
#endif
#if defined HAVE_VALGRIND_HEADERS
if( RUNNING_ON_VALGRIND )
maxTimeout = 30 * 60;
#endif
if( isDebuggerAttached())
maxTimeout = 300 * 60;
std::cv_status result = maTasksComplete.wait_for(
aGuard, std::chrono::seconds( maxTimeout ));
assert(result != std::cv_status::timeout);
#else
// 10 minute timeout in production so the app eventually throws some kind of error
if (maTasksComplete.wait_for(
aGuard, std::chrono::seconds( 10 * 60 )) == std::cv_status::timeout)
throw std::runtime_error("timeout waiting for threadpool tasks");
#endif
}
}
} // namespace comphelper
/* vim:set shiftwidth=4 softtabstop=4 expandtab: */
// Static-analyzer output (diagnostic V1089) that was appended after the end of
// the file; kept as comments so that the file remains valid C++. The original
// "↑" markers no longer point at a source line.
// V1089 Waiting on condition variable without predicate. A thread can wait indefinitely or experience a spurious wakeup. Consider passing a predicate as the second argument.
// V1089 Waiting on condition variable without predicate. A thread can wait indefinitely or experience a spurious wakeup. Consider passing a predicate as the second argument.
// V1089 Waiting on condition variable without predicate. A thread can wait indefinitely or experience a spurious wakeup. Consider passing a predicate as the third argument.