Repository: qpid-proton Updated Branches: refs/heads/master e8029597b -> 39cd8c5dc
PROTON-760: Improve the JavaScript binding's internal Event loop and add additional tests. Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/39cd8c5d Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/39cd8c5d Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/39cd8c5d Branch: refs/heads/master Commit: 39cd8c5dc75d4e5e68c67360f320fc7a2fd5e29a Parents: e802959 Author: Fraser Adams <fad...@apache.org> Authored: Sat Nov 29 12:20:24 2014 +0000 Committer: Fraser Adams <fad...@apache.org> Committed: Sat Nov 29 12:20:24 2014 +0000 ---------------------------------------------------------------------- examples/messenger/javascript/client.js | 3 +- examples/messenger/javascript/drain.js | 0 examples/messenger/javascript/recv.js | 0 examples/messenger/javascript/send.html | 1 + examples/messenger/javascript/send.js | 0 examples/messenger/javascript/server.js | 0 examples/messenger/javascript/spout.js | 0 examples/messenger/javascript/ws2tcp.js | 0 proton-c/bindings/javascript/CMakeLists.txt | 8 +- proton-c/bindings/javascript/TODO | 8 +- proton-c/bindings/javascript/message.js | 37 ++-- proton-c/bindings/javascript/messenger.js | 70 +++--- proton-c/bindings/javascript/module.js | 173 ++++++++++----- tests/javascript/msgr-recv.js | 265 +++++++++++++++++++++++ tests/javascript/msgr-send-common.js | 245 +++++++++++++++++++++ tests/javascript/msgr-send.html | 123 +++++++++++ tests/javascript/msgr-send.js | 100 +++++++++ 17 files changed, 919 insertions(+), 114 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/39cd8c5d/examples/messenger/javascript/client.js ---------------------------------------------------------------------- diff --git a/examples/messenger/javascript/client.js b/examples/messenger/javascript/client.js old mode 100644 new mode 100755 index 62f9a61..a3890b6 --- 
a/examples/messenger/javascript/client.js +++ b/examples/messenger/javascript/client.js @@ -30,7 +30,6 @@ if (typeof process === 'object' && typeof require === 'function') { var replyTo = "~/replies"; var msgtext = "Hello World!"; var tracker = null; - var running = true; var message = new proton.Message(); var messenger = new proton.Messenger(); @@ -39,7 +38,7 @@ if (typeof process === 'object' && typeof require === 'function') { while (messenger.incoming()) { var t = messenger.get(message); - console.log("Reply"); + console.log("Reply:"); console.log("Address: " + message.getAddress()); console.log("Subject: " + message.getSubject()); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/39cd8c5d/examples/messenger/javascript/drain.js ---------------------------------------------------------------------- diff --git a/examples/messenger/javascript/drain.js b/examples/messenger/javascript/drain.js old mode 100644 new mode 100755 http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/39cd8c5d/examples/messenger/javascript/recv.js ---------------------------------------------------------------------- diff --git a/examples/messenger/javascript/recv.js b/examples/messenger/javascript/recv.js old mode 100644 new mode 100755 http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/39cd8c5d/examples/messenger/javascript/send.html ---------------------------------------------------------------------- diff --git a/examples/messenger/javascript/send.html b/examples/messenger/javascript/send.html index c61e3f2..174a0c6 100644 --- a/examples/messenger/javascript/send.html +++ b/examples/messenger/javascript/send.html @@ -63,6 +63,7 @@ console.log("body = " + body); message.body = body; messenger.put(message); + messenger.send(); }; var errorHandler = function(error) { http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/39cd8c5d/examples/messenger/javascript/send.js ---------------------------------------------------------------------- diff --git 
a/examples/messenger/javascript/send.js b/examples/messenger/javascript/send.js old mode 100644 new mode 100755 http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/39cd8c5d/examples/messenger/javascript/server.js ---------------------------------------------------------------------- diff --git a/examples/messenger/javascript/server.js b/examples/messenger/javascript/server.js old mode 100644 new mode 100755 http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/39cd8c5d/examples/messenger/javascript/spout.js ---------------------------------------------------------------------- diff --git a/examples/messenger/javascript/spout.js b/examples/messenger/javascript/spout.js old mode 100644 new mode 100755 http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/39cd8c5d/examples/messenger/javascript/ws2tcp.js ---------------------------------------------------------------------- diff --git a/examples/messenger/javascript/ws2tcp.js b/examples/messenger/javascript/ws2tcp.js old mode 100755 new mode 100644 http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/39cd8c5d/proton-c/bindings/javascript/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/proton-c/bindings/javascript/CMakeLists.txt b/proton-c/bindings/javascript/CMakeLists.txt index 38d039b..f2538cf 100644 --- a/proton-c/bindings/javascript/CMakeLists.txt +++ b/proton-c/bindings/javascript/CMakeLists.txt @@ -49,7 +49,7 @@ CMAKE_FORCE_C_COMPILER("${CMAKE_C_COMPILER}" Clang) if (CMAKE_BUILD_TYPE STREQUAL "Debug") message(STATUS "JavaScript build type is \"Debug\"") else() - set (CMAKE_BUILD_TYPE Release) + set(CMAKE_BUILD_TYPE Release) message(STATUS "JavaScript build type is \"Release\"") set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -O3") set(EMSCRIPTEN_LINK_OPTIMISATIONS "-O2 --closure 1") @@ -96,7 +96,9 @@ add_custom_command( DEPENDS ${PN_PATH}/src/protocol.h.py ) -set(COMPILE_WARNING_FLAGS "-Werror -Wall -pedantic-errors -Wno-comment -Wno-warn-absolute-paths") 
+#set(COMPILE_WARNING_FLAGS "-Werror -Wall -pedantic-errors -Wno-comment -Wno-warn-absolute-paths") +#TODO re-add -Werror when the warnings have been fixed. +set(COMPILE_WARNING_FLAGS "-Wall -pedantic-errors -Wno-comment -Wno-warn-absolute-paths") # Explicitly set PLATFORM_DEFINITIONS to Linux ones for emscripten as we don't # want to inadvertently use Windows versions if we happen to be cross-compiling @@ -218,7 +220,7 @@ set_target_properties( # fiddly with node.js packages. This behaviour might be reinstated if the # packaging mechanism improves. - LINK_FLAGS "-s \"EXPORT_NAME='proton'\" -s \"WEBSOCKET_SUBPROTOCOL='AMQPWSB10'\" ${EMSCRIPTEN_LINK_OPTIMISATIONS} --memory-init-file 0 --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/binding-open.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/module.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/error.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/messenger.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/subscription.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/message.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/data.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/data-uuid.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/data-symbol.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/data-described.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/data-array.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/data-typed-number.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/data-long.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/data-binary.js --post-js ${CMAKE_CURRENT_SOURCE_DIR}/binding-close.js -s DEFAULT_LIBRARY_FUNCS_TO_INCLUDE=\"[]\" -s EXPORTED_FUNCTIONS=\"['_pn_get_version_major', '_pn_get_version_minor', '_pn_bytes', '_pn_error_text', '_pn_code', '_pn_messenger', '_pn_messenger_name', '_pn_messenger_set_blocking', '_pn_messenger_free', '_pn_messenger_errno', '_pn_messenger_error', '_pn_messenger_get_outgoing_window', '_pn_messenger_set_outgoing_window', '_pn_messenger_get_incoming_window', '_pn_messenger_set_incoming_window', '_pn_messenger_start', '_pn_messenger_stop', '_pn_messenger_stopped', '_pn_messenger_subscribe', 
'_pn_messenger_put', '_pn_messenger_status', '_pn_messenger_buffered', '_pn_messenger_settle', '_pn_messenger_outgoing_tracker', '_pn_messenger_work', '_pn_messenger_recv', '_pn_messenger_receiving', '_pn_messenger_get', '_pn_messenger_incoming_tracker', '_pn_messenger_incoming_subscription', '_pn_messenger_accept', '_pn_messenger_reject', '_pn_messenger_outgoing', '_pn_messenger_incoming', '_pn_messenger_route', '_pn_messenger_rewrite', '_pn_messenger_set_passive', '_pn_messenger_selectable', '_pn_subscription_get_context', '_pn_subscription _set_context', '_pn_subscription_address', '_pn_message', '_pn_message_id', '_pn_message_correlation_id', '_pn_message_free', '_pn_message_errno', '_pn_message_error', '_pn_message_clear', '_pn_message_is_inferred', '_pn_message_set_inferred', '_pn_message_is_durable', '_pn_message_set_durable', '_pn_message_get_priority', '_pn_message_set_priority', '_pn_message_get_ttl', '_pn_message_set_ttl', '_pn_message_is_first_acquirer', '_pn_message_set_first_acquirer', '_pn_message_get_delivery_count', '_pn_message_set_delivery_count', '_pn_message_get_user_id', '_pn_message_set_user_id', '_pn_message_get_address', '_pn_message_set_address', '_pn_message_get_subject', '_pn_message_set_subject', '_pn_message_get_reply_to', '_pn_message_set_reply_to', '_pn_message_get_content_type', '_pn_message_set_content_type', '_pn_message_get_content_encoding', '_pn_message_set_content_encoding', '_pn_message_get_expiry_time', '_pn_message_set_expiry_time', '_pn_message_get_creation_time', '_pn_message_s et_creation_time', '_pn_message_get_group_id', '_pn_message_set_group_id', '_pn_message_get_group_sequence', '_pn_message_set_group_sequence', '_pn_message_get_reply_to_group_id', '_pn_message_set_reply_to_group_id', '_pn_message_encode', '_pn_message_decode', '_pn_message_instructions', '_pn_message_annotations', '_pn_message_properties', '_pn_message_body', '_pn_data', '_pn_data_free', '_pn_data_error', '_pn_data_errno', '_pn_data_clear', 
'_pn_data_rewind', '_pn_data_next', '_pn_data_prev', '_pn_data_enter', '_pn_data_exit', '_pn_data_lookup', '_pn_data_narrow', '_pn_data_widen', '_pn_data_type', '_pn_data_encode', '_pn_data_decode', '_pn_data_put_list', '_pn_data_put_map', '_pn_data_put_array', '_pn_data_put_described', '_pn_data_put_null', '_pn_data_put_bool', '_pn_data_put_ubyte', '_pn_data_put_byte', '_pn_data_put_ushort', '_pn_data_put_short', '_pn_data_put_uint', '_pn_data_put_int', '_pn_data_put_char', '_pn_data_put_ulong', '_pn_data_put_long', '_pn_data_put_timestamp', '_p n_data_put_float', '_pn_data_put_double', '_pn_data_put_decimal32', '_pn_data_put_decimal64', '_pn_data_put_decimal128', '_pn_data_put_uuid', '_pn_data_put_binary', '_pn_data_put_string', '_pn_data_put_symbol', '_pn_data_get_list', '_pn_data_get_map', '_pn_data_get_array', '_pn_data_is_array_described', '_pn_data_get_array_type', '_pn_data_is_described', '_pn_data_is_null', '_pn_data_get_bool', '_pn_data_get_ubyte', '_pn_data_get_byte', '_pn_data_get_ushort', '_pn_data_get_short', '_pn_data_get_uint', '_pn_data_get_int', '_pn_data_get_char', '_pn_data_get_ulong', '_pn_data_get_long', '_pn_data_get_timestamp', '_pn_data_get_float', '_pn_data_get_double', '_pn_data_get_decimal32', '_pn_data_get_decimal64', '_pn_data_get_decimal128', '_pn_data_get_uuid', '_pn_data_get_binary', '_pn_data_get_string', '_pn_data_get_symbol', '_pn_data_copy', '_pn_data_format', '_pn_data_dump', '_pn_selectable_readable', '_pn_selectable_capacity', '_pn_selectable_writable', '_pn_selectable_pending', '_pn_s electable_is_terminal', '_pn_selectable_fd', '_pn_selectable_free']\"" + LINK_FLAGS "-s \"EXPORT_NAME='proton'\" -s \"WEBSOCKET_SUBPROTOCOL='AMQPWSB10'\" ${EMSCRIPTEN_LINK_OPTIMISATIONS} --memory-init-file 0 --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/binding-open.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/module.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/error.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/messenger.js --pre-js 
${CMAKE_CURRENT_SOURCE_DIR}/subscription.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/message.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/data.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/data-uuid.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/data-symbol.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/data-described.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/data-array.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/data-typed-number.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/data-long.js --pre-js ${CMAKE_CURRENT_SOURCE_DIR}/data-binary.js --post-js ${CMAKE_CURRENT_SOURCE_DIR}/binding-close.js -s DEFAULT_LIBRARY_FUNCS_TO_INCLUDE=\"[]\" -s EXPORTED_FUNCTIONS=\"['_pn_get_version_major', '_ pn_get_version_minor', '_pn_bytes', '_pn_error_text', '_pn_code', '_pn_messenger', '_pn_messenger_name', '_pn_messenger_set_blocking', '_pn_messenger_free', '_pn_messenger_errno', '_pn_messenger_error', '_pn_messenger_get_outgoing_window', '_pn_messenger_set_outgoing_window', '_pn_messenger_get_incoming_window', '_pn_messenger_set_incoming_window', '_pn_messenger_start', '_pn_messenger_stop', '_pn_messenger_stopped', '_pn_messenger_subscribe', '_pn_messenger_put', '_pn_messenger_status', '_pn_messenger_buffered', '_pn_messenger_settle', '_pn_messenger_outgoing_tracker', '_pn_messenger_recv', '_pn_messenger_receiving', '_pn_messenger_get', '_pn_messenger_incoming_tracker', '_pn_messenger_incoming_subscription', '_pn_messenger_accept', '_pn_messenger_reject', '_pn_messenger_outgoing', '_pn_messenger_incoming', '_pn_messenger_route', '_pn_messenger_rewrite', '_pn_messenger_set_passive', '_pn_messenger_selectable', '_pn_subscription_get_context', '_pn_subscription_set_context', '_pn_su bscription_address', '_pn_message', '_pn_message_id', '_pn_message_correlation_id', '_pn_message_free', '_pn_message_errno', '_pn_message_error', '_pn_message_clear', '_pn_message_is_inferred', '_pn_message_set_inferred', '_pn_message_is_durable', '_pn_message_set_durable', '_pn_message_get_priority', '_pn_message_set_priority', 
'_pn_message_get_ttl', '_pn_message_set_ttl', '_pn_message_is_first_acquirer', '_pn_message_set_first_acquirer', '_pn_message_get_delivery_count', '_pn_message_set_delivery_count', '_pn_message_get_user_id', '_pn_message_set_user_id', '_pn_message_get_address', '_pn_message_set_address', '_pn_message_get_subject', '_pn_message_set_subject', '_pn_message_get_reply_to', '_pn_message_set_reply_to', '_pn_message_get_content_type', '_pn_message_set_content_type', '_pn_message_get_content_encoding', '_pn_message_set_content_encoding', '_pn_message_get_expiry_time', '_pn_message_set_expiry_time', '_pn_message_get_creation_time', '_pn_message_set_creation_time', '_p n_message_get_group_id', '_pn_message_set_group_id', '_pn_message_get_group_sequence', '_pn_message_set_group_sequence', '_pn_message_get_reply_to_group_id', '_pn_message_set_reply_to_group_id', '_pn_message_encode', '_pn_message_decode', '_pn_message_instructions', '_pn_message_annotations', '_pn_message_properties', '_pn_message_body', '_pn_data', '_pn_data_free', '_pn_data_error', '_pn_data_errno', '_pn_data_clear', '_pn_data_rewind', '_pn_data_next', '_pn_data_prev', '_pn_data_enter', '_pn_data_exit', '_pn_data_lookup', '_pn_data_narrow', '_pn_data_widen', '_pn_data_type', '_pn_data_encode', '_pn_data_decode', '_pn_data_put_list', '_pn_data_put_map', '_pn_data_put_array', '_pn_data_put_described', '_pn_data_put_null', '_pn_data_put_bool', '_pn_data_put_ubyte', '_pn_data_put_byte', '_pn_data_put_ushort', '_pn_data_put_short', '_pn_data_put_uint', '_pn_data_put_int', '_pn_data_put_char', '_pn_data_put_ulong', '_pn_data_put_long', '_pn_data_put_timestamp', '_pn_data_put_float', '_p n_data_put_double', '_pn_data_put_decimal32', '_pn_data_put_decimal64', '_pn_data_put_decimal128', '_pn_data_put_uuid', '_pn_data_put_binary', '_pn_data_put_string', '_pn_data_put_symbol', '_pn_data_get_list', '_pn_data_get_map', '_pn_data_get_array', '_pn_data_is_array_described', '_pn_data_get_array_type', '_pn_data_is_described', 
'_pn_data_is_null', '_pn_data_get_bool', '_pn_data_get_ubyte', '_pn_data_get_byte', '_pn_data_get_ushort', '_pn_data_get_short', '_pn_data_get_uint', '_pn_data_get_int', '_pn_data_get_char', '_pn_data_get_ulong', '_pn_data_get_long', '_pn_data_get_timestamp', '_pn_data_get_float', '_pn_data_get_double', '_pn_data_get_decimal32', '_pn_data_get_decimal64', '_pn_data_get_decimal128', '_pn_data_get_uuid', '_pn_data_get_binary', '_pn_data_get_string', '_pn_data_get_symbol', '_pn_data_copy', '_pn_data_format', '_pn_data_dump', '_pn_selectable_readable', '_pn_selectable_capacity', '_pn_selectable_writable', '_pn_selectable_pending', '_pn_selectable_is_terminal' , '_pn_selectable_fd', '_pn_selectable_free']\"" ) # This command packages up the compiled proton.js into a node.js package called http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/39cd8c5d/proton-c/bindings/javascript/TODO ---------------------------------------------------------------------- diff --git a/proton-c/bindings/javascript/TODO b/proton-c/bindings/javascript/TODO index 092ee2e..81d5e9e 100644 --- a/proton-c/bindings/javascript/TODO +++ b/proton-c/bindings/javascript/TODO @@ -5,13 +5,13 @@ Network code is currently limited to a WebSocket transport, including for Node.j It would be good to allow a configurable transport so that Node.js and Chrome packaged apps could use native TCP sockets. -The JavaScript binding is pure JavaScript, that has been trans-compiled from C +The JavaScript binding is pure JavaScript, which has been trans-compiled from C to JavaScript using emscripten. This allows the same code to be used in a browser and Node.js, but it potentially has a performance penalty in Node.js. An alternative for Node.js might be to build a SWIG binding (recent versions of SWIG support JavaScript). This should be viewed as a complementary not competing approach as it would only work for environments like Node.js and definitely *not* browser -environments which clearly require pure JavaScript. 
+environments, which clearly require pure JavaScript. Optimisation are enabled for compiling and linking but there hasn't been any profiling done yet. The binding code *shouldn't* be the bottleneck but it's @@ -32,14 +32,14 @@ parallels in proton-c Although the WebSocket transport uses the sub-protocol 'AMQPWSB10' as specified in http://docs.oasis-open.org/amqp-bindmap/amqp-wsb/v1.0/amqp-wsb-v1.0.html -section 2.1 is is not technically compliant with the spec. as the AMQP data is +section 2.1 it is not technically compliant with the spec. as the AMQP data is created by the proton-c code, which produces a data-stream for the TCP transport whereas the WebSocket spec. seems to want to make use of the fact that WebSocket is a frame based transport (whereas TCP is not). This is quite hard to resolve as the binding simply sends the contents of the octet buffer created by proton over the transport and thus to make this binding compliant with the spec. would require a change to the underlying proton-c code! It is possible that this may be -done in future as any SCTP would require the ability to push AMQP frames too. +done in future as SCTP support would require the ability to push AMQP frames too. In the mean time fortunately the Java Broker WebSocket transport is actually tolerant of this off-spec. behaviour. My personal view is that both approaches should be valid and in particular using the standard TCP framing means that it http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/39cd8c5d/proton-c/bindings/javascript/message.js ---------------------------------------------------------------------- diff --git a/proton-c/bindings/javascript/message.js b/proton-c/bindings/javascript/message.js index 564cc6e..cbdba55 100644 --- a/proton-c/bindings/javascript/message.js +++ b/proton-c/bindings/javascript/message.js @@ -42,16 +42,18 @@ Module['Message'] = function() { // Message Constructor. 
// ************************* Public properties **************************** - this['instructions'] = null; - this['annotations'] = null; - // Intitialise with an empty Object so we can set properties in a natural way. // message.properties.prop1 = "foo"; // message.properties.prop2 = "bar"; this['properties'] = {}; - this['body'] = null; - this['data'] = null; + /** + // The properties may be used, but are initially undefined. + this['instructions']; + this['annotations']; + this['body']; + this['data']; + */ }; // Expose constructor as package scope variable to make internal calls less verbose. @@ -100,22 +102,22 @@ _Message_._preEncode = function() { var body = new Data(_pn_message_body(this._message)); inst.clear(); - if (this['instructions']) { + if (this['instructions'] !== undefined) { inst['putObject'](this['instructions']); } ann.clear(); - if (this['annotations']) { + if (this['annotations'] !== undefined) { ann['putObject'](this['annotations']); } props.clear(); - if (this['properties']) { + if (this['properties'] !== undefined) { props['putObject'](this['properties']); } body.clear(); - if (this['body']) { + if (this['body'] !== undefined) { var contentType = this['getContentType'](); if (contentType) { var value = this['body']; @@ -154,13 +156,13 @@ _Message_._postDecode = function(decodeBinaryAsString) { if (inst.next()) { this['instructions'] = inst['getObject'](); } else { - this['instructions'] = {}; + delete this['instructions']; } if (ann.next()) { this['annotations'] = ann['getObject'](); } else { - this['annotations'] = {}; + delete this['annotations']; } if (props.next()) { @@ -182,8 +184,9 @@ _Message_._postDecode = function(decodeBinaryAsString) { } } } else { - this['data'] = null; - this['body'] = null; + // If no body is present ensure that the properties are undefined. 
+ delete this['data']; + delete this['body']; } }; @@ -226,11 +229,11 @@ _Message_['getError'] = function() { */ _Message_['clear'] = function() { _pn_message_clear(this._message); - this['instructions'] = null; - this['annotations'] = null; this['properties'] = {}; - this['body'] = null; - this['data'] = null; + delete this['instructions']; + delete this['annotations']; + delete this['body']; + delete this['data']; }; /** http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/39cd8c5d/proton-c/bindings/javascript/messenger.js ---------------------------------------------------------------------- diff --git a/proton-c/bindings/javascript/messenger.js b/proton-c/bindings/javascript/messenger.js index e2f4418..5c96cb5 100644 --- a/proton-c/bindings/javascript/messenger.js +++ b/proton-c/bindings/javascript/messenger.js @@ -119,6 +119,9 @@ Module['Messenger'] = function(name) { // Messenger Constructor. */ _pn_messenger_set_blocking(this._messenger, false); + // Set the Messenger "passive" as we are supplying our own event loop here. + _pn_messenger_set_passive(this._messenger, true); + // Subscriptions that haven't yet completed, used for managing subscribe events. this._pendingSubscriptions = []; @@ -234,7 +237,7 @@ _Messenger_._checkSubscriptions = function() { * pn_messenger_set_timeout() * pn_messenger_set_blocking() * pn_messenger_interrupt() - * pn_messenger_send() // Not sure if this is useful in JavaScript. + * pn_messenger_work() - omitted because we have our own JavaScript Event loop. */ /** @@ -416,14 +419,19 @@ _Messenger_['start'] = function() { * should be stopped before being discarded to ensure a clean shutdown * handshake occurs on any internally managed connections. * <p> - * The Messenger may require some time to stop if it is busy, and in that - * case will return {@link proton.Error.INPROGRESS}. In that case, call isStopped - * to see if it has fully stopped. 
+ * The Messenger may require some time to stop if it is busy, so it is + * necessary to call isStopped to see if it has fully stopped. * @method stop * @memberof! proton.Messenger# */ _Messenger_['stop'] = function() { _pn_messenger_stop(this._messenger); + + // When we call stop it's quite likely that it will be busy. We call + // Module.EventDispatch.pump to flush the Messenger Event loop, but we + // wrap the call in a setTimeout to make sure that any Events generated + // by the flush occur on the next "tick" of the JavaScript Event loop. + setTimeout(Module.EventDispatch.pump, 0); }; /** @@ -475,21 +483,12 @@ _Messenger_['subscribe'] = function(source) { * @method put * @memberof! proton.Messenger# * @param {proton.Message} message a Message to send. - * @param {boolean} flush if this is set true or is undefined then messages are - * flushed (this is the default). If explicitly set to false then messages - * may not be sent immediately and might require an explicit call to work(). - * This may be used to "batch up" messages and *may* be more efficient. * @returns {proton.Data.Long} a tracker. */ -_Messenger_['put'] = function(message, flush) { - flush = flush === false ? false : true; // Defaults to true if not explicitly specified. +_Messenger_['put'] = function(message) { message._preEncode(); this._check(_pn_messenger_put(this._messenger, message._message)); - if (flush) { - Module.EventDispatch.flush(); - } - // Getting the tracker is a little tricky as it is a 64 bit number. The way // emscripten handles this is to return the low 32 bits directly and pass // the high 32 bits via the tempRet0 variable. We use Data.Long to pass the @@ -500,6 +499,32 @@ _Messenger_['put'] = function(message, flush) { }; /** + * Send messages from a Messenger's outgoing queue. This method forces the Event + * loop to pump data for as long as the underlying socket remains writeable. 
+ * Note that after calling send() applications should yield control to the JavaScript + * Event loop by calling setTimeout() or process.nextTick() so that the underlying + * network processing can actually take place. + * @method send + * @memberof! proton.Messenger# + */ +_Messenger_['send'] = function(number) { + Module.EventDispatch.pump(); +}; + +/** + * Gets the aggregate bufferedAmount values from all of the underlying WebSockets. + * This value represents the amount of data buffered but not yet sent over the + * network. If it grows too high it is a sign that the application is sending too + * much data and should be throttled by yielding control to the JavaScript Event loop. + * @method getBufferedAmount + * @memberof! proton.Messenger# + * @returns {number} the total amount of data buffered by the Messenger's sockets. + */ +_Messenger_['getBufferedAmount'] = function() { + return Module.EventDispatch.getBufferedAmount(this); +}; + +/** * Gets the last known remote state of the delivery associated with the given tracker. * @method status * @memberof! proton.Messenger# @@ -558,25 +583,12 @@ _Messenger_['settle'] = function(tracker) { }; /** - * Sends or receives any outstanding messages queued for a Messenger. - * For JavaScript the only timeout that makes sense is 0 (do not block). - * This method may also do I/O work other than sending and receiving messages. - * For example, closing connections after messenger.stop() has been called. - * @method work - * @memberof! proton.Messenger# - * @returns {boolean} true if there is work still to do, false otherwise. - */ -_Messenger_['work'] = function() { - return (this._check(_pn_messenger_work(this._messenger, 0)) > 0); -}; - -/** * Receives up to limit messages into the incoming queue. If no value for limit * is supplied, this call will receive as many messages as it can buffer internally. * @method recv * @memberof! 
proton.Messenger# - * @param {number} limit the maximum number of messages to receive or -1 to to receive - * as many messages as it can buffer internally. + * @param {number} limit the maximum number of messages to receive. If unspecified + * receive as many messages as it can buffer internally. */ _Messenger_['recv'] = function(limit) { _pn_messenger_recv(this._messenger, (limit ? limit : -1)); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/39cd8c5d/proton-c/bindings/javascript/module.js ---------------------------------------------------------------------- diff --git a/proton-c/bindings/javascript/module.js b/proton-c/bindings/javascript/module.js index b755f59..59d2517 100644 --- a/proton-c/bindings/javascript/module.js +++ b/proton-c/bindings/javascript/module.js @@ -87,7 +87,7 @@ if (typeof global === 'object') { // If Node.js if (global['PROTON_TOTAL_STACK']) { Module['TOTAL_STACK'] = global['PROTON_TOTAL_STACK']; } -} else if (typeof window === 'object') { // If browser +} else if (typeof window === 'object') { // If Browser if (window['PROTON_TOTAL_MEMORY']) { Module['TOTAL_MEMORY'] = window['PROTON_TOTAL_MEMORY']; } @@ -134,8 +134,8 @@ Module.EventDispatch = new function() { // Note the use of new to create a Singl * has to be mounted before can listen for any of these events. */ Module['websocket']['on']('open', _pump); - Module['websocket']['on']('connection', _pump); Module['websocket']['on']('message', _pump); + Module['websocket']['on']('connection', _connectionHandler); Module['websocket']['on']('close', _closeHandler); Module['websocket']['on']('error', _errorHandler); @@ -192,6 +192,26 @@ Module.EventDispatch = new function() { // Note the use of new to create a Singl }; /** + * This method uses some low-level emscripten internals (stream = FS.getStream(fd), + * sock = stream.node.sock, peer = SOCKFS.websocket_sock_ops.getPeer) to find + * the underlying WebSocket associated with a given file descriptor value. 
+ */ + var _getWebSocket = function(fd) { + var stream = FS.getStream(fd); + if (stream) { + var sock = stream.node.sock; + if (sock.server) { + return sock.server; + } + var peer = SOCKFS.websocket_sock_ops.getPeer(sock, sock.daddr, sock.dport); + if (peer) { + return peer.socket; + } + } + return null; + }; + + /** * This method iterates through all registered Messengers and retrieves any * pending selectables, which are stored in a _selectables map keyed by fd. */ @@ -206,7 +226,7 @@ Module.EventDispatch = new function() { // Note the use of new to create a Singl if (fd === -1) { _pn_selectable_free(sel); } else { - _selectables[fd] = {messenger: messenger, selectable: sel}; + _selectables[fd] = {messenger: messenger, selectable: sel, socket: _getWebSocket(fd)}; } } } @@ -216,8 +236,8 @@ Module.EventDispatch = new function() { // Note the use of new to create a Singl /** * Continually pump data while there's still work to do. */ - var _pump = function() { - while (_pumpOnce()); + var _pump = function(fd) { + while (_pumpOnce(fd)); }; /** @@ -228,83 +248,99 @@ Module.EventDispatch = new function() { // Note the use of new to create a Singl * mask = sock.sock_ops.poll(sock)). We use the internals so we don't have * to massage from file descriptors into the C style poll interface. */ - var _pumpOnce = function() { + var _pumpOnce = function(fdin) { _updateSelectables(); - var count = 0; + var work = false; for (var fd in _selectables) { var selectable = _selectables[fd]; - var sel = selectable.selectable; - var terminal = _pn_selectable_is_terminal(sel); - if (terminal) { - _pn_selectable_free(sel); - delete _selectables[fd]; - } else { - var stream = FS.getStream(fd); - if (stream) { - var sock = stream.node.sock; - if (sock.sock_ops.poll) { - var mask = sock.sock_ops.poll(sock); // Low-level poll call. 
- if (mask) { - var messenger = selectable.messenger; - var capacity = _pn_selectable_capacity(sel) > 0; - var pending = _pn_selectable_pending(sel) > 0; - - if ((mask & POLLIN) && capacity) { + if (selectable.socket) { + var messenger = selectable.messenger; + var sel = selectable.selectable; + var terminal = _pn_selectable_is_terminal(sel); + if (terminal) { +//console.log(fd + " is terminal"); + _closeHandler(fd); + } else if (!fdin || (fd == fdin)) { + var stream = FS.getStream(fd); + if (stream) { + var sock = stream.node.sock; + if (sock.sock_ops.poll) { + var mask = sock.sock_ops.poll(sock); // Low-level poll call. + if (mask) { + var capacity = _pn_selectable_capacity(sel) > 0; + var pending = _pn_selectable_pending(sel) > 0; + + if ((mask & POLLIN) && capacity) { //console.log("- readable fd = " + fd + ", capacity = " + _pn_selectable_capacity(sel)); - _error = null; // May get set by _pn_selectable_readable. - _pn_selectable_readable(sel); - count++; // Should this be inside the test for _error? Don't know. + _error = null; // May get set by _pn_selectable_readable. + _pn_selectable_readable(sel); + work = true; + } + if ((mask & POLLOUT) && pending) { +//console.log("- writable fd = " + fd + ", pending = " + _pn_selectable_pending(sel)); + _pn_selectable_writable(sel); + work = true; + } + var errno = messenger['getErrno'](); _error = errno ? messenger['getError']() : _error; if (_error) { _errorHandler([fd, 0, _error]); } else { - // Don't send work event if it's a listen socket. - if (!sock.server) { + // Don't send work Event if it's a listen socket. + if (work && !sock.server) { messenger._checkSubscriptions(); messenger._emit('work'); } } } - if ((mask & POLLOUT) && pending) { -//console.log("- writeable fd = " + fd + ", pending = " + _pn_selectable_pending(sel)); - _pn_selectable_writable(sel); - //TODO looks like this block isn't needed. Need to - //check with a test-case that writes data as fast as - //it can. If not needed then delete. 
- /* - count++; - // Check _selectables again in case the call to - // _pn_selectable_writable caused a socket close. - if (_selectables[fd]) { - messenger._checkSubscriptions(); - messenger._emit('work'); - } - */ - } } } } } } + return work; + }; - return count; + /** + * Handler for the emscripten socket connection event. + * This causes _pump to be called with no fd, forcing all fds to be checked. + */ + var _connectionHandler = function(fd) { + _pump(); }; /** * Handler for the emscripten socket close event. */ var _closeHandler = function(fd) { + _updateSelectables(); + var selectable = _selectables[fd]; - if (selectable) { - // Close and remove the selectable. - var sel = selectable.selectable; - _pn_selectable_free(sel); // This closes the underlying socket too. - delete _selectables[fd]; + if (selectable && selectable.socket) { + selectable.socket = null; +//console.log("_closeHandler fd = " + fd); + + /** + * We use the timeout to ensure that execution of the function to + * actually free and remove the selectable is deferred until next + * time round the (internal JavaScript) event loop. This turned out + * to be necessary because in some cases the ws WebSocket library + * calls the onclose callback (concurrently!!) before the onmessage + * callback exits, which could result in _pn_selectable_free being + * called whilst _pn_selectable_writable is executing, which is bad!! + */ + setTimeout(function() { +//console.log("deferred _closeHandler fd = " + fd); + // Close and remove the selectable. + var sel = selectable.selectable; + _pn_selectable_free(sel); // This closes the underlying socket too.
+ delete _selectables[fd]; - var messenger = selectable.messenger; - messenger._emit('work'); + var messenger = selectable.messenger; + messenger._emit('work'); + }, 0); } }; @@ -330,7 +366,7 @@ Module.EventDispatch = new function() { // Note the use of new to create a Singl var subscriptions = messenger._pendingSubscriptions; for (var i = 0; i < subscriptions.length; i++) { subscription = subscriptions[i]; - // Use == not === as we don't care if fd is a number or a string. + // Use == not === as fd is a number and subscription.fd is a string. if (subscription.fd == fd) { messenger._pendingSubscriptions.splice(i, 1); if (message.indexOf('EHOSTUNREACH:') === 0) { @@ -347,13 +383,35 @@ Module.EventDispatch = new function() { // Note the use of new to create a Singl /** * Flush any data that has been written by the Messenger put() method. - * @method flush + * @method pump + * @memberof! proton.EventDispatch# */ - this.flush = function() { + this.pump = function() { _pump(); }; /** + * For a given Messenger instance retrieve the bufferedAmount property from + * any connected WebSockets and return the aggregate total sum. + * @method getBufferedAmount + * @memberof! proton.EventDispatch# + * @param {proton.Messenger} messenger the Messenger instance that we want + * to find the total buffered amount for. + * @returns {number} the total sum of the bufferedAmount property across all + * connected WebSockets. + */ + this.getBufferedAmount = function(messenger) { + var total = 0; + for (var fd in _selectables) { + var selectable = _selectables[fd]; + if (selectable.messenger === messenger && selectable.socket) { + total += selectable.socket.bufferedAmount | 0; + } + } + return total; + }; + + /** * Subscribe to a specified source address. * <p> * This method is delegated to by the subscribe method of {@link proton.Messenger}. 
@@ -406,9 +464,6 @@ Module.EventDispatch = new function() { // Note the use of new to create a Singl var name = messenger['getName'](); _messengers[name] = messenger; - - // Set the Messenger "passive" as we are supplying our own event loop here. - _pn_messenger_set_passive(messenger._messenger, true); }; /** http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/39cd8c5d/tests/javascript/msgr-recv.js ---------------------------------------------------------------------- diff --git a/tests/javascript/msgr-recv.js b/tests/javascript/msgr-recv.js new file mode 100755 index 0000000..383aad4 --- /dev/null +++ b/tests/javascript/msgr-recv.js @@ -0,0 +1,265 @@ +#!/usr/bin/env node +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +var Statistics = function() { + this.startTime = 0; + this.latencySamples = 0; + this.latencyTotal = 0; + this.latencyMin = 0; + this.latencyMax = 0; +}; + +Statistics.prototype.start = function() { + this.startTime = +new Date(); +}; + +Statistics.prototype.messageReceived = function(msg) { + var ts = +msg.getCreationTime(); // The + gets the value of the returned Data Object. 
+ if (ts) { + var l = +new Date() - ts; + if (l) { + this.latencyTotal += l; + this.latencySamples += 1; + if (this.latencySamples === 1) { + this.latencyMin = this.latencyMax = l; + } else { + if (this.latencyMin > l) { + this.latencyMin = l; + } + if (this.latencyMax < l) { + this.latencyMax = l; + } + } + } + } +}; + +Statistics.prototype.report = function(sent, received) { + var seconds = (+new Date() - this.startTime)/1000; + console.log("Messages sent: " + sent + " received: " + received); + console.log("Total time: " + seconds + " seconds"); + if (seconds) { + console.log("Throughput: " + (sent/seconds) + " msgs/sec sent"); + console.log("Throughput: " + (received/seconds) + " msgs/sec received"); + } + + if (this.latencySamples) { + console.log("Latency (ms): " + this.latencyMin + " min " + + this.latencyMax + " max " + + (this.latencyTotal/this.latencySamples) + " avg"); + } +}; + + +var MessengerReceive = function(opts, callback) { + //if (opts.verbose) { + console.log("subscriptions = " + opts.subscriptions); + console.log("messageCount = " + opts.messageCount); + console.log("recvCount = " + opts.recvCount); + console.log("incomingWindow = " + opts.incomingWindow); + console.log("reportInterval = " + opts.reportInterval); + console.log("reply = " + opts.reply); + console.log("forwardingTargets = " + opts.forwardingTargets); + console.log("name = " + opts.name); + console.log("readyText = " + opts.readyText); + console.log("verbose = " + opts.verbose); + console.log(); + //} + + var stats = new Statistics(); + var running = true; // Used to avoid calling stop multiple times. 
+ var sent = 0; + var received = 0; + var forwardingIndex = 0; + + var message = new proton.Message(); + var messenger = new proton.Messenger(opts.name); + + var pumpData = function() { + if (opts.verbose) { + console.log("Calling messenger.recv(" + opts.recvCount + ")"); + } + messenger.recv(opts.recvCount); + + if (opts.verbose) { + console.log("Messages on incoming queue: " + messenger.incoming()); + } + while (messenger.incoming()) { + // start the timer only after receiving the first msg + if (received === 0) { + stats.start(); + } + + messenger.get(message); + received += 1; + //console.log("Address: " + message.getAddress()); + //console.log("CorrelationID: " + message.getCorrelationID()); + //console.log("Content: " + message.body); + stats.messageReceived(message); + + if (opts.reply) { + var replyTo = message.getReplyTo(); + if (replyTo) { + if (opts.verbose) { + console.log("Replying to: " + replyTo); + } + message.setAddress(replyTo); + message.setCreationTime(new Date()); + messenger.put(message); + sent += 1; + } + } + } + + // Check for exit condition. + if (running && !(opts.messageCount === 0 || received < opts.messageCount)) { + // Wait for outgoing to be zero before calling stop so pending sends + // get flushed properly. + if (messenger.outgoing()) { + if (opts.verbose) { + console.log("Flushing pending sends"); + } + } else { +//console.log("******* messenger.stop()"); + messenger.stop(); + running = false; + stats.report(sent, received); + if (callback) { + callback(stats); + } + } + } + + if (messenger.isStopped()) { +//console.log("-------------------- messenger.isStopped()"); + message.free(); + messenger.free(); + } + }; + + this.start = function() { + messenger.on('error', function(error) {console.log("** error **"); console.log(error);}); + messenger.on('work', pumpData); + messenger.on('subscription', function(subscription) { + // Hack to let test scripts know when the receivers are ready (so that the + // senders may be started). 
+console.log("****** subscription " + subscription.getAddress() + " succeeded") + if (opts.readyText) { + console.log(opts.readyText); + } + }); + + if (opts.incomingWindow) { + messenger.setIncomingWindow(opts.incomingWindow); + } + messenger.start(); + + // Unpack addresses that were specified using comma-separated list + var subscriptions = opts.subscriptions.split(','); + for (var i = 0; i < subscriptions.length; i++) { + var subscription = subscriptions[i]; + if (opts.verbose) { + console.log("Subscribing to " + subscription); + } + messenger.subscribe(subscription); + } + }; +}; + + +// Check if the environment is Node.js and if not log an error and exit. +if (typeof process === 'object' && typeof require === 'function') { + var usage = + 'Usage: msgr-recv [OPTIONS]\n' + + ' -a <addr>[,<addr>]* \tAddresses to listen on [amqp://~0.0.0.0]\n' + + ' -c # \tNumber of messages to receive before exiting [0=forever]\n' + + ' -b # \tArgument to Messenger::recv(n) [2048]\n' + + ' -w # \tSize for incoming window [0]\n' + + ' -e # \t# seconds to report statistics, 0 = end of test [0] *TBD*\n' + + ' -R \tSend reply if \'reply-to\' present\n' + + ' -F <addr>[,<addr>]* \tAddresses used for forwarding received messages\n' + + ' -N <name> \tSet the container name to <name>\n' + + ' -X <text> \tPrint \'<text>\\n\' to stdout after all subscriptions are created\n' + + ' -V \tEnable debug logging\n'; + + // Increase the virtual heap available to the emscripten compiled C runtime. + // This allows us to test a really big string. + PROTON_TOTAL_MEMORY = 140000000; + PROTON_TOTAL_STACK = 25000000; // Needs to be bigger than the biggest string. 
+ var proton = require("qpid-proton"); + + var opts = {}; + opts.subscriptions = 'amqp://~0.0.0.0'; + opts.messageCount = 0; + opts.recvCount = -1; + opts.incomingWindow; + opts.reportInterval = 0; + opts.reply = false; + opts.forwardingTargets; + opts.name; + opts.readyText; + opts.verbose = false; + + var args = process.argv.slice(2); + if (args.length > 0) { + if (args[0] === '-h' || args[0] === '--help') { + console.log(usage); + process.exit(0); + } + + for (var i = 0; i < args.length; i++) { + var arg = args[i]; + if (arg.charAt(0) === '-') { + if (arg === '-V') { + opts.verbose = true; + } else if (arg === '-R') { + opts.reply = true; + } else { + i++; + var val = args[i]; + if (arg === '-a') { + opts.subscriptions = val; + } else if (arg === '-c') { + opts.messageCount = val; + } else if (arg === '-b') { + opts.recvCount = val; + } else if (arg === '-w') { + opts.incomingWindow = val; + } else if (arg === '-e') { + opts.reportInterval = val; + } else if (arg === '-F') { + opts.forwardingTargets = val; + } else if (arg === '-N') { + opts.name = val; + } else if (arg === '-X') { + opts.readyText = val; + } + } + } + } + } + + var receiver = new MessengerReceive(opts); + receiver.start(); +} else { + console.error("msgr-recv.js should be run in Node.js"); +} + http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/39cd8c5d/tests/javascript/msgr-send-common.js ---------------------------------------------------------------------- diff --git a/tests/javascript/msgr-send-common.js b/tests/javascript/msgr-send-common.js new file mode 100644 index 0000000..8a5f98e --- /dev/null +++ b/tests/javascript/msgr-send-common.js @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/** + * This file is essentially a "module" that is common to msgr-send.js and msgr-send.html. + * It defines the Statistics and MessengerSend classes and if the environment is Node.js + * it will import qpid-proton and export MessengerSend for use in msgr-send.js. + * Because it's really a module/library trying to execute msgr-send-common.js won't + * itself do anything terribly exciting. + */ + +var Statistics = function() { + this.startTime = 0; + this.latencySamples = 0; + this.latencyTotal = 0; + this.latencyMin = 0; + this.latencyMax = 0; +}; + +Statistics.prototype.start = function() { + this.startTime = +new Date(); +}; + +Statistics.prototype.messageReceived = function(msg) { + var ts = +msg.getCreationTime(); // The + gets the value of the returned Data Object. 
+ if (ts) { + var l = +new Date() - ts; + if (l) { + this.latencyTotal += l; + this.latencySamples += 1; + if (this.latencySamples === 1) { + this.latencyMin = this.latencyMax = l; + } else { + if (this.latencyMin > l) { + this.latencyMin = l; + } + if (this.latencyMax < l) { + this.latencyMax = l; + } + } + } + } +}; + +Statistics.prototype.report = function(sent, received) { + var seconds = (+new Date() - this.startTime)/1000; + console.log("Messages sent: " + sent + " received: " + received); + console.log("Total time: " + seconds + " seconds"); + if (seconds) { + console.log("Throughput: " + (sent/seconds) + " msgs/sec sent"); + console.log("Throughput: " + (received/seconds) + " msgs/sec received"); + } + + if (this.latencySamples) { + console.log("Latency (ms): " + this.latencyMin + " min " + + this.latencyMax + " max " + + (this.latencyTotal/this.latencySamples) + " avg"); + } +}; + + +var MessengerSend = function(opts, callback) { + //if (opts.verbose) { + console.log("addresses = " + opts.addresses); + console.log("messageCount = " + opts.messageCount); + console.log("messageSize = " + opts.messageSize); + console.log("recvCount = " + opts.recvCount); + console.log("sendBatch = " + opts.sendBatch); + console.log("outgoingWindow = " + opts.outgoingWindow); + console.log("reportInterval = " + opts.reportInterval); + console.log("getReplies = " + opts.getReplies); + console.log("name = " + opts.name); + console.log("verbose = " + opts.verbose); + console.log(); + //} + + var stats = new Statistics(); + var targets = []; + var running = true; // Used to avoid calling stop multiple times. + var sent = 0; + var received = 0; + + var message = new proton.Message(); + var replyMessage = new proton.Message(); + var messenger = new proton.Messenger(opts.name); + + // Retrieve replies and return the number of reply messages received. 
+ var processReplies = function() { + var received = 0; + if (opts.verbose) { + console.log("Calling messenger.recv(" + opts.recvCount + ")"); + } + messenger.recv(opts.recvCount); + + if (opts.verbose) { + console.log("Messages on incoming queue: " + messenger.incoming()); + } + while (messenger.incoming()) { + messenger.get(replyMessage); + received += 1; + //console.log("Address: " + replyMessage.getAddress()); + //console.log("Content: " + replyMessage.body); + stats.messageReceived(replyMessage); + } + return received; + }; + + // Send messages as fast as possible. This is analogous to the while loop in + // the Python msgr-send.py but we wrap in a function in JavaScript so that + // we can schedule on the JavaScript Event queue via setTimeout. This is needed + // otherwise the JavaScript Event loop is blocked and no data gets sent. + var sendData = function() { + var delay = 0; + while (opts.messageCount === 0 || (sent < opts.messageCount)) { + // Check the amount of data buffered on the socket, if it's non-zero + // exit the loop and call sendData again after a short delay. This + // will throttle the send rate if necessary. + if (messenger.getBufferedAmount()) { +console.log("messenger.getBufferedAmount() = " + messenger.getBufferedAmount()); + delay = 100; + break; // Exit loop to check for exit condition and schedule to Event queue. + } + + var index = sent % targets.length; +//console.log("sent = " + sent + ", index = " + index); + + message.setAddress(targets[index]); + message.setCorrelationID(sent); + message.setCreationTime(new Date()); + messenger.put(message); + sent += 1; + + if (opts.sendBatch && (messenger.outgoing() >= opts.sendBatch)) { + if (opts.verbose) { + console.log("Calling messenger.send()") + } + messenger.send(); + + if (opts.getReplies) { + received += processReplies(); + } + break; // Exit loop to check for exit condition and yield to Event loop. + } + } + + // Check for exit condition.
+ if (running && !(opts.messageCount === 0 || (sent < opts.messageCount))) { + if (opts.getReplies && (received < sent)) { + received += processReplies(); + if (opts.verbose) { + console.log("Messages sent = " + sent + ", received = " + received); + } + } else if (messenger.outgoing()) { + if (opts.verbose) { + console.log("Flushing pending sends"); + } + messenger.send(); + } else { +//console.log("******* calling stop") + messenger.stop(); + running = false; + stats.report(sent, received); + if (callback) { + callback(stats); + } + } + } + + if (messenger.isStopped()) { +//console.log("-------------------- messenger.isStopped()"); + message.free(); + messenger.free(); + } else { + // schedule next call on the JavaScript Event queue. If we don't do this + // our messages won't get sent because none of the internal JavaScript + // network code will get any CPU. + + // If available we call setImmediate rather than setTimeout when the delay + // is zero. setImmediate is more efficient, in particular I noticed that + // with Node.js v0.10.18 I could get max throughput and max out CPU using + // setTimeout, but when I upgraded to v0.10.33 my throughput dropped and + // my CPU was hovering around 55% but using setImmediate the performance + // improved again. My guess is that v0.10.18 was checking for zero delay + // and calling setImmediate internally whereas v0.10.33 wasn't, but I + // can't say for sure. TODO it's possible that some browsers might do a + // better job with setImmediate too (given what I'm seeing with Node.js), + // Chrome might be one such case, but it's not universally supported. + // It might be worth adding a proper polyfill to handle this. 
+ if (delay === 0 && typeof setImmediate === 'function') { + setImmediate(sendData); + } else { + setTimeout(sendData, delay); + } + } + }; + + this.start = function() { + message.body = Array(+opts.messageSize + 1).join('X'); + message.setReplyTo('~'); + + messenger.on('error', function(error) { + console.log(error); + opts.messageCount = -1; // Force exit condition. + }); + + if (opts.outgoingWindow) { + messenger.setOutgoingWindow(opts.outgoingWindow); + } + messenger.start(); + + // Unpack targets that were specified using comma-separated list + var addresses = opts.addresses.split(','); + for (var i = 0; i < addresses.length; i++) { + var address = addresses[i]; + targets.push(address); + } + + stats.start(); + sendData(); + }; +}; + +// If running in Node.js import the proton library and export MessengerSend. +if (typeof module === 'object') { + var proton = require("qpid-proton"); + module.exports.MessengerSend = MessengerSend; +} + http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/39cd8c5d/tests/javascript/msgr-send.html ---------------------------------------------------------------------- diff --git a/tests/javascript/msgr-send.html b/tests/javascript/msgr-send.html new file mode 100644 index 0000000..0e688c2 --- /dev/null +++ b/tests/javascript/msgr-send.html @@ -0,0 +1,123 @@ +<!DOCTYPE html> <!-- HTML5 doctype --> + +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> + +<html> + +<head> + <title>Proton Messenger Send Benchmark</title> + <meta http-equiv="content-type" content="text/html;charset=utf-8" /> + +<!-- + Import the Messenger Binding proton.js. Note that this simple example pulls + it from the node_modules/qpid-proton/lib, which is created by the build process + so that the node.js based examples "just work", in a real Web App you would + clearly need to copy the proton.js to your own server. + + In actual fact the CMake build actually builds proton.js into the directory: + <build>/proton-c/bindings/javascript + where <build> is the build directory created to run cmake from, it is then + copied to the node_modules/qpid-proton/lib directory. + + In this example we also set the global variable PROTON_TOTAL_MEMORY in order to + increase the virtual heap available to the emscripten compiled C runtime. It + is not really necessary to do this for this application as the default value + of 16777216 is fine, it is simply done here to illustrate how to do it. 
+--> +<script type="text/javascript">PROTON_TOTAL_MEMORY = 140000000;</script> +<script type="text/javascript">PROTON_TOTAL_STACK = 25000000;</script> +<script type="text/javascript" src="../../node_modules/qpid-proton/lib/proton.js"></script> +<script type="text/javascript" src="./msgr-send-common.js"></script> + +<script type="text/javascript"> + +var start = function() { + var opts = {}; + opts.addresses = document.getElementById("address").value; + opts.messageCount = parseInt(document.getElementById("messageCount").value, 10); + opts.messageSize = parseInt(document.getElementById("messageSize").value, 10); + opts.sendBatch = parseInt(document.getElementById("sendBatch").value, 10); + + opts.recvCount = -1; + opts.outgoingWindow; + opts.reportInterval = 0; + opts.getReplies = false; + opts.name; + opts.verbose = false; + + var sender = new MessengerSend(opts); + sender.start(); +}; + +</script> + +<style> +body +{ + font: 13px/1.5 Helvetica, Arial, 'Liberation Sans', FreeSans, sans-serif; + overflow-x: hidden; /* Hide horizontal scrollbar */ + background: #dddddd; +} + +label +{ + display: block; + font-size: 17px; +} + +input, textarea +{ + font-size: 13px; + margin-bottom: 10px; +} +</style> + +</head> + +<body> +<div> + <label for="address">Address:</label> + <input type="text" id="address" size="40" + placeholder="amqp://user:password@host:port" + name="address" value="amqp://guest:guest@0.0.0.0" /> +</div> +<div> + <label for="messageCount">Message Count:</label> + <input type="text" id="messageCount" size="40" + name="messageCount" value="0" /> +</div> +<div> + <label for="messageSize">Message Size:</label> + <input type="text" id="messageSize" size="40" + name="messageSize" value="1024" /> +</div> +<div> + <label for="sendBatch">Send Batch Size:</label> + <input type="text" id="sendBatch" size="40" + name="sendBatch" value="1024" /> +</div> + + +<div> + <input type="button" value="start" onclick="start()"/> +</div> +</body> + +</html> 
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/39cd8c5d/tests/javascript/msgr-send.js ---------------------------------------------------------------------- diff --git a/tests/javascript/msgr-send.js b/tests/javascript/msgr-send.js new file mode 100755 index 0000000..8b018a7 --- /dev/null +++ b/tests/javascript/msgr-send.js @@ -0,0 +1,100 @@ +#!/usr/bin/env node +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +// Check if the environment is Node.js and if not log an error and exit. 
+if (typeof process === 'object' && typeof require === 'function') { + var usage = + 'Usage: msgr-send [OPTIONS]\n' + + ' -a <addr>[,<addr>]* \tThe target address [amqp://~0.0.0.0]\n' + + ' -c # \tNumber of messages to send before exiting [0=forever]\n' + + ' -b # \tSize of message body in bytes [1024]\n' + + ' -p # \tSend batches of # messages (wait for replies before sending next batch if -R) [1024]\n' + + ' -w # \tSize for outgoing window [0]\n' + + ' -e # \t# seconds to report statistics, 0 = end of test [0] *TBD*\n' + + ' -R \tWait for a reply to each sent message\n' + + ' -B # \tArgument to Messenger::recv(n) [-1]\n' + + ' -N <name> \tSet the container name to <name>\n' + + ' -V \tEnable debug logging\n'; + + // Increase the virtual heap available to the emscripten compiled C runtime. + // This allows us to test a really big string. + PROTON_TOTAL_MEMORY = 140000000; + PROTON_TOTAL_STACK = 25000000; // Needs to be bigger than the biggest string. + var proton = require("qpid-proton"); + var benchmark = require("./msgr-send-common.js"); + + var opts = {}; + opts.addresses = 'amqp://0.0.0.0'; + opts.messageCount = 0; + opts.messageSize = 1024; + opts.recvCount = -1; + opts.sendBatch = 1024; + opts.outgoingWindow; + opts.reportInterval = 0; + opts.getReplies = false; + opts.name; + opts.verbose = false; + + var args = process.argv.slice(2); + if (args.length > 0) { + if (args[0] === '-h' || args[0] === '--help') { + console.log(usage); + process.exit(0); + } + + for (var i = 0; i < args.length; i++) { + var arg = args[i]; + if (arg.charAt(0) === '-') { + if (arg === '-V') { + opts.verbose = true; + } else if (arg === '-R') { + opts.getReplies = true; + } else { + i++; + var val = args[i]; + if (arg === '-a') { + opts.addresses = val; + } else if (arg === '-c') { + opts.messageCount = val; + } else if (arg === '-b') { + opts.messageSize = val; + } else if (arg === '-B') { + opts.recvCount = val; + } else if (arg === '-p') { + opts.sendBatch = val; + } else if (arg 
=== '-w') { + opts.outgoingWindow = val; + } else if (arg === '-e') { + opts.reportInterval = val; + } else if (arg === '-N') { + opts.name = val; + } + } + } + } + } + + var sender = new benchmark.MessengerSend(opts); + sender.start(); +} else { + console.error("msgr-send.js should be run in Node.js"); +} + --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org