Win32 System Programming
|
Multiple Wait SemaphorePage 235 in the book mentioned that you could atomically wait for multiple semaphore units by using a mutex to guard the semaphore. This is correct, but limited. I call this the "First Come, First Served" multiple wait semantics as a large request will block all subsequent requests (regardless of thread priority or request size) until the initial request is satisfied. This might be the semantics you require, but, you might actually require "First Satisfiable" semantics where a small request will be satisfied before a large request. This small, integrated collection of functions implements the required routines. Here are some features:
How would you use such a capability? Chapter 11's server showed one example, where different threads require different amounts of a limited serving capability. Here is another example, which is fairly realistic. Suppose you have a "Monitor" thread that is not to start (or get past a certain point) until ALL worker threads are running or have reached some specified point. Here is what to do, using a "First Satisfiable" multiple wait semaphore:
Here is an exercise involving the above idea and an extra event together with a "First Come, First Served" semaphore. Construct a "thread rendezvous" where all worker threads block at a certain point and they are not released until the monitor finds that some number of them have blocked at the rendezvous. Can you construct a rendezvous object that does not require the monitor thread? Here is another exercise. Construct a "reverse semaphore" object that is signaled if and only if the count is equal to the maximum value. A waiting thread that is released decreases the semaphore by the count specified in the wait and the reverse semaphore becomes unsignaled. You can do this with some minor modifications to the functions here. See http://www.microsoft.com/msj/0797/win320797top.htm for a different solution, although, in that article, the semaphore is signaled only when it is 0, and there is no multiple wait. The implementation has some other interesting features:
COMMENTS ON TESTING, DEBUGGING, AND VALIDATING Threaded, synchronized systems present significant challenges not only in the design and implementation, but also in all stages of quality assurance. There are many traps for the unwary. However, writing good code need not be that difficult, and it can be fun and rewarding. Here are some hints as well as some personal opinions. Some of the personal opinions are not widely shared.
I have seen some on-line discussion that emphasizes this point. Under the debugger, PulseEvent had a different effect than without the debugger. Apparently, when pulsing a manual reset event, the signal could be lost under certain circumstances when using the debugger. I plan to add some material regarding informal proofs of correctness for dealing with synchronized threads. Now that I have said that, I fully expect to be humbled when someone points out an obvious (or not so obvious) defect in my code. Please take the challenge to do so. If you succeed, I will still be grateful and will make the excuse that I was in a hurry (see the last bullet) even though I did perform all the recommended steps. This is the function library. /* SynchObj.c
Copyright (c) 1998, Johnson M. Hart
Permission is granted for any and all use providing that this copyright is
properly acknowledged.
There are no assurances of suitability for any use whatsoever.
Library of synchronization "objects" to supplement the standard Win32
events and mutexes.
For simplicity, this implementation shares a header file (synchize.h) with
another library of synchronization objects (EvMxSem.c) that is designed
to operate under Windows CE, which does not have semaphores.
Implementation notes:
1. All required internal data structures are allocated on the process's heap
2. Where appropriate, a new error code is returned (see the header
file), or, if the error is a Win32 error, that code is unchanged.
3. Notice the new handle type, "SYNCHHANDLE" which has handles, counters,
and other information. This structure will grow as new objects are added
to this set; some members are specific to only one or two of the objects;
in particular, the structure is more general than is required just for
some of the specific examples.
4. Mutexes are used for critical sections. These could be replaced with
CRITICAL_SECTION objects if the objects are used totally within
a process. Even better, if the object is not named, then you
know that it will be used only within the process and you could
make the decision at create time. HOWEVER, you will lose the timeout
option if you do so, AND you will also lose the naming and interprocess
capabilities.
5. These simulated semaphores can be named and hence can be shared
between processes.
6. No Win32 semaphores are used, so this implementation (with null names)
will operate under Windows CE.
7. The implementation shows several interesting aspect of synchronization, some
of which are specific to Win32 and some of which are general. There are pointed
out in the comments as appropriate.
8. These objects have a WaitForSingleObject equivalent. There is, however, no
equivalent to WaitForMultipleObjects as this is very difficult, if not impossible
to do efficiently outside of the kernel.
*/
#include "EvryThng.h"
#include "Synchize.h"
static SYNCHHANDLE CleanUp (SYNCHHANDLE);
static SYNCHHANDLE AllocSynchHandle (LPCTSTR, LPTSTR, LPTSTR, LPTSTR, LPBOOL);
/*********************************************
MULTIPLE WAIT SEMAPHORE function family.
These semaphores allow a thread to wait atomically for several "units"
rather than just one. The DEFAULT wait semantics are considered to be
"First Satisfiable". That is, a thread with a satisfiable request
will be released even if a thread (regardless of priority) has an
outstanding request that is not currently satisfiable.
Optional semantics, specified at create time, are
"First Come, First Served". A thread with request larger than the
number of units currently available will block all subsequent
threads, even those with satisfiable requests or those of
higher priority. The solution on p. 235 is a First Come, First Served
solution, but here we implement it within the general framework.
*/
SYNCHHANDLE CreateSemaphoreMW (
LPSECURITY_ATTRIBUTES lpSemaphoreAttributes, /* pointer to security attributes */
LONG lInitialCount, /* initial count */
LONG lMaximumCount, /* maximum count */
BOOL fFirstComeFirstServed, /* First Satisfiable is the default */
LPCTSTR lpName )
/* Multiple wait semaphore creation.
Requires a counter, a mutex to protect the semaphore state, and an
autoreset event.
Here are the rules that must always hold between the autoreset event
and the mutex (any violation of these rules by the multiple wait semaphore
functions will, in all likelihood, result in a defect):
1. No thread can set, pulse, or reset the event,
nor can it access any part of the SYNCHHANDLE structure,
without first gaining ownership of the mutex.
BUT, a thread can wait on the event without owning the mutex
(this is clearly necessary or else the event could never be set).
2. The event is in a signaled state if and only if the count is
greater than zero. To assure this property, the count should
be checked after every semaphore decrease.
3. The semaphore count is always >= 0 and <= the maximum count
*/
{
SYNCHHANDLE hSynch = NULL, hShare = NULL;
TCHAR MutexName [MAX_PATH] = _T(""), EventName [MAX_PATH] = _T(""),
MutexaName[MAX_PATH] = _T("");
BOOL NewObject;
if (lInitialCount > lMaximumCount || lMaximumCount < 0 || lInitialCount < 0) {
/* Bad parameters */
SetLastError (SYNCH_ERROR);
return NULL;
}
hSynch = AllocSynchHandle (lpName, MutexName, EventName, MutexaName, &NewObject);
if (hSynch == NULL) {
SetLastError (SYNCH_ERROR);
return NULL;
}
/* Create the object handles. These are always created in the process's
local handle. */
hSynch->hMutex = CreateMutex (lpSemaphoreAttributes, FALSE, (LPCTSTR)MutexName);
/* Create the event. It is initially signaled if and only if the
initial count is > 0 */
hSynch->hEvent = CreateEvent (lpSemaphoreAttributes, FALSE /* autoreset */,
lInitialCount > 0, (LPCTSTR)EventName);
hSynch->hMutexa = NULL;
hSynch->dwFlags = 6; /* An event and a mutex, but no secondary mutex
unless it is a First Come, First Served Multiple Wait semaphore */
if (fFirstComeFirstServed) {
hSynch->hMutexa = CreateMutex (lpSemaphoreAttributes, FALSE, (LPCTSTR)MutexaName);
hSynch->dwFlags = 7; /* All three objects were created */
}
/* Set the object state, always in the local handle (for quick reference)
and in the shared handle if there is one. */
hSynch->MaxCount = lMaximumCount;
hSynch->CurCount = lInitialCount; /* The local value is not maintained. */
hSynch->fFirstComeFirstServed = fFirstComeFirstServed;
_tcscpy (hSynch->lpName, lpName);
hShare = hSynch->SharedHandle;
if (NewObject && hShare != NULL ) {
/* There is a new shared handle. Set the state if it is new */
hShare->MaxCount = lMaximumCount;
hShare->CurCount = lInitialCount; /* The local value is not maintained. */
hShare->fFirstComeFirstServed = fFirstComeFirstServed;
_tcscpy (hShare->lpName, lpName);
}
/* Return with the handle, or, if there was any error, return
a null after closing any open handles and freeing any allocated memory */
return CleanUp (hSynch);
}
BOOL ReleaseSemaphoreMW (SYNCHHANDLE hSemMW, LONG cReleaseCount, LPLONG lpPreviousCount)
/* Multiple wait equivalent to ReleaseSemaphore */
{
BOOL Result = TRUE;
SYNCHHANDLE hState;
/* Gain access to the object to assure that the release count
would not cause the total count to exceed the maximum */
/* The state is maintained locally if the object is unnamed and
in shared memory for a named object */
hState = (hSemMW->SharedHandle == NULL) ? hSemMW : hSemMW->SharedHandle;
_try {
WaitForSingleObject (hSemMW->hMutex, INFINITE);
*lpPreviousCount = hState->CurCount;
if (hState->CurCount + cReleaseCount > hState->MaxCount || cReleaseCount <= 0) {
SetLastError (SYNCH_ERROR);
Result = FALSE;
_leave;
}
hState->CurCount += cReleaseCount;
/* Set the autoreset event. All threads currently waiting on the
event will be released, and then the event will be reset. */
SetEvent (hSemMW->hEvent);
}
_finally {
ReleaseMutex (hSemMW->hMutex);
return Result;
}
}
DWORD WaitForSemaphoreMW (SYNCHHANDLE hSemMW, LONG cSemRequest, DWORD dwMilliseconds)
/* Multiple wait semaphore equivalent of WaitForSingleObject.
The second parameter is the number of units requested, and the waiting will be
either first come, first served or first available, depending on the option
selected at create time. */
{
DWORD WaitResult;
SYNCHHANDLE hState;
/* The state is maintained locally if the object is unnamed and
in shared memory for a named object */
hState = (hSemMW->SharedHandle == NULL) ? hSemMW : hSemMW->SharedHandle;
if (cSemRequest <= 0 || cSemRequest > hState->MaxCount) {
SetLastError (SYNCH_ERROR);
return WAIT_FAILED;
}
/* If this is a First Come, First Served MW semaphore, then this thread
seizes the secondary mutex to block all other threads.
Do this BEFORE waiting on the mutex protecting the semaphore state. */
if (hSemMW->fFirstComeFirstServed) {
WaitResult = WaitForSingleObject (hSemMW->hMutexa, dwMilliseconds);
if (WaitResult != WAIT_OBJECT_0 && WaitResult != WAIT_ABANDONED_0) return WaitResult;
}
WaitResult = WaitForSingleObject (hSemMW->hMutex, dwMilliseconds);
if (WaitResult != WAIT_OBJECT_0 && WaitResult != WAIT_ABANDONED_0) return WaitResult;
while (hState->CurCount < cSemRequest) {
/* The count is less than the number requested.
The thread must wait on the event (which, by the rules, is currently reset)
for semaphore resources to become available. First, of course, the mutex
must be released so that another thread will be capable of setting the event.
*/
ReleaseMutex (hSemMW->hMutex);
/* Wait for the event, indicating that some other thread has increased
the semaphore count.
The event is autoreset and signaled with a SetEvent (not PulseEvent)
so only those threads currently waiting will be released.
*/
WaitResult = WaitForSingleObject (hSemMW->hEvent, dwMilliseconds);
if (WaitResult != WAIT_OBJECT_0) return WaitResult;
/* Seize the semaphore so that the semaphore state can be retested
at the top of the loop. Note that there may be other threads
waiting at this same point, but only one at a time can test the
semaphore count. */
WaitResult = WaitForSingleObject (hSemMW->hMutex, dwMilliseconds);
if (WaitResult != WAIT_OBJECT_0 && WaitResult != WAIT_ABANDONED_0) return WaitResult;
}
/* hState->CurCount < cSemRequest (the request can be satisfied), and
this thread owns the semaphore */
hState->CurCount -= cSemRequest;
if (hState->CurCount) SetEvent (hSemMW->hEvent);
ReleaseMutex (hSemMW->hMutex);
if (hSemMW->fFirstComeFirstServed) ReleaseMutex (hSemMW->hMutexa);
return WaitResult;
}
BOOL CloseSynchHandle (SYNCHHANDLE hSynch)
/* Close a synchronization handle.
Improvement: Test for a valid handle before closing the handle */
{
BOOL Result = TRUE;
if (hSynch->hEvent != NULL) Result = Result && CloseHandle (hSynch->hEvent);
if (hSynch->hMutex != NULL) Result = Result && CloseHandle (hSynch->hMutex);
if (hSynch->hMutexa != NULL) Result = Result && CloseHandle (hSynch->hMutexa);
if (hSynch->SharedHandle != NULL)
InterlockedDecrement (&(hSynch->SharedHandle->RefCount));
if (hSynch->ViewOfFile != NULL) UnmapViewOfFile (hSynch->ViewOfFile);
HeapFree (GetProcessHeap (), 0, hSynch);
return (Result);
}
static SYNCHHANDLE CleanUp (SYNCHHANDLE hSynch)
{ /* Prepare to return from a create of a synchronization handle.
If there was any failure, free any allocated resources
"Flags" indicates which Win32 objects are required in the
synchronization handle */
BOOL ok = TRUE;
DWORD Flags;
if (hSynch == NULL) return NULL;
Flags = hSynch->dwFlags;
if (Flags & 4 == 1 && hSynch->hEvent == NULL) ok = FALSE;
if (Flags & 2 == 1 && hSynch->hMutex == NULL) ok = FALSE;
if (Flags & 1 == 1 && hSynch->hMutexa == NULL) ok = FALSE;
if (!ok) {
CloseSynchHandle (hSynch);
return NULL;
}
/* Everything worked */
return hSynch;
}
static SYNCHHANDLE AllocSynchHandle (LPCTSTR lpName,
LPTSTR MutexName, LPTSTR EventName, LPTSTR MutexaName,
LPBOOL pfNewObject)
/* Allocate memory for a synchronization handle. Unnamed objects
have their handles created directly in the process heap, whereas
named objects have two handles, one allocated locally to
contain the handles (which are local to this process) and
out of a shared memory pool mapped to the paging file for
the shared object state.
Also, create the names for the three internal objects
and determine whether this is a new object that should be initialized. */
{
HANDLE hSynchDB; /* Mutex to protect the entire synchronization object database */
HANDLE hMap, hFile = (HANDLE)0xFFFFFFFF;
/* Shared memory and file handle for maintaining synch objects */
BOOL FirstTime;
SYNCHHANDLE pView, pNew = NULL, pFirstFree, hLocal;
hLocal = HeapAlloc (GetProcessHeap(), HEAP_ZERO_MEMORY, SYNCH_HANDLE_SIZE);
*pfNewObject = TRUE;
if (hLocal == NULL || lpName == NULL
|| _tcscmp (lpName, _T("")) == 0) /* The object is not named */
return hLocal;
/* The object is named. Create names for the internal objects. */
_stprintf (MutexName, _T("%s%s"), lpName, _T(".mtx"));
_stprintf (EventName, _T("%s%s"), lpName, _T(".evt"));
_stprintf (MutexaName, _T("%s%s"), lpName, _T(".mtxa"));
/* Lock access to the synchronization object data base to prevent other threads
from concurrently creating another object of the same name.
All processes and threads use this same well-known mutex name. */
hSynchDB = CreateMutex (NULL, FALSE, SYNCH_OBJECT_MUTEX);
WaitForSingleObject (hSynchDB, INFINITE);
/* Access the shared memory where the synchronization objects are maintained.
It is necessary, however, first to check if this is the first time
that an object has been created so that the shared memory-mapped
table can be initialized.
The test is achieved with an OpenFileMapping call. */
_try {
hMap = OpenFileMapping (FILE_MAP_WRITE, FALSE, SYNCH_FM_NAME);
FirstTime = (hMap == NULL);
if (FirstTime)
hMap = CreateFileMapping (hFile, NULL, PAGE_READWRITE, 0, SIZE_SYNCH_DB, SYNCH_FM_NAME);
if (hMap == NULL) _leave;
pView = (SYNCHHANDLE)MapViewOfFile (hMap, FILE_MAP_WRITE, 0, 0, SIZE_SYNCH_DB);
if (pView == NULL) _leave;
if (FirstTime) memset (pView, 0, SIZE_SYNCH_DB);
/* Search to see if an object of this name already exists.
The entry the mapped record is used for bookkeeping, in
case it is ever needed in the future. An empty slot
is detected by a 0 reference count. */
pFirstFree = NULL;
for (pNew = pView+1; pNew < pView + SYNCH_MAX_NUMBER; pNew++) {
if ((pFirstFree == NULL) && (pNew->RefCount <= 0)) pFirstFree = pNew;
if ((pNew->lpName != NULL)
&& _tcscmp (pNew->lpName, lpName) == 0) break; /* Name exists */
}
if (pNew < pView + SYNCH_MAX_NUMBER) { /* The name exists */
*pfNewObject = FALSE;
} else if (pFirstFree != NULL) {
/* The name does not exist, but we have found and empty slot. */
*pfNewObject = TRUE;
pNew = pFirstFree;
} else { /* The name does not exist, but there is no free slot. */
pNew = NULL;
*pfNewObject = TRUE;
}
}
_finally {
if (pNew != NULL) hLocal->ViewOfFile = pView;
hLocal->SharedHandle = pNew;
if (hLocal->SharedHandle != NULL)
InterlockedIncrement (&(hLocal->SharedHandle->RefCount));
ReleaseMutex (hSynchDB);
return hLocal;
}
}
This is the header file. /* synchize.h - header file to go with Synchize.c */
typedef struct _SYNCH_HANDLE_STRUCTURE {
HANDLE hEvent;
HANDLE hMutex;
HANDLE hMutexa;
LONG MaxCount;
volatile LONG CurCount;
LONG RefCount; /* Number of references to a shared handle */
BOOL fFirstComeFirstServed; /* For multiple wait semaphores */
DWORD dwFlags; /* Used as required */
LPVOID ViewOfFile; /* For named objects, this is the mapped file
view containing the handle */
struct _SYNCH_HANDLE_STRUCTURE *SharedHandle; /* For named objects, this is the
shared part of the handle, containing
the handle state */
TCHAR lpName[MAX_PATH];
} SYNCH_HANDLE_STRUCTURE, *SYNCHHANDLE;
#define SYNCH_HANDLE_SIZE sizeof (SYNCH_HANDLE_STRUCTURE)
#define SYNCH_MAX_NUMBER 1000
#define SIZE_SYNCH_DB SYNCH_MAX_NUMBER*SYNCH_HANDLE_SIZE
#define SYNCH_OBJECT_MUTEX _T("SynchObjectMutex")
#define SYNCH_FM_NAME _T("SynchSharedMem")
/* Error codes - all must have bit 29 set */
#define SYNCH_ERROR 0X20000000 /* EXERCISE - REFINE THE ERROR NUMBERS */
SYNCHHANDLE CreateSemaphoreCE (LPSECURITY_ATTRIBUTES, LONG, LONG, LPCTSTR);
BOOL ReleaseSemaphoreCE (SYNCHHANDLE, LONG, LPLONG);
DWORD WaitForSemaphoreCE (SYNCHHANDLE, DWORD);
BOOL CloseSynchHandle (SYNCHHANDLE);
SYNCHHANDLE CreateSemaphoreMW (LPSECURITY_ATTRIBUTES, LONG, LONG,BOOL, LPCTSTR);
BOOL ReleaseSemaphoreMW (SYNCHHANDLE, LONG, LPLONG);
DWORD WaitForSemaphoreMW (SYNCHHANDLE, LONG, DWORD);
This is the test program /* Test the Multiple Wait Semaphore synchronization compound object */
#include "EvryThng.h"
#include "Synchize.h"
static DWORD WINAPI ProducerTh (LPVOID);
static DWORD WINAPI ConsumerTh (LPVOID);
static DWORD WINAPI MonitorTh (LPVOID);
static BOOL WINAPI CtrlcHandler (DWORD);
static SYNCHHANDLE hSemMW;
static LONG SemMax, SemInitial, NumProducers, NumConsumers, TotalExcess = 0;
static HANDLE hTestMutex;
static volatile BOOL Debug = FALSE;
static volatile BOOL Exit = FALSE;
static volatile DWORD NumSuccess = 0, NumFailures = 0, NumProduced = 0,
FailureCount = 0, NumConsumed = 0;
static LPDWORD ProducerCount, ConsumerCount;
int _tmain (int argc, LPTSTR argv[])
/* Test CE semaphores; that is, semaphores created out of a mutex, a
counter, and an autoreset event */
{
LPHANDLE Producer, Consumer;
LONG iThread;
DWORD ThreadId;
HANDLE hMonitor;
BOOL FirstCFS; /* First come, first served, or first available? */
TCHAR Name [MAX_PATH] = _T("");
FILETIME FileTime; /* Times to seed random #s. */
SYSTEMTIME SysTi;
/* Randomly initialize the random number seed */
GetSystemTime (&SysTi);
SystemTimeToFileTime (&SysTi, &FileTime);
srand (FileTime.dwLowDateTime);
if (!SetConsoleCtrlHandler (CtrlcHandler, TRUE))
ReportError (_T("Failed to set console control handler"), 1, TRUE);
_tprintf (_T("\nEnter SemMax, SemInitial, NumConsumers:, NumProducers:, FCFS?, Debug: "));
_tscanf (_T("%d %d %d %d %d %d"), &SemMax, &SemInitial, &NumConsumers,
&NumProducers, &FirstCFS, &Debug);
_tprintf (_T("\nEnter Name - NULL for none: "));
_tscanf (_T("%s"), Name);
if (_tcscmp (Name, _T("NULL")) == 0) _tcscpy (Name, _T(""));
/* if (Debug) */ _tprintf (_T("You entered: %d %d %d %d %d %s\n"),
SemMax, SemInitial, NumConsumers, NumProducers, FirstCFS, Name);
/* Create a mutex to synchronize various aspects of the test, such as
updating statistics. A CS won't work as we need a WaitForMultipleObjects
involving this mutex in the monitor thread. */
hTestMutex = CreateMutex (NULL, FALSE, NULL);
if (hTestMutex == NULL) ReportError (_T("Could not create test mutex"), 2, TRUE);
NumProduced = SemInitial; /* Initialize the usage statistics for the semaphore. */
hSemMW = CreateSemaphoreMW (NULL, SemInitial, SemMax, FirstCFS, Name);
if (hSemMW == NULL) {
_tprintf (_T("LastError: %x\n"), GetLastError());
ReportError (_T("Failed to create MW semaphore"), 3, TRUE);
}
if (Debug) _tprintf ("MW Semaphore created successfully\n");
/* Create all the semaphore consuming and releasing threads */
/* Create arrays to hold the thread handles, and also
create integer arrays to count number of iterations by each thread. By
observing these counts, we can be sure that no deadlocks have occurred. */
if (NumConsumers > 0) {
Consumer = malloc (NumConsumers*sizeof(HANDLE));
if (Consumer == NULL)
ReportError (_T("Cannot allocate Consumer handles"), 5, FALSE);
ConsumerCount = HeapAlloc (GetProcessHeap(), HEAP_ZERO_MEMORY,
NumConsumers*sizeof(DWORD));
if (ConsumerCount == NULL)
ReportError (_T("Cannot allocate Consumer handles"), 5, FALSE);
}
if (NumProducers > 0) {
Producer = malloc (NumProducers*sizeof(HANDLE));
if (Producer == NULL)
ReportError (_T("Cannot allocate Producer handles"), 4, FALSE);
ProducerCount = HeapAlloc (GetProcessHeap(), HEAP_ZERO_MEMORY,
NumProducers*sizeof(DWORD));
if (ProducerCount == NULL)
ReportError (_T("Cannot allocate Producer handles"), 4, FALSE);
}
hMonitor = (HANDLE)_beginthreadex (NULL, 0, MonitorTh, (LPVOID)5000,
CREATE_SUSPENDED, &ThreadId);
if (hMonitor == NULL)
ReportError (_T("Cannot create monitor thread"), 6, TRUE);
SetThreadPriority (hMonitor, THREAD_PRIORITY_HIGHEST);
for (iThread = 0; iThread < NumConsumers; iThread++) {
Consumer [iThread] = (HANDLE)_beginthreadex (NULL, 0, ConsumerTh,
(LPVOID)iThread, CREATE_SUSPENDED, &ThreadId);
if (Consumer[iThread] == NULL)
ReportError (_T("Cannot create consumer thread"), 3, TRUE);
}
for (iThread = 0; iThread < NumProducers; iThread++) {
Producer [iThread] = (HANDLE)_beginthreadex (NULL, 0, ProducerTh,
(LPVOID)iThread, CREATE_SUSPENDED, &ThreadId);
if (Producer[iThread] == NULL)
ReportError (_T("Cannot create producer thread"), 3, TRUE);
}
WaitForSingleObject (hTestMutex, INFINITE);
_tprintf (_T("All threads created successfully.\n"));
ReleaseMutex (hTestMutex);
for (iThread = 0; iThread < NumConsumers; iThread++)
ResumeThread (Consumer [iThread]);
for (iThread = 0; iThread < NumProducers; iThread++)
ResumeThread (Producer [iThread]);
ResumeThread (hMonitor);
if (NumConsumers > 0)
WaitForMultipleObjects (NumConsumers, Consumer, TRUE, INFINITE);
if (NumProducers > 0)
WaitForMultipleObjects (NumProducers, Producer, TRUE, INFINITE);
WaitForSingleObject (hMonitor, INFINITE);
for (iThread = 0; iThread < NumConsumers; iThread++)
CloseHandle (Consumer [iThread]);
for (iThread = 0; iThread < NumProducers; iThread++)
CloseHandle (Producer [iThread]);
HeapFree (GetProcessHeap(), 0, Producer);
HeapFree (GetProcessHeap(), 0, Consumer);
HeapFree (GetProcessHeap(), 0, ProducerCount);
HeapFree (GetProcessHeap(), 0, ConsumerCount);
CloseHandle (hMonitor);
if (!CloseSynchHandle (hSemMW))
ReportError (_T("Failed closing synch handle"), 0, TRUE);
_tprintf (_T("All threads terminated\n"));
return 0;
}
static DWORD WINAPI ProducerTh (LPVOID ThNumber)
/* Producer thread:
1. Compute for a random period of time.
2. Produce a random number of units, uniformly distributed [1, SemMax]
The production will return with no units produced if it would cause the
semaphore count to exceed the maximum (this is consistent with Win32
semaphore semantics).
3. Produce twice as quickly as consuming, as many productions will fail because
the number of units is too large.
*/
{
DWORD Id = (DWORD)ThNumber, Delay, i, k;
LONG PrevCount = 0, RelCount = 0;
WaitForSingleObject (hTestMutex, INFINITE);
if (Debug) _tprintf (_T("Starting producer number %d.\n"), Id);
ReleaseMutex (hTestMutex);
while (!Exit) {
/* Delay without necessarily giving up processor control.
Notice how the production rate is the same as the consumption
rate so as to keep things balanced. */
Delay = rand() / 2;
for (i = 0; i < Delay; i++) k = rand()*rand() / 2; /* Waste time */
if (rand() % 3 == 0)
Sleep (rand()/(RAND_MAX/500));
/* Give up the processor 1/3 of the time
just to make the thread interaction more interesting */
RelCount = (long)(((float)rand()/RAND_MAX) * SemMax)+1;
if (!ReleaseSemaphoreMW (hSemMW, RelCount, &PrevCount)) {
WaitForSingleObject (hTestMutex, INFINITE);
if (Debug)
_tprintf (_T("Producer #: %d Failed. PrevCount = %d RelCount = %d\n"),
Id, PrevCount, RelCount);
NumFailures++; /* Maintain producer statistics */
FailureCount += RelCount;
ReleaseMutex (hTestMutex);
} else {
WaitForSingleObject (hTestMutex, INFINITE);
if (Debug)
_tprintf (_T("Producer #: %d Succeeded. PrevCount = %d, RelCount = %d\n"),
Id, PrevCount, RelCount);
NumSuccess++;
NumProduced += RelCount;
ReleaseMutex (hTestMutex);
}
ProducerCount[Id]++; /* Number of producer iterations. */
}
_endthreadex (0);
return 0;
}
static DWORD WINAPI ConsumerTh (LPVOID ThNumber)
{
DWORD Id = (DWORD)ThNumber, Delay, i, k;
LONG SeizeCount;
WaitForSingleObject (hTestMutex, INFINITE);
if (Debug) _tprintf (_T("Starting consumer number %d.\n"), Id);
ReleaseMutex (hTestMutex);
while (!Exit) {
/* Delay without necessarily giving up processor control */
Delay = rand();
for (i = 0; i < Delay; i++) k = rand()*rand(); /* Waste time */;
if (rand() % 3 == 0) Sleep (rand()/(RAND_MAX/1000));
/* Give up the processor 1/3 of the time
just to make the thread interaction more interesting */
/* Random request size */
SeizeCount = (long)(((float)rand()/RAND_MAX) * SemMax)+1;
if (WaitForSemaphoreMW (hSemMW, SeizeCount, rand()) != WAIT_OBJECT_0) {
if (Debug) {
WaitForSingleObject (hTestMutex, INFINITE);
if (Debug) _tprintf (_T("Consumer #: %d Timed out waiting for %d units\n"),
Id, SeizeCount);
ReleaseMutex (hTestMutex);
}
} else { /* The semaphore unit was obtained successfully - update statistics */
WaitForSingleObject (hTestMutex, INFINITE);
if (Debug) _tprintf (_T("Consumer #: %d Obtained %d units\n"), Id, SeizeCount);
NumConsumed += SeizeCount;
ReleaseMutex (hTestMutex);
}
ConsumerCount[Id]++; /* Number of consumer iterations. */
}
_endthreadex (0);
return 0;
}
static DWORD WINAPI MonitorTh (LPVOID Delay)
/* Monitor thread - periodically check the statistics for consistency */
{
LONG CurCount = 0, Max = 0, ExcessCount, i;
SYNCHHANDLE hState;
HANDLE hBoth[2] = {hTestMutex, hSemMW->hMutex};
/* This is cheating to reach inside the opaque handle, but we need to get
simultaneous access to both the semaphore state and to the statistics
in order to test consistency. Some consistency failures are still possible,
however, as the statistics are updated independently of changing the semaphore
state, so the semaphore might be changed by another thread before the producer
or consumer can update the global statistics, Thus, there can be no absolute
consistency between the semaphore state and the global statistics maintained
by this test program, and, to make them consistent would require putting a
critical section around the producer/consumer loop bodies, which would destroy
the asynchronous operation required for testing.
But, the general state of the semaphore can still be checked, and we can
also be sure that it has not been corrupted. */
/* Get the handle to the semaphore state */
hState = (hSemMW->SharedHandle == NULL) ? hSemMW : hSemMW->SharedHandle;
while (!Exit) {
WaitForMultipleObjects (2, hBoth, TRUE, INFINITE);
CurCount = hState->CurCount;
Max = hState->MaxCount;
ExcessCount = NumProduced - (NumConsumed + CurCount);
/* Excess of production over consumption
which should average out to 0 */
_tprintf (_T
("Monitor statistics:\nProduced: %d\nConsumed: %d\nCurrent: %d\nMaximum: %d\n"),
NumProduced, NumConsumed, CurCount, Max);
/* For Correctness, we must have: Produced == Consumed + CurCount */
if (ExcessCount != 0 || CurCount < 0 || CurCount > SemMax || SemMax != Max) {
_tprintf (_T("Discrepancy: %d\n"), ExcessCount);
TotalExcess += ExcessCount;
}
else _tprintf (_T("Consistency test passed. Total excess count: %d\n"), TotalExcess);
_tprintf (_T("Successful release calls: %d\nFailed release calls: %d\n"),
NumSuccess, NumFailures);
_tprintf (_T("Units not released: %d\n"), FailureCount);
_tprintf (_T("Number of Consumer iterations, by thread\n"));
for (i = 0; i < NumConsumers; i++) _tprintf (_T("%6d"), ConsumerCount[i]);
_tprintf (_T("\nNumber of Producer iterations, by thread\n"));
for (i = 0; i < NumProducers; i++) _tprintf (_T("%6d"), ProducerCount[i]);
_tprintf (_T("\n**************\n"));
ReleaseMutex (hTestMutex);
ReleaseMutex (hSemMW->hMutex);
Sleep ((DWORD)Delay);
}
_endthreadex(0);
return 0;
}
static BOOL WINAPI CtrlcHandler (DWORD CtrlEvent)
{
DWORD PrevCount;
LONG i;
if (CtrlEvent == CTRL_C_EVENT) {
WaitForSingleObject (hTestMutex, INFINITE);
_tprintf (_T("Control-c received. Shutting down.\n"));
ReleaseMutex (hTestMutex);
Exit = TRUE;
/* Assure that all Consumer threads are not blocked waiting on
a semaphore so that they have a chance to shut down. */
for (i = 0; i < NumConsumers; i++) {
ReleaseSemaphoreMW (hSemMW, 1, &PrevCount);
/* Release the CPU so that the consumer can be scheduled */
Sleep (0);
}
return TRUE;
}
else return FALSE;
}
|