The previous version of the completion channel was racy and would occasionally lose events, causing users to block indefinitely if no new events occurred. The most reliable fix is to add a thread to the completion manager that reaps events from an I/O completion port and dispatches them to the correct completion channel. This results in a 1-2% performance hit in libibverbs bandwidth tests that wait on a CQ, but it actually works.
Signed-off-by: Sean Hefty <[email protected]> --- As a reminder, the completion manager / channel abstraction is used to simulate the select/poll functionality across multiple FDs on linux. diff -up -r -X \mshefty\scm\winof\trunk\docs\dontdiff.txt -I '\$Id:' trunk\etc/user/comp_channel.cpp branches\winverbs\etc/user/comp_channel.cpp --- trunk\etc/user/comp_channel.cpp 2009-03-10 02:11:36.546875000 -0700 +++ branches\winverbs\etc/user/comp_channel.cpp 2009-04-10 11:57:38.534233100 -0700 @@ -28,29 +28,88 @@ */ #include <comp_channel.h> +#include <process.h> +static void CompManagerQueue(COMP_MANAGER *pMgr, COMP_ENTRY *pEntry); static void CompChannelQueue(COMP_CHANNEL *pChannel, COMP_ENTRY *pEntry); + +/* + * Completion manager + */ + +static unsigned __stdcall CompThreadPoll(void *Context) +{ + COMP_MANAGER *mgr = (COMP_MANAGER *) Context; + COMP_ENTRY *entry; + OVERLAPPED *overlap; + DWORD bytes; + ULONG_PTR key; + + while (mgr->Run) { + GetQueuedCompletionStatus(mgr->CompQueue, &bytes, &key, + &overlap, INFINITE); + entry = CONTAINING_RECORD(overlap, COMP_ENTRY, Overlap); + + if (entry->Channel) { + CompChannelQueue(entry->Channel, entry); + } else { + CompManagerQueue(mgr, entry); + } + } + + _endthreadex(0); + return 0; +} + DWORD CompManagerOpen(COMP_MANAGER *pMgr) { + DWORD ret; + + InitializeCriticalSection(&pMgr->Lock); + pMgr->Busy = 0; + DListInit(&pMgr->DoneList); + CompEntryInit(NULL, &pMgr->Entry); + pMgr->CompQueue = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, -1); if (pMgr->CompQueue == NULL) { - return GetLastError(); + ret = GetLastError(); + goto err1; } pMgr->Event = CreateEvent(NULL, TRUE, TRUE, NULL); if (pMgr->Event == NULL) { - return GetLastError(); + ret = GetLastError(); + goto err2; } - pMgr->Lock = 0; + pMgr->Run = TRUE; + pMgr->Thread = (HANDLE) _beginthreadex(NULL, 0, CompThreadPoll, pMgr, 0, NULL); + if (pMgr->Thread == NULL) { + ret = GetLastError(); + goto err3; + } return 0; + +err3: + CloseHandle(pMgr->Event); +err2: + 
CloseHandle(pMgr->CompQueue); +err1: + DeleteCriticalSection(&pMgr->Lock); + return ret; } void CompManagerClose(COMP_MANAGER *pMgr) { + pMgr->Run = FALSE; + CompManagerCancel(pMgr); + WaitForSingleObject(pMgr->Thread, INFINITE); + CloseHandle(pMgr->Thread); + CloseHandle(pMgr->CompQueue); CloseHandle(pMgr->Event); + DeleteCriticalSection(&pMgr->Lock); } DWORD CompManagerMonitor(COMP_MANAGER *pMgr, HANDLE hFile, ULONG_PTR Key) @@ -61,38 +120,85 @@ DWORD CompManagerMonitor(COMP_MANAGER *p return (cq == NULL) ? GetLastError() : 0; } +static void CompManagerQueue(COMP_MANAGER *pMgr, COMP_ENTRY *pEntry) +{ + EnterCriticalSection(&pMgr->Lock); + DListInsertTail(&pEntry->MgrEntry, &pMgr->DoneList); + SetEvent(pMgr->Event); + LeaveCriticalSection(&pMgr->Lock); +} + +static void CompManagerRemoveEntry(COMP_MANAGER *pMgr, COMP_ENTRY *pEntry) +{ + EnterCriticalSection(&pMgr->Lock); + DListRemove(&pEntry->MgrEntry); + LeaveCriticalSection(&pMgr->Lock); +} + DWORD CompManagerPoll(COMP_MANAGER *pMgr, DWORD Milliseconds, COMP_CHANNEL **ppChannel) { COMP_ENTRY *entry; - OVERLAPPED *overlap; - DWORD bytes, ret; - ULONG_PTR key; + DWORD ret = 0; - if (GetQueuedCompletionStatus(pMgr->CompQueue, &bytes, &key, &overlap, - Milliseconds)) { - entry = CONTAINING_RECORD(overlap, COMP_ENTRY, Overlap); - *ppChannel = entry->Channel; - CompChannelQueue(entry->Channel, entry); - ret = 0; - } else { - ret = GetLastError(); + EnterCriticalSection(&pMgr->Lock); + while (DListEmpty(&pMgr->DoneList)) { + ResetEvent(pMgr->Event); + LeaveCriticalSection(&pMgr->Lock); + + ret = WaitForSingleObject(pMgr->Event, Milliseconds); + if (ret) { + return ret; + } + + EnterCriticalSection(&pMgr->Lock); } + + entry = CONTAINING_RECORD(pMgr->DoneList.Next, COMP_ENTRY, MgrEntry); + *ppChannel = entry->Channel; + if (entry->Channel == NULL) { + DListRemove(&entry->MgrEntry); + InterlockedExchange(&entry->Busy, 0); + ret = ERROR_CANCELLED; + } + LeaveCriticalSection(&pMgr->Lock); + return ret; } +void 
CompManagerCancel(COMP_MANAGER *pMgr) +{ + if (InterlockedCompareExchange(&pMgr->Entry.Busy, 1, 0) == 0) { + PostQueuedCompletionStatus(pMgr->CompQueue, 0, (ULONG_PTR) pMgr, + &pMgr->Entry.Overlap); + } +} + -void CompChannelInit(COMP_MANAGER *pMgr, COMP_CHANNEL *pChannel, DWORD Milliseconds) +/* + * Completion channel + */ + +DWORD CompChannelInit(COMP_MANAGER *pMgr, COMP_CHANNEL *pChannel, DWORD Milliseconds) { pChannel->Manager = pMgr; pChannel->Head = NULL; pChannel->TailPtr = &pChannel->Head; - InitializeCriticalSection(&pChannel->Lock); pChannel->Milliseconds = Milliseconds; + + pChannel->Event = CreateEvent(NULL, TRUE, TRUE, NULL); + if (pChannel->Event == NULL) { + return GetLastError(); + } + + InitializeCriticalSection(&pChannel->Lock); + CompEntryInit(pChannel, &pChannel->Entry); + return 0; } void CompChannelCleanup(COMP_CHANNEL *pChannel) { + CloseHandle(pChannel->Event); DeleteCriticalSection(&pChannel->Lock); } @@ -114,84 +220,101 @@ static COMP_ENTRY *CompChannelRemoveHead return entry; } +static COMP_ENTRY *CompChannelFindRemove(COMP_CHANNEL *pChannel, COMP_ENTRY *pEntry) +{ + COMP_ENTRY **entry_ptr, *entry; + + EnterCriticalSection(&pChannel->Lock); + entry_ptr = &pChannel->Head; + while (*entry_ptr && *entry_ptr != pEntry) { + entry_ptr = &(*entry_ptr)->Next; + } + + if (*entry_ptr != NULL) { + *entry_ptr = pEntry->Next; + if (pChannel->TailPtr == &pEntry->Next) { + pChannel->TailPtr = entry_ptr; + } + CompManagerRemoveEntry(pChannel->Manager, pEntry); + InterlockedExchange(&pEntry->Busy, 0); + } + LeaveCriticalSection(&pChannel->Lock); + return *entry_ptr; +} + static void CompChannelQueue(COMP_CHANNEL *pChannel, COMP_ENTRY *pEntry) { pEntry->Next = NULL; EnterCriticalSection(&pChannel->Lock); + CompManagerQueue(pChannel->Manager, pEntry); CompChannelInsertTail(pChannel, pEntry); + SetEvent(pChannel->Event); LeaveCriticalSection(&pChannel->Lock); } DWORD CompChannelPoll(COMP_CHANNEL *pChannel, COMP_ENTRY **ppEntry) { - COMP_MANAGER *mgr = 
pChannel->Manager; - COMP_CHANNEL *chan; - DWORD ret = 0; - ULONG locked; + COMP_ENTRY *entry; + DWORD ret; EnterCriticalSection(&pChannel->Lock); while (pChannel->Head == NULL) { + ResetEvent(pChannel->Event); LeaveCriticalSection(&pChannel->Lock); - locked = InterlockedCompareExchange(&mgr->Lock, 1, 0); - if (locked == 0) { - ResetEvent(mgr->Event); - ret = CompManagerPoll(mgr, pChannel->Milliseconds, &chan); - InterlockedExchange(&mgr->Lock, 0); - SetEvent(mgr->Event); - } else { - ret = WaitForSingleObject(mgr->Event, pChannel->Milliseconds); - } + ret = WaitForSingleObject(pChannel->Event, pChannel->Milliseconds); if (ret) { - goto out; + return ret; } EnterCriticalSection(&pChannel->Lock); } - *ppEntry = CompChannelRemoveHead(pChannel); + entry = CompChannelRemoveHead(pChannel); + CompManagerRemoveEntry(pChannel->Manager, entry); LeaveCriticalSection(&pChannel->Lock); -out: + InterlockedExchange(&entry->Busy, 0); + *ppEntry = entry; + ret = (entry == &pChannel->Entry) ? ERROR_CANCELLED : 0; + return ret; } -void CompChannelRemoveEntry(COMP_CHANNEL *pChannel, COMP_ENTRY *pEntry) +void CompChannelCancel(COMP_CHANNEL *pChannel) { - COMP_CHANNEL *chan; - COMP_ENTRY **entry_ptr; - DWORD ret; - - do { - ret = CompManagerPoll(pChannel->Manager, 0, &chan); - } while (!ret); - SetEvent(pChannel->Manager->Event); - - EnterCriticalSection(&pChannel->Lock); - entry_ptr = &pChannel->Head; - while (*entry_ptr && *entry_ptr != pEntry) { - entry_ptr = &(*entry_ptr)->Next; + if (InterlockedCompareExchange(&pChannel->Entry.Busy, 1, 0) == 0) { + PostQueuedCompletionStatus(pChannel->Manager->CompQueue, 0, + (ULONG_PTR) pChannel, &pChannel->Entry.Overlap); } - - if (*entry_ptr != NULL) { - *entry_ptr = pEntry->Next; - if (pChannel->TailPtr == &pEntry->Next) { - pChannel->TailPtr = entry_ptr; - } - } - LeaveCriticalSection(&pChannel->Lock); } + +/* + * Completion entry + */ + void CompEntryInit(COMP_CHANNEL *pChannel, COMP_ENTRY *pEntry) { + RtlZeroMemory(pEntry, sizeof *pEntry); 
pEntry->Channel = pChannel; } DWORD CompEntryPost(COMP_ENTRY *pEntry) { - if (PostQueuedCompletionStatus(pEntry->Channel->Manager->CompQueue, 0, 0, - &pEntry->Overlap)) { - return 0; - } else { - return GetLastError(); + if (InterlockedCompareExchange(&pEntry->Busy, 1, 0) == 0) { + if (!PostQueuedCompletionStatus(pEntry->Channel->Manager->CompQueue, + 0, 0, &pEntry->Overlap)) { + InterlockedExchange(&pEntry->Busy, 0); + return GetLastError(); + } + } + return 0; +} + +void CompEntryCancel(COMP_ENTRY *pEntry) +{ + while (pEntry->Busy) { + Sleep(0); + CompChannelFindRemove(pEntry->Channel, pEntry); } } --- trunk\inc\user\comp_channel.h 2009-03-10 02:09:09.765625000 -0700 +++ branches\winverbs\inc\user\comp_channel.h 2009-04-09 17:43:01.515373300 -0700 @@ -33,6 +33,7 @@ #define COMP_CHANNEL_H #include <windows.h> +#include <dlist.h> #ifdef __cplusplus extern "C" { @@ -41,8 +42,10 @@ extern "C" { typedef struct _COMP_ENTRY { struct _COMP_ENTRY *Next; + DLIST_ENTRY MgrEntry; OVERLAPPED Overlap; struct _COMP_CHANNEL *Channel; + LONG volatile Busy; } COMP_ENTRY; @@ -51,6 +54,8 @@ typedef struct _COMP_CHANNEL struct _COMP_MANAGER *Manager; COMP_ENTRY *Head; COMP_ENTRY **TailPtr; + COMP_ENTRY Entry; + HANDLE Event; CRITICAL_SECTION Lock; DWORD Milliseconds; @@ -59,8 +64,13 @@ typedef struct _COMP_CHANNEL typedef struct _COMP_MANAGER { HANDLE CompQueue; + DLIST_ENTRY DoneList; + COMP_ENTRY Entry; + HANDLE Thread; + BOOL Run; HANDLE Event; - LONG volatile Lock; + LONG volatile Busy; + CRITICAL_SECTION Lock; } COMP_MANAGER; @@ -69,15 +79,17 @@ void CompManagerClose(COMP_MANAGER *pMg DWORD CompManagerMonitor(COMP_MANAGER *pMgr, HANDLE hFile, ULONG_PTR Key); DWORD CompManagerPoll(COMP_MANAGER *pMgr, DWORD Milliseconds, COMP_CHANNEL **ppChannel); +void CompManagerCancel(COMP_MANAGER *pMgr); -void CompChannelInit(COMP_MANAGER *pMgr, COMP_CHANNEL *pChannel, +DWORD CompChannelInit(COMP_MANAGER *pMgr, COMP_CHANNEL *pChannel, DWORD Milliseconds); void CompChannelCleanup(COMP_CHANNEL 
*pChannel); DWORD CompChannelPoll(COMP_CHANNEL *pChannel, COMP_ENTRY **ppEntry); -void CompChannelRemoveEntry(COMP_CHANNEL *pChannel, COMP_ENTRY *pEntry); +void CompChannelCancel(COMP_CHANNEL *pChannel); void CompEntryInit(COMP_CHANNEL *pChannel, COMP_ENTRY *pEntry); DWORD CompEntryPost(COMP_ENTRY *pEntry); +void CompEntryCancel(COMP_ENTRY *pEntry); #ifdef __cplusplus } _______________________________________________ ofw mailing list [email protected] http://lists.openfabrics.org/cgi-bin/mailman/listinfo/ofw
