/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
/*
 * This file is part of the LibreOffice project.
 *
 * This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
 *
 * This file incorporates work covered by the following license notice:
 *
 *   Licensed to the Apache Software Foundation (ASF) under one or more
 *   contributor license agreements. See the NOTICE file distributed
 *   with this work for additional information regarding copyright
 *   ownership. The ASF licenses this file to you under the Apache
 *   License, Version 2.0 (the "License"); you may not use this file
 *   except in compliance with the License. You may obtain a copy of
 *   the License at http://www.apache.org/licenses/LICENSE-2.0 .
 */
 
#include <sal/config.h>
 
#include <cassert>
#include <chrono>
#include <algorithm>
#include <utility>
#include <unordered_map>
 
#include <osl/diagnose.h>
#include <sal/log.hxx>
 
#include <uno/threadpool.h>
 
#include "threadpool.hxx"
#include "thread.hxx"
 
using namespace ::osl;
using namespace ::rtl;
 
namespace cppu_threadpool
{
    // Builds the bookkeeping entry for a request thread that is about to wait
    // in the pool.  The reference is taken by value and moved into `thread`.
    WaitingThread::WaitingThread( rtl::Reference<ORequestThread> theThread )
        : thread( std::move( theThread ) )
    {
    }
 
    // Returns the shared DisposedCallerAdmin holder.  The holder is a
    // function-local static: it is created on the first call and the very
    // same object is handed out by every later call.
    DisposedCallerAdminHolder const & DisposedCallerAdmin::getInstance()
    {
        static DisposedCallerAdminHolder aInstance(
            std::make_shared<DisposedCallerAdmin>() );
        return aInstance;
    }
 
    // Emits a warning when dispose ids are still registered at destruction
    // time, i.e. dispose() was called for an id without a matching destroy().
    DisposedCallerAdmin::~DisposedCallerAdmin()
    {
        SAL_WARN_IF(
            !m_vector.empty(),
            "cppu.threadpool",
            "DisposedCallerList :  " << m_vector.size() << " left");
    }
 
    // Registers nDisposeId as disposed by appending it to the list while
    // holding m_mutex.  No duplicate check is made.
    void DisposedCallerAdmin::dispose( void const * nDisposeId )
    {
        std::scoped_lock aLock( m_mutex );
        m_vector.emplace_back( nDisposeId );
    }
 
    // Unregisters nDisposeId: every entry equal to it is removed from the
    // list while holding m_mutex.  Removing an unknown id is a no-op.
    void DisposedCallerAdmin::destroy( void const * nDisposeId )
    {
        std::scoped_lock aLock( m_mutex );
        std::erase( m_vector, nDisposeId );
    }
 
    // Tells whether nDisposeId is currently registered as disposed.
    // The list is searched linearly while holding m_mutex.
    bool DisposedCallerAdmin::isDisposed( void const * nDisposeId )
    {
        std::scoped_lock aLock( m_mutex );
        return std::any_of(
            m_vector.begin(), m_vector.end(),
            [nDisposeId]( auto const & rId ) { return rId == nDisposeId; } );
    }
 
 
    // Every ThreadPool shares the one DisposedCallerAdmin instance returned by
    // DisposedCallerAdmin::getInstance().
    ThreadPool::ThreadPool()
        : m_DisposedCallerAdmin( DisposedCallerAdmin::getInstance() )
    {}
 
    // Emits a warning when job queues are still registered for some thread id
    // at the time the pool is destroyed.
    ThreadPool::~ThreadPool()
    {
        SAL_WARN_IF(
            !m_mapQueue.empty(),
            "cppu.threadpool",
            "ThreadIdHashMap:  " << m_mapQueue.size() << " left");
    }
 
    // Marks nDisposeId as disposed and forwards the dispose to every job queue
    // that currently exists.
    //
    // The id is registered with the DisposedCallerAdmin first, before m_mutex
    // is taken; the queues are then walked with m_mutex held.
    void ThreadPool::dispose( void const * nDisposeId )
    {
        m_DisposedCallerAdmin->dispose( nDisposeId );

        std::scoped_lock aLock( m_mutex );
        for( auto const & rEntry : m_mapQueue )
        {
            // first: queue for synchronous requests, second: queue for
            // asynchronous (oneway) requests; either one may be absent
            auto const & [pSyncQueue, pAsyncQueue] = rEntry.second;
            if( pSyncQueue )
                pSyncQueue->dispose( nDisposeId );
            if( pAsyncQueue )
                pAsyncQueue->dispose( nDisposeId );
        }
    }
 
    // Counterpart of dispose(): removes nDisposeId from the shared
    // DisposedCallerAdmin again.  The job queues are not touched here.
    void ThreadPool::destroy( void const * nDisposeId )
    {
        m_DisposedCallerAdmin->destroy( nDisposeId );
    }
 
    /******************
     * Parks the calling request thread in the pool for a limited time. If a new
     * request arrives within that time, createThread() hands it to this thread
     * instead of launching a new one. This is purely a performance optimisation,
     * the threadpool works without it.
     *
     * The WaitingThread entry lives on this function's stack; only its address
     * is put into m_dequeThreads, so it has to be out of the deque again before
     * this function returns.
     ******************/
    void ThreadPool::waitInPool( rtl::Reference< ORequestThread > const & pThread )
    {
        WaitingThread aEntry( pThread );
        {
            std::scoped_lock aLock( m_mutexWaitingThreadList );
            m_dequeThreads.push_front( &aEntry );
        }

        // wait for up to 2 seconds for the condition to be set
        aEntry.condition.wait( std::chrono::seconds(2) );

        std::scoped_lock aLock( m_mutexWaitingThreadList );
        if( !aEntry.thread.is() )
        {
            // createThread() cleared the reference and already removed the
            // entry from the deque: the thread has been reused
            return;
        }

        // thread wasn't reused, remove it from the list
        auto const pos = std::find(
            m_dequeThreads.begin(), m_dequeThreads.end(), &aEntry );
        OSL_ASSERT( pos != m_dequeThreads.end() );
        m_dequeThreads.erase( pos );
    }
 
    void ThreadPool::joinWorkers()
    {
        {
            std::scoped_lock guard( m_mutexWaitingThreadList );
            for (auto const& thread : m_dequeThreads)
            {
                // wake the threads up
                thread->condition.set();
            }
        }
        m_aThreadAdmin.join();
    }
 
    // Provides a thread that works on pQueue.
    //
    // A thread parked in waitInPool() is preferred: the entry at the back of
    // the deque (waitInPool() pushes at the front, so this is the one that has
    // been waiting longest) gets the task assigned and is woken up.  Only if no
    // thread is parked, a new ORequestThread is created and launched.
    //
    // Returns true when a parked thread was reused, otherwise the result of
    // ORequestThread::launch().
    bool ThreadPool::createThread( JobQueue *pQueue ,
                                   const ByteSequence &aThreadId,
                                   bool bAsynchron )
    {
        {
            std::scoped_lock aLock( m_mutexWaitingThreadList );
            if( !m_dequeThreads.empty() )
            {
                WaitingThread * const pParked = m_dequeThreads.back();

                // hand over the task; clearing the reference is the signal
                // for waitInPool() that the thread has been reused
                pParked->thread->setTask( pQueue , aThreadId , bAsynchron );
                pParked->thread = nullptr;

                // the entry must leave the list before its owner is woken up
                m_dequeThreads.pop_back();

                pParked->condition.set();
                return true;
            }
        }

        rtl::Reference< ORequestThread > const xFresh(
            new ORequestThread( this, pQueue , aThreadId, bAsynchron ) );
        return xFresh->launch();
    }
 
    // Tries to unregister one of the two job queues of thread aThreadId:
    // the asynchronous (oneway) queue when bAsynchron is true, the synchronous
    // queue otherwise.
    //
    // Returns false, leaving everything untouched, when the queue is not empty
    // (another thread has put something into it in the meantime).  Otherwise
    // the slot is reset to null and true is returned.  The queue object itself
    // is not deleted here; that is left to the caller.
    //
    // An entry for aThreadId, holding the requested queue, has to exist.
    bool ThreadPool::revokeQueue( const ByteSequence &aThreadId, bool bAsynchron )
    {
        std::scoped_lock aLock( m_mutex );

        auto const pos = m_mapQueue.find( aThreadId );
        OSL_ASSERT( pos != m_mapQueue.end() );

        auto & rQueues = pos->second;
        auto & rpRevoked = bAsynchron ? rQueues.second : rQueues.first;

        if( !rpRevoked->isEmpty() )
        {
            // another thread has put something into the queue
            return false;
        }
        rpRevoked = nullptr;

        if( bAsynchron && rQueues.first )
        {
            // all oneway requests have been processed, now
            // synchronous requests may go on
            rQueues.first->resume();
        }

        // drop the map entry once neither queue is left
        if( !rQueues.first && !rQueues.second )
        {
            m_mapQueue.erase( pos );
        }

        return true;
    }
 
 
    // Queues a job for the thread identified by aThreadId.
    //
    // aThreadId            id of the (logical) thread the job belongs to
    // bAsynchron           true: use the asynchronous (oneway) queue,
    //                      false: use the synchronous queue
    // pThreadSpecificData  opaque job data, passed on to JobQueue::add
    // doRequest            job function, passed on to JobQueue::add
    // disposeId            id checked against the disposed-caller list
    //
    // Returns true when disposeId is already disposed (the job is dropped
    // without being queued) and when the job was added to an already existing
    // queue.  When the queue had to be created first, the result of
    // createThread() for that queue is returned.
    bool ThreadPool::addJob(
        const ByteSequence &aThreadId ,
        bool bAsynchron,
        void *pThreadSpecificData,
        RequestFun * doRequest,
        void const * disposeId )
    {
        bool bCreateThread = false;
        JobQueue *pQueue = nullptr;
        {
            std::scoped_lock guard( m_mutex );
            if (m_DisposedCallerAdmin->isDisposed(disposeId)) {
                return true;
            }

            // One lookup only: returns the existing entry, or inserts a
            // { nullptr, nullptr } pair when aThreadId is not known yet.
            auto & rQueues
                = m_mapQueue.try_emplace( aThreadId, nullptr, nullptr ).first->second;

            if( bAsynchron )
            {
                if( ! rQueues.second )
                {
                    rQueues.second = new JobQueue();
                    bCreateThread = true;
                }
                pQueue = rQueues.second;
            }
            else
            {
                if( ! rQueues.first )
                {
                    rQueues.first = new JobQueue();
                    bCreateThread = true;
                }
                pQueue = rQueues.first;

                // synchronous requests must wait while oneway requests of the
                // same thread id are still being worked on; revokeQueue()
                // resumes the queue once the asynchronous queue is gone
                if( rQueues.second && rQueues.second->isBusy() )
                {
                    pQueue->suspend();
                }
            }
            pQueue->add( pThreadSpecificData , doRequest );
        }

        // the thread is created outside the m_mutex scope
        return !bCreateThread || createThread( pQueue , aThreadId , bAsynchron);
    }
 
    void ThreadPool::prepare( const ByteSequence &aThreadId )
    {
        std::scoped_lock guard( m_mutex );
 
        ThreadIdHashMap::iterator ii = m_mapQueue.find( aThreadId );
 
        if( ii == m_mapQueue.end() )
        {
            JobQueue *p = new JobQueue();
            m_mapQueue[ aThreadId ] = std::pair< JobQueue * , JobQueue * > ( p , nullptr );
        }
        else if( nullptr == (*ii).second.first )
        {
            (*ii).second.first = new JobQueue();
        }
    }
 
    // Lets the caller enter the synchronous job queue of aThreadId and returns
    // whatever JobQueue::enter returns for nDisposeId.
    //
    // The queue has to exist already (see prepare()).  Only the lookup is done
    // with m_mutex held; JobQueue::enter is called without it.  When the
    // queue's call stack is empty afterwards and revokeQueue() succeeds, the
    // queue is deleted here.
    void * ThreadPool::enter( const ByteSequence & aThreadId , void const * nDisposeId )
    {
        JobQueue *pQueue = nullptr;
        {
            std::scoped_lock aLock( m_mutex );

            auto const pos = m_mapQueue.find( aThreadId );
            assert(pos != m_mapQueue.end());
            pQueue = pos->second.first;
        }
        assert(pQueue);

        void * const pReturn = pQueue->enter( nDisposeId );

        // revokeQueue() is only asked when no call is left on the stack
        if( pQueue->isCallstackEmpty() && revokeQueue( aThreadId , false) )
        {
            // remove queue
            delete pQueue;
        }
        return pReturn;
    }
}
 
// All uno_ThreadPool handles in g_pThreadpoolHashSet with overlapping life
// spans share one ThreadPool instance.  When g_pThreadpoolHashSet becomes empty
// (within the last uno_threadpool_destroy) all worker threads spawned by that
// ThreadPool instance are joined (which implies that uno_threadpool_destroy
// must never be called from a worker thread); afterwards, the next call to
// uno_threadpool_create (if any) will lead to a new ThreadPool instance.
 
using namespace cppu_threadpool;
 
namespace {

// Equality predicate for the handle map: two handles are the same when the
// handle values compare equal.
struct uno_ThreadPool_Equal
{
    bool operator () ( const uno_ThreadPool &lhs , const uno_ThreadPool &rhs ) const
    {
        return lhs == rhs;
    }
};

// Hash functor for the handle map: the handle value itself, reinterpreted as
// an integer, serves as the hash.
struct uno_ThreadPool_Hash
{
    std::size_t operator () ( const uno_ThreadPool &handle ) const
    {
        return reinterpret_cast<std::size_t>( handle );
    }
};

}
 
typedef std::unordered_map< uno_ThreadPool, ThreadPoolHolder, uno_ThreadPool_Hash, uno_ThreadPool_Equal > ThreadpoolHashSet;
 
static ThreadpoolHashSet *g_pThreadpoolHashSet;
 
struct _uno_ThreadPool
{
    sal_Int32 dummy;
};
 
namespace {
 
// Looks up the ThreadPool registered for hPool and returns a holder for it.
// The lookup is done with the global mutex held.  hPool has to be a handle
// that is currently registered in g_pThreadpoolHashSet; this is only checked
// by assertions.
ThreadPoolHolder getThreadPool( uno_ThreadPool hPool )
{
    MutexGuard aGuard( Mutex::getGlobalMutex() );
    assert( g_pThreadpoolHashSet != nullptr );

    auto const pos = g_pThreadpoolHashSet->find( hPool );
    assert( pos != g_pThreadpoolHashSet->end() );
    return pos->second;
}
 
}
 
// Creates a new thread pool handle and registers it in g_pThreadpoolHashSet.
//
// The first handle also creates the handle map and a fresh ThreadPool; every
// further handle created while the map exists shares the ThreadPool of the
// handles that are already registered.
//
// Note: the function is noexcept but allocates with `new`, so an allocation
// failure cannot be reported to the caller and terminates the process.
extern "C" uno_ThreadPool SAL_CALL
uno_threadpool_create() noexcept
{
    MutexGuard aGuard( Mutex::getGlobalMutex() );

    ThreadPoolHolder pool;
    if( g_pThreadpoolHashSet )
    {
        // the map is deleted as soon as it becomes empty, so there is
        // always an entry to take the shared pool from
        assert( !g_pThreadpoolHashSet->empty() );
        pool = g_pThreadpoolHashSet->begin()->second;
    }
    else
    {
        g_pThreadpoolHashSet = new ThreadpoolHashSet;
        pool = new ThreadPool;
    }

    // Just ensure that the handle is unique in the process (via heap)
    uno_ThreadPool const handle = new _uno_ThreadPool;
    g_pThreadpoolHashSet->emplace( handle, pool );
    return handle;
}
 
// Prepares the pool behind hPool for the calling thread: the current thread's
// id is fetched and passed to ThreadPool::prepare.  Afterwards the local
// reference to the id sequence is released and uno_releaseIdFromCurrentThread
// is called.
extern "C" void SAL_CALL
uno_threadpool_attach( uno_ThreadPool hPool ) noexcept
{
    sal_Sequence *pCurrentId = nullptr;
    uno_getIdOfCurrentThread( &pCurrentId );
    {
        ThreadPoolHolder const pool( getThreadPool( hPool ) );
        pool->prepare( pCurrentId );
    }
    rtl_byte_sequence_release( pCurrentId );
    uno_releaseIdFromCurrentThread();
}
 
// Lets the calling thread enter the pool behind hPool: ThreadPool::enter is
// called with the current thread's id and with hPool as dispose id, and its
// result is stored in *ppJob.  Afterwards the local reference to the id
// sequence is released and uno_releaseIdFromCurrentThread is called.
extern "C" void SAL_CALL
uno_threadpool_enter( uno_ThreadPool hPool , void **ppJob ) noexcept
{
    sal_Sequence *pCurrentId = nullptr;
    uno_getIdOfCurrentThread( &pCurrentId );
    {
        ThreadPoolHolder const pool( getThreadPool( hPool ) );
        *ppJob = pool->enter( pCurrentId, hPool );
    }
    rtl_byte_sequence_release( pCurrentId );
    uno_releaseIdFromCurrentThread();
}
 
// Counterpart of uno_threadpool_attach.  Currently a no-op: the handle
// parameter is unused and nothing is cleaned up here.
extern "C" void SAL_CALL
uno_threadpool_detach(SAL_UNUSED_PARAMETER uno_ThreadPool) noexcept
{
    // we might do some tidying up here in case a thread called attach but never detach
}
 
// Hands a job to the pool behind hPool by forwarding to ThreadPool::addJob,
// with hPool itself used as dispose id.
//
// hPool      handle of the pool
// pThreadId  id of the thread the job belongs to
// pJob       opaque job data handed to doRequest
// doRequest  function that executes the job
// bIsOneway  true for an asynchronous (oneway) job
//
// A failing addJob is only logged; nothing is reported back to the caller.
extern "C" void SAL_CALL
uno_threadpool_putJob(
    uno_ThreadPool hPool,
    sal_Sequence *pThreadId,
    void *pJob,
    void ( SAL_CALL * doRequest ) ( void *pThreadSpecificData ),
    sal_Bool bIsOneway ) noexcept
{
    bool const bAccepted
        = getThreadPool(hPool)->addJob( pThreadId, bIsOneway, pJob ,doRequest, hPool );
    if( bAccepted )
    {
        return;
    }

    SAL_WARN(
        "cppu.threadpool",
        "uno_threadpool_putJob in parallel with uno_threadpool_destroy");
}
 
// Disposes the handle hPool: forwards to ThreadPool::dispose of the pool
// registered for it, using the handle itself as dispose id.
extern "C" void SAL_CALL
uno_threadpool_dispose( uno_ThreadPool hPool ) noexcept
{
    ThreadPoolHolder const pool( getThreadPool( hPool ) );
    pool->dispose( hPool );
}
 
// Destroys the handle hPool.
//
// The handle's dispose id is removed from the pool, the handle is taken out
// of g_pThreadpoolHashSet and deleted.  When this was the last handle, the
// map itself is deleted as well and, after the global mutex has been released
// again, the pool's worker threads are joined.
extern "C" void SAL_CALL
uno_threadpool_destroy( uno_ThreadPool hPool ) noexcept
{
    // Own reference to the pool: it has to stay alive after its map entry is
    // erased below, because joinWorkers() is still called on it.
    ThreadPoolHolder p( getThreadPool(hPool) );
    p->destroy(
        hPool );

    bool empty;
    {
        MutexGuard guard( Mutex::getGlobalMutex() );

        // Checked with the global mutex held: the pointer is modified by
        // uno_threadpool_create/uno_threadpool_destroy under that mutex, so
        // reading it before taking the lock would be an unsynchronized access.
        assert(g_pThreadpoolHashSet);

        ThreadpoolHashSet::iterator ii = g_pThreadpoolHashSet->find( hPool );
        OSL_ASSERT( ii != g_pThreadpoolHashSet->end() );
        g_pThreadpoolHashSet->erase( ii );
        delete hPool;

        empty = g_pThreadpoolHashSet->empty();
        if( empty )
        {
            // last handle gone: the next uno_threadpool_create starts over
            // with a new map and a new ThreadPool
            delete g_pThreadpoolHashSet;
            g_pThreadpoolHashSet = nullptr;
        }
    }

    // joined outside the global mutex scope
    if( empty )
    {
        p->joinWorkers();
    }
}
 
/* vim:set shiftwidth=4 softtabstop=4 expandtab: */

// Static-analyzer diagnostic V509, reported three times (presumably once for
// each `new` expression in the function): the 'new' operator is used in the
// noexcept 'uno_threadpool_create' function. It should be located inside a
// try..catch block, as it could potentially generate an exception.