http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a5850716/proton-c/src/windows/selector.c ---------------------------------------------------------------------- diff --git a/proton-c/src/windows/selector.c b/proton-c/src/windows/selector.c deleted file mode 100644 index f139aec..0000000 --- a/proton-c/src/windows/selector.c +++ /dev/null @@ -1,382 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#ifndef _WIN32_WINNT -#define _WIN32_WINNT 0x0501 -#endif -#if _WIN32_WINNT < 0x0501 -#error "Proton requires Windows API support for XP or later." -#endif -#include <winsock2.h> -#include <Ws2tcpip.h> - -#include "platform.h" -#include <proton/object.h> -#include <proton/io.h> -#include <proton/selector.h> -#include <proton/error.h> -#include <assert.h> -#include "selectable.h" -#include "util.h" -#include "iocp.h" - -static void interests_update(iocpdesc_t *iocpd, int interests); -static void deadlines_update(iocpdesc_t *iocpd, pn_timestamp_t t); - -struct pn_selector_t { - iocp_t *iocp; - pn_list_t *selectables; - pn_list_t *iocp_descriptors; - size_t current; - iocpdesc_t *current_triggered; - pn_timestamp_t awoken; - pn_error_t *error; - iocpdesc_t *triggered_list_head; - iocpdesc_t *triggered_list_tail; - iocpdesc_t *deadlines_head; - iocpdesc_t *deadlines_tail; -}; - -void pn_selector_initialize(void *obj) -{ - pn_selector_t *selector = (pn_selector_t *) obj; - selector->iocp = NULL; - selector->selectables = pn_list(PN_WEAKREF, 0); - selector->iocp_descriptors = pn_list(PN_OBJECT, 0); - selector->current = 0; - selector->current_triggered = NULL; - selector->awoken = 0; - selector->error = pn_error(); - selector->triggered_list_head = NULL; - selector->triggered_list_tail = NULL; - selector->deadlines_head = NULL; - selector->deadlines_tail = NULL; -} - -void pn_selector_finalize(void *obj) -{ - pn_selector_t *selector = (pn_selector_t *) obj; - pn_free(selector->selectables); - pn_free(selector->iocp_descriptors); - pn_error_free(selector->error); - selector->iocp->selector = NULL; -} - -#define pn_selector_hashcode NULL -#define pn_selector_compare NULL -#define pn_selector_inspect NULL - -pn_selector_t *pni_selector() -{ - static const pn_class_t clazz = PN_CLASS(pn_selector); - pn_selector_t *selector = (pn_selector_t *) pn_class_new(&clazz, sizeof(pn_selector_t)); - return selector; -} - -pn_selector_t *pni_selector_create(iocp_t *iocp) -{ - pn_selector_t *selector = pni_selector(); - selector->iocp = iocp; - return selector; -} - -void pn_selector_add(pn_selector_t *selector, pn_selectable_t *selectable) -{ - assert(selector); - assert(selectable); - assert(pni_selectable_get_index(selectable) < 0); - pn_socket_t sock = pn_selectable_get_fd(selectable); - iocpdesc_t *iocpd = NULL; - - if (pni_selectable_get_index(selectable) < 0) { - pn_list_add(selector->selectables, selectable); - pn_list_add(selector->iocp_descriptors, NULL); - size_t size = pn_list_size(selector->selectables); - pni_selectable_set_index(selectable, size - 1); - } - - pn_selector_update(selector, selectable); -} - -void pn_selector_update(pn_selector_t *selector, pn_selectable_t *selectable) -{ - // A selectable's fd may switch from PN_INVALID_SCOKET to a working socket between - // update calls. If a selectable without a valid socket has a deadline, we need - // a dummy iocpdesc_t to participate in the deadlines list. - int idx = pni_selectable_get_index(selectable); - assert(idx >= 0); - pn_timestamp_t deadline = pn_selectable_get_deadline(selectable); - pn_socket_t sock = pn_selectable_get_fd(selectable); - iocpdesc_t *iocpd = (iocpdesc_t *) pn_list_get(selector->iocp_descriptors, idx); - - if (!iocpd && deadline && sock == PN_INVALID_SOCKET) { - iocpd = pni_deadline_desc(selector->iocp); - assert(iocpd); - pn_list_set(selector->iocp_descriptors, idx, iocpd); - pn_decref(iocpd); // life is solely tied to iocp_descriptors list - iocpd->selector = selector; - iocpd->selectable = selectable; - } - else if (iocpd && iocpd->deadline_desc && sock != PN_INVALID_SOCKET) { - // Switching to a real socket. Stop using a deadline descriptor. - deadlines_update(iocpd, 0); - // decref descriptor in list and pick up a real iocpd below - pn_list_set(selector->iocp_descriptors, idx, NULL); - iocpd = NULL; - } - - // The selectables socket may be set long after it has been added - if (!iocpd && sock != PN_INVALID_SOCKET) { - iocpd = pni_iocpdesc_map_get(selector->iocp, sock); - if (!iocpd) { - // Socket created outside proton. Hook it up to iocp. - iocpd = pni_iocpdesc_create(selector->iocp, sock, true); - assert(iocpd); - if (iocpd) - pni_iocpdesc_start(iocpd); - } - if (iocpd) { - pn_list_set(selector->iocp_descriptors, idx, iocpd); - iocpd->selector = selector; - iocpd->selectable = selectable; - } - } - - if (iocpd) { - assert(sock == iocpd->socket || iocpd->closing); - int interests = PN_ERROR; // Always - if (pn_selectable_is_reading(selectable)) { - interests |= PN_READABLE; - } - if (pn_selectable_is_writing(selectable)) { - interests |= PN_WRITABLE; - } - if (deadline) { - interests |= PN_EXPIRED; - } - interests_update(iocpd, interests); - deadlines_update(iocpd, deadline); - } -} - -void pn_selector_remove(pn_selector_t *selector, pn_selectable_t *selectable) -{ - assert(selector); - assert(selectable); - - int idx = pni_selectable_get_index(selectable); - assert(idx >= 0); - iocpdesc_t *iocpd = (iocpdesc_t *) pn_list_get(selector->iocp_descriptors, idx); - if (iocpd) { - if (selector->current_triggered == iocpd) - selector->current_triggered = iocpd->triggered_list_next; - interests_update(iocpd, 0); - deadlines_update(iocpd, 0); - assert(selector->triggered_list_head != iocpd && !iocpd->triggered_list_prev); - assert(selector->deadlines_head != iocpd && !iocpd->deadlines_prev); - iocpd->selector = NULL; - iocpd->selectable = NULL; - } - pn_list_del(selector->selectables, idx, 1); - pn_list_del(selector->iocp_descriptors, idx, 1); - size_t size = pn_list_size(selector->selectables); - for (size_t i = idx; i < size; i++) { - pn_selectable_t *sel = (pn_selectable_t *) pn_list_get(selector->selectables, i); - pni_selectable_set_index(sel, i); - } - - pni_selectable_set_index(selectable, -1); - - if (selector->current >= (size_t) idx) { - selector->current--; - } -} - -size_t pn_selector_size(pn_selector_t *selector) { - assert(selector); - return pn_list_size(selector->selectables); -} - -int pn_selector_select(pn_selector_t *selector, int timeout) -{ - assert(selector); - pn_error_clear(selector->error); - pn_timestamp_t deadline = 0; - pn_timestamp_t now = pn_i_now(); - - if (timeout) { - if (selector->deadlines_head) - deadline = selector->deadlines_head->deadline; - } - if (deadline) { - int64_t delta = deadline - now; - if (delta < 0) { - delta = 0; - } - if (timeout < 0) - timeout = delta; - else if (timeout > delta) - timeout = delta; - } - deadline = (timeout >= 0) ? now + timeout : 0; - - // Process all currently available completions, even if matched events available - pni_iocp_drain_completions(selector->iocp); - pni_zombie_check(selector->iocp, now); - // Loop until an interested event is matched, or until deadline - while (true) { - if (selector->triggered_list_head) - break; - if (deadline && deadline <= now) - break; - pn_timestamp_t completion_deadline = deadline; - pn_timestamp_t zd = pni_zombie_deadline(selector->iocp); - if (zd) - completion_deadline = completion_deadline ? pn_min(zd, completion_deadline) : zd; - - int completion_timeout = (!completion_deadline) ? -1 : completion_deadline - now; - int rv = pni_iocp_wait_one(selector->iocp, completion_timeout, selector->error); - if (rv < 0) - return pn_error_code(selector->error); - - now = pn_i_now(); - if (zd && zd <= now) { - pni_zombie_check(selector->iocp, now); - } - } - - selector->current = 0; - selector->awoken = now; - for (iocpdesc_t *iocpd = selector->deadlines_head; iocpd; iocpd = iocpd->deadlines_next) { - if (iocpd->deadline <= now) - pni_events_update(iocpd, iocpd->events | PN_EXPIRED); - else - break; - } - selector->current_triggered = selector->triggered_list_head; - return pn_error_code(selector->error); -} - -pn_selectable_t *pn_selector_next(pn_selector_t *selector, int *events) -{ - if (selector->current_triggered) { - iocpdesc_t *iocpd = selector->current_triggered; - *events = iocpd->interests & iocpd->events; - selector->current_triggered = iocpd->triggered_list_next; - return iocpd->selectable; - } - return NULL; -} - -void pn_selector_free(pn_selector_t *selector) -{ - assert(selector); - pn_free(selector); -} - - -static void triggered_list_add(pn_selector_t *selector, iocpdesc_t *iocpd) -{ - if (iocpd->triggered_list_prev || selector->triggered_list_head == iocpd) - return; // already in list - LL_ADD(selector, triggered_list, iocpd); -} - -static void triggered_list_remove(pn_selector_t *selector, iocpdesc_t *iocpd) -{ - if (!iocpd->triggered_list_prev && selector->triggered_list_head != iocpd) - return; // not in list - LL_REMOVE(selector, triggered_list, iocpd); - iocpd->triggered_list_prev = NULL; - iocpd->triggered_list_next = NULL; -} - - -void pni_events_update(iocpdesc_t *iocpd, int events) -{ - // If set, a poll error is permanent - if (iocpd->poll_error) - events |= PN_ERROR; - if (iocpd->events == events) - return; - iocpd->events = events; - if (iocpd->selector) { - if (iocpd->events & iocpd->interests) - triggered_list_add(iocpd->selector, iocpd); - else - triggered_list_remove(iocpd->selector, iocpd); - } -} - -static void interests_update(iocpdesc_t *iocpd, int interests) -{ - int old_interests = iocpd->interests; - if (old_interests == interests) - return; - iocpd->interests = interests; - if (iocpd->selector) { - if (iocpd->events & iocpd->interests) - triggered_list_add(iocpd->selector, iocpd); - else - triggered_list_remove(iocpd->selector, iocpd); - } -} - -static void deadlines_remove(pn_selector_t *selector, iocpdesc_t *iocpd) -{ - if (!iocpd->deadlines_prev && selector->deadlines_head != iocpd) - return; // not in list - LL_REMOVE(selector, deadlines, iocpd); - iocpd->deadlines_prev = NULL; - iocpd->deadlines_next = NULL; -} - - -static void deadlines_update(iocpdesc_t *iocpd, pn_timestamp_t deadline) -{ - if (deadline == iocpd->deadline) - return; - - iocpd->deadline = deadline; - pn_selector_t *selector = iocpd->selector; - if (!deadline) { - deadlines_remove(selector, iocpd); - pni_events_update(iocpd, iocpd->events & ~PN_EXPIRED); - } else { - if (iocpd->deadlines_prev || selector->deadlines_head == iocpd) { - deadlines_remove(selector, iocpd); - pni_events_update(iocpd, iocpd->events & ~PN_EXPIRED); - } - iocpdesc_t *dl_iocpd = LL_HEAD(selector, deadlines); - while (dl_iocpd && dl_iocpd->deadline <= deadline) - dl_iocpd = dl_iocpd->deadlines_next; - if (dl_iocpd) { - // insert - iocpd->deadlines_prev = dl_iocpd->deadlines_prev; - iocpd->deadlines_next = dl_iocpd; - dl_iocpd->deadlines_prev = iocpd; - if (selector->deadlines_head == dl_iocpd) - selector->deadlines_head = iocpd; - } else { - LL_ADD(selector, deadlines, iocpd); // append - } - } -}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a5850716/proton-c/src/windows/write_pipeline.c ---------------------------------------------------------------------- diff --git a/proton-c/src/windows/write_pipeline.c b/proton-c/src/windows/write_pipeline.c deleted file mode 100644 index e14e714..0000000 --- a/proton-c/src/windows/write_pipeline.c +++ /dev/null @@ -1,312 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -/* - * A simple write buffer pool. Each socket has a dedicated "primary" - * buffer and can borrow from a shared pool with limited size tuning. - * Could enhance e.g. with separate pools per network interface and fancier - * memory tuning based on interface speed, system resources, and - * number of connections, etc. - */ - -#ifndef _WIN32_WINNT -#define _WIN32_WINNT 0x0501 -#endif -#if _WIN32_WINNT < 0x0501 -#error "Proton requires Windows API support for XP or later." -#endif -#include <winsock2.h> -#include <Ws2tcpip.h> - -#include "platform.h" -#include <proton/object.h> -#include <proton/io.h> -#include <proton/selector.h> -#include <proton/error.h> -#include <assert.h> -#include "selectable.h" -#include "util.h" -#include "iocp.h" - -// Max overlapped writes per socket -#define IOCP_MAX_OWRITES 16 -// Write buffer size -#define IOCP_WBUFSIZE 16384 - -static void pipeline_log(const char *fmt, ...) -{ - va_list ap; - va_start(ap, fmt); - vfprintf(stderr, fmt, ap); - va_end(ap); - fflush(stderr); -} - -void pni_shared_pool_create(iocp_t *iocp) -{ - // TODO: more pools (or larger one) when using multiple non-loopback interfaces - iocp->shared_pool_size = 16; - char *env = getenv("PNI_WRITE_BUFFERS"); // Internal: for debugging - if (env) { - int sz = atoi(env); - if (sz >= 0 && sz < 256) { - iocp->shared_pool_size = sz; - } - } - iocp->loopback_bufsize = 0; - env = getenv("PNI_LB_BUFSIZE"); // Internal: for debugging - if (env) { - int sz = atoi(env); - if (sz >= 0 && sz <= 128 * 1024) { - iocp->loopback_bufsize = sz; - } - } - - if (iocp->shared_pool_size) { - iocp->shared_pool_memory = (char *) VirtualAlloc(NULL, IOCP_WBUFSIZE * iocp->shared_pool_size, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE); - HRESULT status = GetLastError(); - if (!iocp->shared_pool_memory) { - perror("Proton write buffer pool allocation failure\n"); - iocp->shared_pool_size = 0; - iocp->shared_available_count = 0; - return; - } - - iocp->shared_results = (write_result_t **) malloc(iocp->shared_pool_size * sizeof(write_result_t *)); - iocp->available_results = (write_result_t **) malloc(iocp->shared_pool_size * sizeof(write_result_t *)); - iocp->shared_available_count = iocp->shared_pool_size; - char *mem = iocp->shared_pool_memory; - for (int i = 0; i < iocp->shared_pool_size; i++) { - iocp->shared_results[i] = iocp->available_results[i] = pni_write_result(NULL, mem, IOCP_WBUFSIZE); - mem += IOCP_WBUFSIZE; - } - } -} - -void pni_shared_pool_free(iocp_t *iocp) -{ - for (int i = 0; i < iocp->shared_pool_size; i++) { - write_result_t *result = iocp->shared_results[i]; - if (result->in_use) - pipeline_log("Proton buffer pool leak\n"); - else - free(result); - } - if (iocp->shared_pool_size) { - free(iocp->shared_results); - free(iocp->available_results); - if (iocp->shared_pool_memory) { - if (!VirtualFree(iocp->shared_pool_memory, 0, MEM_RELEASE)) { - perror("write buffers release failed"); - } - iocp->shared_pool_memory = NULL; - } - } -} - -static void shared_pool_push(write_result_t *result) -{ - iocp_t *iocp = result->base.iocpd->iocp; - assert(iocp->shared_available_count < iocp->shared_pool_size); - iocp->available_results[iocp->shared_available_count++] = result; -} - -static write_result_t *shared_pool_pop(iocp_t *iocp) -{ - return iocp->shared_available_count ? iocp->available_results[--iocp->shared_available_count] : NULL; -} - -struct write_pipeline_t { - iocpdesc_t *iocpd; - size_t pending_count; - write_result_t *primary; - size_t reserved_count; - size_t next_primary_index; - size_t depth; - bool is_writer; -}; - -#define write_pipeline_compare NULL -#define write_pipeline_inspect NULL -#define write_pipeline_hashcode NULL - -static void write_pipeline_initialize(void *object) -{ - write_pipeline_t *pl = (write_pipeline_t *) object; - pl->pending_count = 0; - const char *pribuf = (const char *) malloc(IOCP_WBUFSIZE); - pl->primary = pni_write_result(NULL, pribuf, IOCP_WBUFSIZE); - pl->depth = 0; - pl->is_writer = false; -} - -static void write_pipeline_finalize(void *object) -{ - write_pipeline_t *pl = (write_pipeline_t *) object; - free((void *)pl->primary->buffer.start); - free(pl->primary); -} - -write_pipeline_t *pni_write_pipeline(iocpdesc_t *iocpd) -{ - static const pn_cid_t CID_write_pipeline = CID_pn_void; - static const pn_class_t clazz = PN_CLASS(write_pipeline); - write_pipeline_t *pipeline = (write_pipeline_t *) pn_class_new(&clazz, sizeof(write_pipeline_t)); - pipeline->iocpd = iocpd; - pipeline->primary->base.iocpd = iocpd; - return pipeline; -} - -static void confirm_as_writer(write_pipeline_t *pl) -{ - if (!pl->is_writer) { - iocp_t *iocp = pl->iocpd->iocp; - iocp->writer_count++; - pl->is_writer = true; - } -} - -static void remove_as_writer(write_pipeline_t *pl) -{ - if (!pl->is_writer) - return; - iocp_t *iocp = pl->iocpd->iocp; - assert(iocp->writer_count); - pl->is_writer = false; - iocp->writer_count--; -} - -/* - * Optimal depth will depend on properties of the NIC, server, and driver. For now, - * just distinguish between loopback interfaces and the rest. Optimizations in the - * loopback stack allow decent performance with depth 1 and actually cause major - * performance hiccups if set to large values. - */ -static void set_depth(write_pipeline_t *pl) -{ - pl->depth = 1; - sockaddr_storage sa; - socklen_t salen = sizeof(sa); - char buf[INET6_ADDRSTRLEN]; - DWORD buflen = sizeof(buf); - - if (getsockname(pl->iocpd->socket,(sockaddr*) &sa, &salen) == 0 && - getnameinfo((sockaddr*) &sa, salen, buf, buflen, NULL, 0, NI_NUMERICHOST) == 0) { - if ((sa.ss_family == AF_INET6 && strcmp(buf, "::1")) || - (sa.ss_family == AF_INET && strncmp(buf, "127.", 4))) { - // not loopback - pl->depth = IOCP_MAX_OWRITES; - } else { - iocp_t *iocp = pl->iocpd->iocp; - if (iocp->loopback_bufsize) { - const char *p = (const char *) realloc((void *) pl->primary->buffer.start, iocp->loopback_bufsize); - if (p) { - pl->primary->buffer.start = p; - pl->primary->buffer.size = iocp->loopback_bufsize; - } - } - } - } -} - -// Reserve as many buffers as possible for count bytes. -size_t pni_write_pipeline_reserve(write_pipeline_t *pl, size_t count) -{ - if (pl->primary->in_use) - return 0; // I.e. io->wouldblock - if (!pl->depth) - set_depth(pl); - if (pl->depth == 1) { - // always use the primary - pl->reserved_count = 1; - pl->next_primary_index = 0; - return 1; - } - - iocp_t *iocp = pl->iocpd->iocp; - confirm_as_writer(pl); - size_t wanted = (count / IOCP_WBUFSIZE); - if (count % IOCP_WBUFSIZE) - wanted++; - size_t pending = pl->pending_count; - assert(pending < pl->depth); - size_t bufs = pn_min(wanted, pl->depth - pending); - // Can draw from shared pool or the primary... but share with others. - size_t writers = iocp->writer_count; - size_t shared_count = (iocp->shared_available_count + writers - 1) / writers; - bufs = pn_min(bufs, shared_count + 1); - pl->reserved_count = pending + bufs; - - if (bufs == wanted && - pl->reserved_count < (pl->depth / 2) && - iocp->shared_available_count > (2 * writers + bufs)) { - // No shortage: keep the primary as spare for future use - pl->next_primary_index = pl->reserved_count; - } else if (bufs == 1) { - pl->next_primary_index = pending; - } else { - // let approx 1/3 drain before replenishing - pl->next_primary_index = ((pl->reserved_count + 2) / 3) - 1; - if (pl->next_primary_index < pending) - pl->next_primary_index = pending; - } - return bufs; -} - -write_result_t *pni_write_pipeline_next(write_pipeline_t *pl) -{ - size_t sz = pl->pending_count; - if (sz >= pl->reserved_count) - return NULL; - write_result_t *result; - if (sz == pl->next_primary_index) { - result = pl->primary; - } else { - assert(pl->iocpd->iocp->shared_available_count > 0); - result = shared_pool_pop(pl->iocpd->iocp); - } - - result->in_use = true; - pl->pending_count++; - return result; -} - -void pni_write_pipeline_return(write_pipeline_t *pl, write_result_t *result) -{ - result->in_use = false; - pl->pending_count--; - pl->reserved_count = 0; - if (result != pl->primary) - shared_pool_push(result); - if (pl->pending_count == 0) - remove_as_writer(pl); -} - -bool pni_write_pipeline_writable(write_pipeline_t *pl) -{ - // Only writable if not full and we can guarantee a buffer: - return pl->pending_count < pl->depth && !pl->primary->in_use; -} - -size_t pni_write_pipeline_size(write_pipeline_t *pl) -{ - return pl->pending_count; -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a5850716/tools/cmake/Modules/WindowsC99SymbolCheck.py ---------------------------------------------------------------------- diff --git a/tools/cmake/Modules/WindowsC99SymbolCheck.py b/tools/cmake/Modules/WindowsC99SymbolCheck.py index 8e81ad9..7c2c9f2 100644 --- a/tools/cmake/Modules/WindowsC99SymbolCheck.py +++ b/tools/cmake/Modules/WindowsC99SymbolCheck.py @@ -53,7 +53,7 @@ def symcheck(objfile): m = re.search(r'UNDEF.*\b([a-zA-Z_]*snprintf)\b', line) if m : sym = m.group(1) - if re.match(r'_*pn_i_v?snprintf', sym) is None : + if re.match(r'_*pni_v?snprintf', sym) is None : raise Exception('Unsafe use of C99 violating function in ' + objfile + ' : ' + sym) def main(): --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
