// core_condition.cpp
// artefaktur, 2005/5/9
// -*- mode:C++; tab-width:2; c-basic-offset:2; indent-tabs-mode:nil -*- 
//
// Copyright (C) 2000-2005 by Roger Rene Kommer / artefaktur, Kassel, Germany.
// 
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Library General Public License (LGPL).
// 
// 
// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.	 See the 
// License ACDK-FreeLicense document enclosed in the distribution
// for more details.
// This file is part of the Artefaktur Component Development Kit:
//                         ACDK
// 
// Please refer to
// - http://www.acdk.de
// - http://www.artefaktur.com
// - http://acdk.sourceforge.net
// for more information.
// 
// $Header: /cvsroot/acdk/acdk/acdk_core/src/acdk/lang/sys/core_condition.cpp,v 1.23 2005/04/19 08:35:22 kommer Exp $

#include <acdk.h>

#include "core_condition.h"

#include "../SystemError.h"
#include "core_tick.h"

#if defined(POSIX_THREADS)
#include <unistd.h>
#include <sys/time.h>
#include <errno.h>
#endif // defined(POSIX_THREADS)

#include "../InterruptedException.h"

namespace acdk {
namespace lang {
namespace sys {

#if defined (POSIX_THREADS)
#include "core_ptherror.h"
#endif //defined (POSIX_THREADS)

// Local debug tracing for this translation unit.
// Uncomment the following define to route DOUT(msg) to sys::coreout;
// otherwise DOUT(msg) expands to an empty statement and msg is never
// evaluated.
//#define LOCAL_DEBUG
#if defined(LOCAL_DEBUG)
#define DOUT(msg) do { sys::coreout << msg << sys::eofl; } while(false)
#else
#define DOUT(msg) do { } while(false)
#endif

/**
  Creates a condition bound to the given mutex.

  POSIX_THREADS: initialises the pthread condition with default attributes.
  WIN32_THREADS: zero-initialises the waiter bookkeeping, then creates
  the semaphore waiters block on (initial count 0, maximum 0x7fffffff),
  the event used by the last waiter of a broadcast to signal notifyAll()
  (created with manual-reset = false, initially non-signaled), and the
  critical section guarding the waiter count.

  @param mutex the mutex associated with this condition; only a
               reference is stored.

  NOTE(review): the results of pthread_cond_init / CreateSemaphore /
  CreateEvent are not checked here.
*/
core_condition::core_condition(core_mutex& mutex)
: _mutex(mutex)
#if !defined (POSIX_THREADS) && defined (WIN32_THREADS)
  , _waitersCount(0)
  , _waiters_cs()
  , _waitersSemaphore(0)
  , _waitersEvent(0)
  , _broadcasted(0)
#endif
{
#if defined (POSIX_THREADS)
  pthread_cond_init(&_condition, 0);
#elif defined (WIN32_THREADS)
  _waitersSemaphore = CreateSemaphore(0, 0, 0x7fffffff, 0);// == INVALID_HANDLE_VALUE
  _waitersEvent = CreateEvent(0, false, false, 0); // == 0
  InitializeCriticalSection(&_waiters_cs);
#endif
}

/**
  Releases the native resources acquired in the constructor:
  the pthread condition (POSIX_THREADS) or the critical section,
  semaphore handle and event handle (WIN32_THREADS).
*/
core_condition::~core_condition()
{
#if defined (POSIX_THREADS)
  acdk_pthread_cond_destroy(&_condition);
#elif defined (WIN32_THREADS)
  DeleteCriticalSection(&_waiters_cs);
  CloseHandle(_waitersSemaphore);
  CloseHandle(_waitersEvent);
#endif
}
  
/**
  Signals the condition once.

  POSIX_THREADS: signals the pthread condition.
  WIN32_THREADS: if at least one waiter is registered, releases the
  waiter semaphore by one; otherwise the call does nothing apart from
  an optional debug trace.

  NOTE(review): _waitersCount is read here without entering _waiters_cs;
  presumably the caller holds the associated mutex - confirm.
*/
void 
core_condition::notify()
{
#if defined (POSIX_THREADS)
  acdk_pthread_cond_signal(&_condition);
#elif defined (WIN32_THREADS)
  if (_waitersCount > 0 ) 
  {
    ReleaseSemaphore(_waitersSemaphore, 1, 0); // == TRUE
  }
  else
  {
    DOUT("notify without waiters");
  }
#endif
}

/**
  Signals the condition for all current waiters.

  POSIX_THREADS: broadcasts on the pthread condition.
  WIN32_THREADS: if waiters are registered, marks a broadcast as being
  in progress, releases the semaphore once per registered waiter and
  then blocks until the last waiter sets _waitersEvent (see wait(int)).
  Without waiters the call does nothing.

  Throws SystemError if waiting for the event does not return WAIT_OBJECT_0.

  NOTE(review): the wait on _waitersEvent is unbounded; a waiter that
  leaves through its timeout path during a broadcast does not set the
  event - verify that this cannot stall notifyAll().
*/
void 
core_condition::notifyAll()
{
#if defined (POSIX_THREADS)
  acdk_pthread_cond_broadcast(&_condition);
#elif defined (WIN32_THREADS)
  if (_waitersCount > 0) 
  {
    _broadcasted = true;
    ReleaseSemaphore(_waitersSemaphore, _waitersCount, 0); //   == TRUE
    // block until the last woken waiter acknowledges the broadcast
    DWORD result = WaitForSingleObject(_waitersEvent, INFINITE); //!= 0xFFFFFFFF
    _broadcasted = false;
    if (result != WAIT_OBJECT_0) 
      THROW1(SystemError, "Notifyall failed");
  }
  else
  {
    //DOUT("notifyAll without waiters");
  }
#endif
}

void 
core_condition::wait()
{
#if defined (POSIX_THREADS)
  wait(-1);
#elif defined (WIN32_THREADS)
  wait(-1);
#endif
/FONT>
}

#if defined(ACDK_OS_UNIX)
/**
  Fills abstime with the absolute point in time "now + millis + nanos",
  based on gettimeofday().

  @param abstime receives the result; tv_nsec is always normalised
                 into the range [0, 1000000000).
  @param millis  relative timeout in milliseconds. A negative value
                 yields tv_sec == 0 and tv_nsec == 0.
  @param nanos   additional relative timeout in nanoseconds.
*/
void
fillAbsTime(struct timespec& abstime, int millis, int nanos = 0)
{
  if (millis < 0)
  {
    abstime.tv_sec = 0;
    abstime.tv_nsec = 0;
    return;
  }
  //123456789 - 10^9
  const long one_billion = 1000000000;

  timeval tv;
  gettimeofday(&tv, 0);

  // whole seconds from all three sources
  abstime.tv_sec = tv.tv_sec + (millis / 1000) + (nanos / one_billion);

  // sub-second parts are added step by step and normalised after each
  // step, so the intermediate sum stays below 2 * 10^9
  long nsec = (tv.tv_usec * 1000) + ((millis % 1000) * 1000000L);
  if (nsec >= one_billion) 
  {
    ++abstime.tv_sec;
    nsec -= one_billion;
  }
  nsec += nanos % one_billion;
  if (nsec >= one_billion) 
  {
    // tv_nsec == 10^9 is already out of range, hence >= and not >
    ++abstime.tv_sec;
    nsec -= one_billion;
  } 
  else if (nsec < 0) 
  {
    // negative nanos borrowed more than the current sub-second part
    --abstime.tv_sec;
    nsec += one_billion;
  }
  abstime.tv_nsec = nsec;
}
#endif // defined(ACDK_OS_UNIX)

/**
  Waits on the condition, optionally with a timeout.

  @param milliseconds timeout in milliseconds; -1 waits without timeout.
  @return true if the wait ended without running into the timeout,
          false on timeout (POSIX: EAGAIN or ETIMEDOUT from the timed
          wait; WIN32: WAIT_TIMEOUT on the semaphore, or WAIT_ABANDONED
          when re-acquiring the mutex handle).
  @throw InterruptedException (POSIX) if the untimed wait returns EINTR.
  @throw SystemError (WIN32) if a native wait or SetEvent fails.

  NOTE(review): negative values other than -1 are treated differently
  by the three branches (immediate timeout / no timeout / very long
  timeout) - callers should only pass -1 or a non-negative value.
*/
bool 
core_condition::wait(int milliseconds) // ### todo support also mnsecs
{
#if defined (POSIX_THREADS)
#if defined(ACDK_HAS_PTHREAD_RECURSIVE_MUTEX)

  if (milliseconds == -1) {
    int erg = acdk_pthread_cond_wait(&_condition, &_mutex._mutex); 
    if (erg == EINTR) {
      THROW0(InterruptedException);
    } 
    return true;
  }
  struct timespec abstime;
  fillAbsTime(abstime, milliseconds);
  int erg = acdk_pthread_cond_timedwait(&_condition, &_mutex._mutex, &abstime);
  if (erg == EAGAIN || erg == ETIMEDOUT) {
    return false;
  }
  return true;
#else //defined(ACDK_HAS_PTHREAD_RECURSIVE_MUTEX)
  // The recursion count is kept by core_mutex itself in this
  // configuration: remember it and reset it before waiting, and
  // re-establish it via _mutex._lock(count) afterwards.
  int count = 0;
  bool reterg = true;
  {
    core_lock_guard<core_fastmutex> lock(_mutex._internalMutex);

    count = _mutex._lockCount;
    _mutex._lockCount = 0;
    _mutex._threadOwner = pthread_self();

  }
  struct timespec abstime;
  // the out-parameter was missing here, leaving abstime uninitialised
  fillAbsTime(abstime, milliseconds);
  // NOTE(review): the native mutex is reached as _mutex._mutex.mutex()
  // for the waits but as _mutex.mutex() for the unlocks below - confirm
  // against core_mutex that both name the same pthread mutex.
  while (true) {
    try {
      if (milliseconds < 0) {
        int erg = acdk_pthread_cond_wait(&_condition, &_mutex._mutex.mutex()); 
        if (erg == EINTR) {
          {
            core_unlock_guard<core_fastmutex> im(_mutex._internalMutex);
          }
          THROW0(InterruptedException);
        } 
      } else {
        int erg = acdk_pthread_cond_timedwait(&_condition, &_mutex._mutex.mutex(), &abstime);
        if (erg == EAGAIN || erg == ETIMEDOUT) {
          reterg = false;
          break;
        } 
        
      }
    } catch (.../*interupted */) {
      // restore the caller's lock state before passing the exception on
      acdk_pthread_mutex_unlock(&_mutex.mutex());
      _mutex._lock(count);
      throw;
    }
    break;
  }
  {
    acdk_pthread_mutex_unlock(&_mutex.mutex());  
    _mutex._lock(count);
  }
  return reterg;
#endif //defined(ACDK_HAS_PTHREAD_RECURSIVE_MUTEX)
#elif defined (WIN32_THREADS)
  // NOTE(review): _waitersCount is modified here and in the two error
  // paths below without entering _waiters_cs, unlike the decrement
  // after a successful wake-up - confirm the mutex protects it.
  _waitersCount++;  
  if (milliseconds == -1)
    milliseconds = INFINITE;
  _mutex.unlock();
  DWORD result = WaitForSingleObject( _waitersSemaphore, milliseconds );
  if (result == WAIT_TIMEOUT)
  {
    _mutex.lock();
    _waitersCount--;
    return false;
  } 
  else if (result != WAIT_OBJECT_0)
  {
    _mutex.lock();
    _waitersCount--;
    THROW1(SystemError, "Error in wait()");
  }
    
  EnterCriticalSection(&_waiters_cs);
  _waitersCount--;
  bool last_waiter = false;
  if (_broadcasted == true && _waitersCount == 0)
    last_waiter = true;
  LeaveCriticalSection(&_waiters_cs);

  if (last_waiter == true) 
  {
    // the last waiter of a broadcast releases the thread in notifyAll()
    result = SetEvent(_waitersEvent);
    if (result == 0) 
    {
      _mutex.lock();
      THROW1(SystemError, "SetEvent() failed"); 
    }
  }
  // re-acquire the mutex through its native handle
  result = WaitForSingleObject(_mutex.handle(), INFINITE);
  if (result == WAIT_FAILED) {
    THROW1(SystemError, "Inconstent mutex state");
    return false;
  } else if (result == WAIT_ABANDONED ) {
    return false;
  } else if (result != WAIT_OBJECT_0) {
    THROW1(SystemError, "Inconstent mutex state");
    return false;
  } 
  return true;
#else
#error unsupported threads
#endif
}

} // namespace sys 
} //namespace lang 
} // namespace acdk