Author: fadams Date: Mon Sep 29 17:40:38 2014 New Revision: 1628232 URL: http://svn.apache.org/r1628232 Log: Improve error handling a bit, still something of a work in progress
Modified: qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/send.html qpid/proton/branches/fadams-javascript-binding/proton-c/bindings/javascript/messenger.js qpid/proton/branches/fadams-javascript-binding/proton-c/bindings/javascript/module.js qpid/proton/branches/fadams-javascript-binding/proton-c/include/proton/io.h qpid/proton/branches/fadams-javascript-binding/proton-c/src/windows/iocp.c qpid/proton/branches/fadams-javascript-binding/proton-c/src/windows/schannel.c qpid/proton/branches/fadams-javascript-binding/tests/python/proton_tests/common.py qpid/proton/branches/fadams-javascript-binding/tests/python/proton_tests/messenger.py Modified: qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/send.html URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/send.html?rev=1628232&r1=1628231&r2=1628232&view=diff ============================================================================== --- qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/send.html (original) +++ qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/send.html Mon Sep 29 17:40:38 2014 @@ -65,17 +65,20 @@ console.log("body = " + body); messenger.put(message); }; -messenger.on('error', function(error) { +var errorHandler = function(error) { console.log("Received error " + error); -// Error recovery seems to require a new Messenger instance. -messenger.stop(); -messenger.free(); -messenger = new proton.Messenger(); -messenger.start(); -console.log("Restarted"); -}); + // Error recovery seems to require a new Messenger instance. + messenger.stop(); + messenger.free(); + messenger = new proton.Messenger(); + + messenger.on('error', errorHandler); + messenger.start(); + console.log("Restarted"); +}; +messenger.on('error', errorHandler); messenger.start(); </script> Modified: qpid/proton/branches/fadams-javascript-binding/proton-c/bindings/javascript/messenger.js URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-c/bindings/javascript/messenger.js?rev=1628232&r1=1628231&r2=1628232&view=diff ============================================================================== --- qpid/proton/branches/fadams-javascript-binding/proton-c/bindings/javascript/messenger.js (original) +++ qpid/proton/branches/fadams-javascript-binding/proton-c/bindings/javascript/messenger.js Mon Sep 29 17:40:38 2014 @@ -127,34 +127,6 @@ Module['Messenger'] = function(name) { / // This call ensures that the emscripten network callback functions are initialised. Module.EventDispatch.registerMessenger(this); - - - // TODO improve error handling mechanism. - /* - * The emscripten websocket error event could get triggered by any Messenger - * and it's hard to determine which one without knowing which file descriptors - * are associated with which instance. As a workaround we set the _checkErrors - * flag when we call put or subscribe and reset it when work succeeds. - */ - this._checkErrors = false; - - /** - * TODO update to handle multiple Messenger instances - * Handle the emscripten websocket error and use it to trigger a MessengerError - * Note that the emscripten websocket error passes an array containing the - * file descriptor, the errno and the message, we just use the message here. - */ - var that = this; - Module['websocket']['on']('error', function(error) { - -console.log("Module['websocket']['on'] caller is " + arguments.callee.caller.toString()); - -console.log("that._checkErrors = " + that._checkErrors); -console.log("error = " + error); - if (that._checkErrors) { - that._emit('error', new Module['MessengerError'](error[2])); - } - }); }; Module['Messenger'].PN_CUMULATIVE = 0x1; // Protected Class attribute. @@ -176,22 +148,21 @@ var _Messenger_ = Module['Messenger'].pr * @param {number} code the error code to check. */ _Messenger_._check = function(code) { - if (code < 0) { - if (code === Module['Error']['INPROGRESS']) { - return code; - } - + if (code < 0 && code !== Module['Error']['INPROGRESS']) { var errno = this['getErrno'](); var message = errno ? this['getError']() : Pointer_stringify(_pn_code(code)); - - if (this._callbacks['error']) { - this._emit('error', new Module['MessengerError'](message)); - } else { - throw new Module['MessengerError'](message); + if (message !== 'PN_TIMEOUT') { + if (this._callbacks['error']) { +console.log("emitting " + message); + this._emit('error', new Module['MessengerError'](message)); + } else { +console.log("throwing " + message); + throw new Module['MessengerError'](message); + } } - } else { - return code; } + + return code; }; /** @@ -486,9 +457,10 @@ _Messenger_['subscribe'] = function(sour this._check(Module['Error']['ARG_ERR']); } var sp = Runtime.stackSave(); - this._checkErrors = true; // TODO improve error handling mechanism. + Module.EventDispatch.setCurrentMessenger(this); var subscription = _pn_messenger_subscribe(this._messenger, allocate(intArrayFromString(source), 'i8', ALLOC_STACK)); + Module.EventDispatch.setCurrentMessenger(null); Runtime.stackRestore(sp); if (!subscription) { @@ -532,12 +504,13 @@ _Messenger_['subscribe'] = function(sour _Messenger_['put'] = function(message, flush) { flush = flush === false ? false : true; // Defaults to true if not explicitly specified. message._preEncode(); - this._checkErrors = true; // TODO improve error handling mechanism. + Module.EventDispatch.setCurrentMessenger(this); this._check(_pn_messenger_put(this._messenger, message._message)); + Module.EventDispatch.setCurrentMessenger(null); // If flush is set invoke pn_messenger_work. if (flush) { - _pn_messenger_work(this._messenger, 0); + this._check(_pn_messenger_work(this._messenger, 0)); } // Getting the tracker is a little tricky as it is a 64 bit number. The way @@ -617,16 +590,7 @@ _Messenger_['settle'] = function(tracker * @returns {boolean} true if there is work still to do, false otherwise. */ _Messenger_['work'] = function() { - var err = _pn_messenger_work(this._messenger, 0); - if (err === Module['Error']['TIMEOUT']) { -console.log("work = false"); - return false; - } else { - this._checkErrors = false; // TODO improve error handling mechanism. - this._check(err); -console.log("work = true"); - return true; - } + return (this._check(_pn_messenger_work(this._messenger, 0)) > 0); }; /** Modified: qpid/proton/branches/fadams-javascript-binding/proton-c/bindings/javascript/module.js URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-c/bindings/javascript/module.js?rev=1628232&r1=1628231&r2=1628232&view=diff ============================================================================== --- qpid/proton/branches/fadams-javascript-binding/proton-c/bindings/javascript/module.js (original) +++ qpid/proton/branches/fadams-javascript-binding/proton-c/bindings/javascript/module.js Mon Sep 29 17:40:38 2014 @@ -119,8 +119,17 @@ if (typeof process === 'object' && typeo */ Module.EventDispatch = new function() { // Note the use of new to create a Singleton. var _firstCall = true; // Flag used to check the first time registerMessenger is called. + /** + * We employ a cheat/hack to map file descriptors to the Messenger instance + * that owns them. In put/subscribe we set the current Messenger and then we + * intercept the library socket call with our own, which makes a call to + * the real library socket but also maps the file descriptor to _currentMessenger. + */ + var _currentMessenger = null; var _messengers = {}; + var _fd2Messenger = {}; + /** * Provides functionality roughly equivalent to the following C code: * while (1) { @@ -135,21 +144,30 @@ Module.EventDispatch = new function() { * we bypass the _pn_messenger_work test as it will never succeed after closing. */ var _pump = function(fd, closing) { +//console.log("\t_pump entry " + fd + ", " + closing); for (var i in _messengers) { if (_messengers.hasOwnProperty(i)) { var messenger = _messengers[i]; + //var messenger = _fd2Messenger[fd]; if (closing) { +//console.log("_pump closing"); messenger._emit('work'); } else { - while (_pn_messenger_work(messenger._messenger, 0) >= 0) { +//console.log("_pump while start"); + while (_pn_messenger_work(messenger._messenger, 0) > 0) { + //while (messenger['work']()) { + //while (messenger._check(_pn_messenger_work(messenger._messenger, 0)) > 0) { +//console.log("A"); messenger._checkSubscriptions(); - messenger._checkErrors = false; // TODO improve error handling mechanism. messenger._emit('work'); +//console.log("B"); } +//console.log("_pump while finish"); } } } +//console.log("\t_pump exit"); }; /** @@ -157,10 +175,40 @@ Module.EventDispatch = new function() { * passing a flag to indicate that the socket is closing. */ var _close = function(fd) { +//console.log("calling close fd = " + fd); _pump(fd, true); + delete _fd2Messenger[fd]; + }; + + var _error = function(error) { + var fd = error[0]; + var messenger = _fd2Messenger[fd]; + messenger._emit('error', new Module['MessengerError'](error[2])); + delete _fd2Messenger[fd]; }; /** + * This code cheekily replaces the library socket call with our own one. + * The real socket call returns a file descriptor so we harvest that and use + * that as a key to map file descriptors to their owning Messenger. + */ + var realsocket = _socket; + _socket = function(domain, type, protocol) { + var fd = realsocket(domain, type, protocol); +//console.log("calling socket fd = " + fd); + if (_currentMessenger) { + _fd2Messenger[fd] = _currentMessenger; + } else { + console.error("Error: file descriptor " + fd + " cannot be mapped to a Messenger."); + } + return fd; + } + + this.setCurrentMessenger = function(messenger) { + _currentMessenger = messenger; + } + + /** * Register the specified Messenger as being interested in network events. */ this.registerMessenger = function(messenger) { @@ -175,11 +223,12 @@ Module.EventDispatch = new function() { Module['websocket']['on']('connection', _pump); Module['websocket']['on']('message', _pump); Module['websocket']['on']('close', _close); + Module['websocket']['on']('error', _error); _firstCall = false; } var name = messenger.getName(); - _messengers[name] = messenger; + _messengers[name] = messenger; }; /** Modified: qpid/proton/branches/fadams-javascript-binding/proton-c/include/proton/io.h URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-c/include/proton/io.h?rev=1628232&r1=1628231&r2=1628232&view=diff ============================================================================== --- qpid/proton/branches/fadams-javascript-binding/proton-c/include/proton/io.h (original) +++ qpid/proton/branches/fadams-javascript-binding/proton-c/include/proton/io.h Mon Sep 29 17:40:38 2014 @@ -31,6 +31,22 @@ extern "C" { #endif +/** + * A ::pn_socket_t provides an abstract handle to an IO stream. The + * pipe version is uni-directional. The network socket version is + * bi-directional. Both are non-blocking. + * + * pn_socket_t handles from ::pn_pipe() may only be used with + * ::pn_read(), ::pn_write(), ::pn_close() and pn_selector_select(). + * + * pn_socket_t handles from ::pn_listen(), ::pn_accept() and + * ::pn_connect() must perform further IO using Proton functions. + * Mixing Proton io.h functions with native IO functions on the same + * handles will result in undefined behavior. + * + * pn_socket_t handles may only be used with a single pn_io_t during + * their lifetime. + */ #if defined(_WIN32) && ! defined(__CYGWIN__) #ifdef _WIN64 typedef unsigned __int64 pn_socket_t; @@ -43,7 +59,37 @@ typedef int pn_socket_t; #define PN_INVALID_SOCKET (-1) #endif +/** + * A ::pn_io_t manages IO for a group of pn_socket_t handles. A + * pn_io_t object may have zero or one pn_selector_t selectors + * associated with it (see ::pn_io_selector()). If one is associated, + * all the pn_socket_t handles managed by a pn_io_t must use that + * pn_selector_t instance. + * + * The pn_io_t interface is single-threaded. All methods are intended + * to be used by one thread at a time, except that multiple threads + * may use: + * + * ::pn_write() + * ::pn_send() + * ::pn_recv() + * ::pn_close() + * ::pn_selector_select() + * + * provided at most one thread is calling ::pn_selector_select() and + * the other threads are operating on separate pn_socket_t handles. + */ typedef struct pn_io_t pn_io_t; + +/** + * A ::pn_selector_t provides a selection mechanism that allows + * efficient monitoring of a large number of Proton connections and + * listeners. + * + * External (non-Proton) sockets may also be monitored, either solely + * for event notification (read, write, and timer) or event + * notification and use with pn_io_t interfaces. + */ typedef struct pn_selector_t pn_selector_t; PN_EXTERN pn_io_t *pn_io(void); Modified: qpid/proton/branches/fadams-javascript-binding/proton-c/src/windows/iocp.c URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-c/src/windows/iocp.c?rev=1628232&r1=1628231&r2=1628232&view=diff ============================================================================== --- qpid/proton/branches/fadams-javascript-binding/proton-c/src/windows/iocp.c (original) +++ qpid/proton/branches/fadams-javascript-binding/proton-c/src/windows/iocp.c Mon Sep 29 17:40:38 2014 @@ -964,8 +964,15 @@ static void drain_zombie_completions(ioc } } + unsigned shutdown_grace = 2000; + char *override = getenv("PN_SHUTDOWN_GRACE"); + if (override) { + int grace = atoi(override); + if (grace > 0 && grace < 60000) + shutdown_grace = (unsigned) grace; + } pn_timestamp_t now = pn_i_now(); - pn_timestamp_t deadline = now + 2000; + pn_timestamp_t deadline = now + shutdown_grace; while (pn_list_size(iocp->zombie_list)) { if (now >= deadline) @@ -977,7 +984,7 @@ static void drain_zombie_completions(ioc } now = pn_i_now(); } - if (now >= deadline && pn_list_size(iocp->zombie_list)) + if (now >= deadline && pn_list_size(iocp->zombie_list) && iocp->iocp_trace) // Should only happen if really slow TCP handshakes, i.e. total network failure iocp_log("network failure on Proton shutdown\n"); } Modified: qpid/proton/branches/fadams-javascript-binding/proton-c/src/windows/schannel.c URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-c/src/windows/schannel.c?rev=1628232&r1=1628231&r2=1628232&view=diff ============================================================================== --- qpid/proton/branches/fadams-javascript-binding/proton-c/src/windows/schannel.c (original) +++ qpid/proton/branches/fadams-javascript-binding/proton-c/src/windows/schannel.c Mon Sep 29 17:40:38 2014 @@ -220,9 +220,9 @@ static int ssl_failed(pn_ssl_t *ssl, cha reason = buf; } ssl->ssl_closed = true; - ssl->app_input_closed = ssl->app_output_closed = PN_ERR; - ssl->transport->tail_closed = true; + ssl->app_input_closed = ssl->app_output_closed = PN_EOS; ssl->state = SSL_CLOSED; + pni_close_tail(ssl->transport); pn_do_error(ssl->transport, "amqp:connection:framing-error", "SSL Failure: %s", reason); return PN_EOS; } @@ -255,6 +255,8 @@ static void ssl_session_free( pn_ssl_ses pn_ssl_domain_t *pn_ssl_domain( pn_ssl_mode_t mode ) { + if (mode == PN_SSL_MODE_SERVER) + return NULL; // Temporary: not ready for ctest, hide from isSSLPresent() pn_ssl_domain_t *domain = (pn_ssl_domain_t *) calloc(1, sizeof(pn_ssl_domain_t)); if (!domain) return NULL; @@ -284,8 +286,9 @@ pn_ssl_domain_t *pn_ssl_domain( pn_ssl_m void pn_ssl_domain_free( pn_ssl_domain_t *domain ) { - if (--domain->ref_count == 0) { + if (!domain) return; + if (--domain->ref_count == 0) { if (domain->cert_context) CertFreeCertificateContext(domain->cert_context); if (domain->cert_store) @@ -1118,7 +1121,7 @@ static ssize_t process_input_ssl(pn_io_l static ssize_t process_output_ssl( pn_io_layer_t *io_layer, char *buffer, size_t max_len) { pn_ssl_t *ssl = (pn_ssl_t *)io_layer->context; - if (!ssl) return PN_ERR; + if (!ssl) return PN_EOS; ssl_log( ssl, "process_output_ssl( max_len=%d )\n",max_len ); ssize_t written = 0; @@ -1129,7 +1132,7 @@ static ssize_t process_output_ssl( pn_io // output buffers eclusively for internal handshake use until negotiation complete client_handshake_init(ssl); if (ssl->state == SSL_CLOSED) - return PN_ERR; + return PN_EOS; ssl->state = NEGOTIATING; } Modified: qpid/proton/branches/fadams-javascript-binding/tests/python/proton_tests/common.py URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/tests/python/proton_tests/common.py?rev=1628232&r1=1628231&r2=1628232&view=diff ============================================================================== --- qpid/proton/branches/fadams-javascript-binding/tests/python/proton_tests/common.py (original) +++ qpid/proton/branches/fadams-javascript-binding/tests/python/proton_tests/common.py Mon Sep 29 17:40:38 2014 @@ -82,7 +82,7 @@ def isSSLPresent(): """ True if a suitable SSL library is available. """ try: - xxx = SSLDomain(SSLDomain.MODE_CLIENT) + xxx = SSLDomain(SSLDomain.MODE_SERVER) return True except SSLUnavailable, e: # SSL libraries not installed Modified: qpid/proton/branches/fadams-javascript-binding/tests/python/proton_tests/messenger.py URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/tests/python/proton_tests/messenger.py?rev=1628232&r1=1628231&r2=1628232&view=diff ============================================================================== --- qpid/proton/branches/fadams-javascript-binding/tests/python/proton_tests/messenger.py (original) +++ qpid/proton/branches/fadams-javascript-binding/tests/python/proton_tests/messenger.py Mon Sep 29 17:40:38 2014 @@ -984,6 +984,11 @@ class Pump: class SelectableMessengerTest(common.Test): def testSelectable(self, count = 1): + if os.name=="nt": + # Conflict between native OS select() in Pump and IOCP based pn_selector_t + # makes this fail on Windows (see PROTON-668). + raise Skipped("Invalid test on Windows with IOCP.") + mrcv = Messenger() mrcv.passive = True mrcv.subscribe("amqp://~0.0.0.0:1234") --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org