Author: fadams Date: Sat Sep 13 13:46:29 2014 New Revision: 1624737 URL: http://svn.apache.org/r1624737 Log: Improve tests and demos, in particular improve the encapsulation around the QMF handling in qpid-config.js to make it potentially reusable
Modified: qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/client.js qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/drain.js qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/proxy.js qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/qpid-config.js qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/recv.js qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/send.html qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/send.js qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/server.js qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/spout.js qpid/proton/branches/fadams-javascript-binding/proton-c/bindings/javascript/binding.js qpid/proton/branches/fadams-javascript-binding/tests/javascript/codec.js qpid/proton/branches/fadams-javascript-binding/tests/javascript/message.js qpid/proton/branches/fadams-javascript-binding/tests/javascript/soak.js Modified: qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/client.js URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/client.js?rev=1624737&r1=1624736&r2=1624737&view=diff ============================================================================== --- qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/client.js (original) +++ qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/client.js Sat Sep 13 13:46:29 2014 @@ -21,11 +21,14 @@ // Simple client for use with server.js illustrating request/response -// Check if the environment is Node.js and if so import the required library. -if (typeof exports !== "undefined" && exports !== null) { - proton = require("qpid-proton"); +// Check if the environment is Node.js and if not log an error and exit. +if (!exports) { + console.error("client.js should be run in Node.js"); + return; } +var proton = require("qpid-proton"); + var address = "amqp://0.0.0.0"; var subject = "UK.WEATHER"; var replyTo = "~/replies"; Modified: qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/drain.js URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/drain.js?rev=1624737&r1=1624736&r2=1624737&view=diff ============================================================================== --- qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/drain.js (original) +++ qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/drain.js Sat Sep 13 13:46:29 2014 @@ -19,11 +19,14 @@ * */ -// Check if the environment is Node.js and if so import the required library. -if (typeof exports !== "undefined" && exports !== null) { - proton = require("qpid-proton"); +// Check if the environment is Node.js and if not log an error and exit. +if (!exports) { + console.error("drain.js should be run in Node.js"); + return; } +var proton = require("qpid-proton"); + console.log("drain not implemented yet"); process.exit(0); Modified: qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/proxy.js URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/proxy.js?rev=1624737&r1=1624736&r2=1624737&view=diff ============================================================================== --- qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/proxy.js (original) +++ qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/proxy.js Sat Sep 13 13:46:29 2014 @@ -35,6 +35,12 @@ * @file */ +// Check if the environment is Node.js and if not log an error and exit. +if (!exports) { + console.error("proxy.js should be run in Node.js"); + return; +} + var proxy = require('./ws2tcp.js'); var lport = 5673; Modified: qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/qpid-config.js URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/qpid-config.js?rev=1624737&r1=1624736&r2=1624737&view=diff ============================================================================== --- qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/qpid-config.js (original) +++ qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/qpid-config.js Sat Sep 13 13:46:29 2014 @@ -20,7 +20,7 @@ */ /** - * Port of qpid-config to JavaScript for node.js, mainly intended as a demo to + * Port of qpid-config to JavaScript for Node.js, mainly intended as a demo to * illustrate using QMF2 in JavaScript using the proton.Messenger JS binding. * It illustrates a few things including how to use Messenger completely * asynchronously including using an async request/response pattern with @@ -36,180 +36,188 @@ * for complication best illustrated by the need for the correlator object. */ -// Check if the environment is Node.js and if so import the required library. -if (typeof exports !== "undefined" && exports !== null) { - proton = require("qpid-proton"); +// Check if the environment is Node.js and if not log an error and exit. +if (!exports) { + console.error("qpid-config.js should be run in Node.js"); + return; } -var addr = 'guest:guest@localhost:5673'; -//var addr = 'localhost:5673'; -var address = 'amqp://' + addr + '/qmf.default.direct'; -console.log(address); - -var replyTo = ''; -var subscription; -var subscribed = false; - -var message = new proton.Message(); -var messenger = new proton.Messenger(); - -/** - * The correlator object is a mechanism used to correlate requests with their - * aynchronous responses. It might possible be better to make use of Promises - * to implement part of this behaviour but a mechanism would still be needed to - * correlate a request with its response callback in order to wrap things up in - * a Promise, so much of the behaviour of this object would still be required. - * In addition it seemed to make sense to make this QMF2 implementation fairly - * free of dependencies and using Promises would require external libraries. - * Instead the correlator implements "Promise-like" semantics, you might call it - * a broken Promise :-) - * <p> - * in particular the request method behaves a *bit* like Promise.all() though it - * is mostly fake and takes an array of functions that call the add() method - * which is really the method used to associate response objects by correlationID. - * The then method is used to register a listener that will be called when all - * the requests that have been registered have received responses. - * TODO error/timeout handling. - */ -var correlator = { - _resolve: null, - _objects: {}, - add: function(id) { - this._objects[id] = {complete: false, list: null}; - }, - request: function() { - this._resolve = function() {console.log("Warning: No resolver has been set")}; - return this; - }, - then: function(resolver) { - this._resolve = resolver ? resolver : this._resolve; - }, - resolve: function() { - var opcode = message.properties['qmf.opcode']; - var correlationID = message.getCorrelationID(); - var resp = this._objects[correlationID]; - if (opcode === '_query_response') { - if (resp.list) { - Array.prototype.push.apply(resp.list, message.body); // This is faster than concat. - } else { +var qmf = {}; // Create qmf namespace object. +qmf.Console = function() { // qmf.Console Constructor. + var proton = require("qpid-proton"); + var message = new proton.Message(); + var messenger = new proton.Messenger(); + + var brokerAddress = ''; + var replyTo = ''; + + /** + * The correlator object is a mechanism used to correlate requests with their + * aynchronous responses. It might possible be better to make use of Promises + * to implement part of this behaviour but a mechanism would still be needed to + * correlate a request with its response callback in order to wrap things up in + * a Promise, so much of the behaviour of this object would still be required. + * In addition it seemed to make sense to make this QMF2 implementation fairly + * free of dependencies and using Promises would require external libraries. + * Instead the correlator implements "Promise-like" semantics, you might call it + * a broken Promise :-) + * <p> + * in particular the request method behaves a *bit* like Promise.all() though it + * is mostly fake and takes an array of functions that call the add() method + * which is really the method used to associate response objects by correlationID. + * The then method is used to register a listener that will be called when all + * the requests that have been registered have received responses. + * TODO error/timeout handling. + */ + var correlator = { + _resolve: null, + _objects: {}, + add: function(id) { + this._objects[id] = {complete: false, list: null}; + }, + request: function() { + this._resolve = function() {console.log("Warning: No resolver has been set")}; + return this; + }, + then: function(resolver) { + this._resolve = resolver ? resolver : this._resolve; + }, + resolve: function() { + var opcode = message.properties['qmf.opcode']; + var correlationID = message.getCorrelationID(); + var resp = this._objects[correlationID]; + if (opcode === '_query_response') { + if (resp.list) { + Array.prototype.push.apply(resp.list, message.body); // This is faster than concat. + } else { + resp.list = message.body; + } + + var partial = message.properties['partial']; + if (!partial) { + resp.complete = true; + } + + this._objects[correlationID] = resp; + this._checkComplete(); + } else if (opcode === '_method_response' || opcode === '_exception') { resp.list = message.body; - } - - var partial = message.properties['partial']; - if (!partial) { resp.complete = true; + this._objects[correlationID] = resp; + this._checkComplete(); + } else { + console.error("Bad Message response, qmf.opcode = " + opcode); + } + }, + _checkComplete: function() { + var response = {}; + for (var id in this._objects) { + var object = this._objects[id]; + if (object.complete) { + response[id] = object.list; + } else { + return; + } } + + this._objects = {}; // Clear state ready for next call. + this._resolve(response.method ? response.method : response); + } + }; - this._objects[correlationID] = resp; - this._checkComplete(); - } else if (opcode === '_method_response' || opcode === '_exception') { - resp.list = message.body; - resp.complete = true; - this._objects[correlationID] = resp; - this._checkComplete(); - } else { - console.error("Bad Message response, qmf.opcode = " + opcode); + var pumpData = function() { + while (messenger.incoming()) { + // The second parameter forces Binary payloads to be decoded as strings + // this is useful because the broker QMF Agent encodes strings as AMQP + // binary, which is a right pain from an interoperability perspective. + var t = messenger.get(message, true); + correlator.resolve(); + messenger.accept(t); + } + + if (messenger.isStopped()) { + message.free(); + messenger.free(); } - }, - _checkComplete: function() { - var response = {}; - for (var id in this._objects) { - var object = this._objects[id]; - if (object.complete) { - response[id] = object.list; - } else { - return; + }; + + this.getObjects = function(packageName, className) { + message.setAddress(brokerAddress); + message.setSubject('broker'); + message.setReplyTo(replyTo); + message.setCorrelationID(className); + message.properties = { + "routing-key": "broker", // Added for Java Broker + "x-amqp-0-10.app-id": "qmf2", + "method": "request", + "qmf.opcode": "_query_request", + }; + message.body = { + "_what": "OBJECT", + "_schema_id": { + "_package_name": packageName, + "_class_name": className } - } + }; - this._objects = {}; // Clear state ready for next call. - this._resolve(response.method ? response.method : response); - } -}; + correlator.add(className); + messenger.put(message); + }; + + this.invokeMethod = function(object, method, arguments) { + var correlationID = 'method'; + message.setAddress(brokerAddress); + message.setSubject('broker'); + message.setReplyTo(replyTo); + message.setCorrelationID(correlationID); + message.properties = { + "routing-key": "broker", // Added for Java Broker + "x-amqp-0-10.app-id": "qmf2", + "method": "request", + "qmf.opcode": "_method_request", + }; + message.body = { + "_object_id": object._object_id, + "_method_name" : method, + "_arguments" : arguments + }; + + correlator.add(correlationID); + messenger.put(message); + }; -var pumpData = function() { - if (!subscribed) { - var subscriptionAddress = subscription.getAddress(); - if (subscriptionAddress) { - subscribed = true; + this.addConnection = function(addr, callback) { + brokerAddress = addr + '/qmf.default.direct'; + var replyAddress = addr + '/#'; + + messenger.on('subscription', function(subscription) { + var subscriptionAddress = subscription.getAddress(); var splitAddress = subscriptionAddress.split('/'); replyTo = splitAddress[splitAddress.length - 1]; + callback(); + }); - onSubscription(); - } - } - - while (messenger.incoming()) { - // The second parameter forces Binary payloads to be decoded as strings - // this is useful because the broker QMF Agent encodes strings as AMQP - // binary, which is a right pain from an interoperability perspective. - var t = messenger.get(message, true); - correlator.resolve(); - messenger.accept(t); + messenger.subscribe(replyAddress); } - if (messenger.isStopped()) { - message.free(); - messenger.free(); + this.destroy = function() { + messenger.stop(); } -}; -var getObjects = function(packageName, className) { - message.setAddress(address); - message.setSubject('broker'); - message.setReplyTo(replyTo); - message.setCorrelationID(className); - message.properties = { - "routing-key": "broker", // Added for Java Broker - "x-amqp-0-10.app-id": "qmf2", - "method": "request", - "qmf.opcode": "_query_request", - }; - message.body = { - "_what": "OBJECT", - "_schema_id": { - "_package_name": packageName, - "_class_name": className - } - }; + this.request = function() {return correlator.request();} - correlator.add(className); - messenger.put(message); + messenger.on('error', function(error) {console.log(error);}); + messenger.on('work', pumpData); + messenger.setOutgoingWindow(1024); + messenger.recv(); // Receive as many messages as messenger can buffer. + messenger.start(); }; -var invokeMethod = function(object, method, arguments) { - var correlationID = 'method'; - message.setAddress(address); - message.setSubject('broker'); - message.setReplyTo(replyTo); - message.setCorrelationID(correlationID); - message.properties = { - "routing-key": "broker", // Added for Java Broker - "x-amqp-0-10.app-id": "qmf2", - "method": "request", - "qmf.opcode": "_method_request", - }; - message.body = { - "_object_id": object._object_id, - "_method_name" : method, - "_arguments" : arguments - }; - - correlator.add(correlationID); - messenger.put(message); -}; - -messenger.on('error', function(error) {console.log(error);}); -messenger.on('work', pumpData); -messenger.setOutgoingWindow(1024); -messenger.start(); - -subscription = messenger.subscribe('amqp://' + addr + '/#'); -messenger.recv(); // Receive as many messages as messenger can buffer. - /************************* qpid-config business logic ************************/ +var brokerAgent = new qmf.Console(); + var _usage = 'Usage: qpid-config [OPTIONS]\n' + ' qpid-config [OPTIONS] exchanges [filter-string]\n' + @@ -382,7 +390,7 @@ var getValue = function(r) { var config = { _recursive : false, - _host : 'localhost:5673', // Note 5673 not 5672 as we use WebSocket transport. + _host : 'guest:guest@localhost:5673', // Note 5673 not 5672 as we use WebSocket transport. _connTimeout : 10, _ignoreDefault : false, _altern_ex : null, @@ -539,10 +547,10 @@ var renderObject = function(obj, list) { */ var overview = function() { - correlator.request( + brokerAgent.request( // Send the QMF query requests for the specified classes. - getObjects('org.apache.qpid.broker', 'queue'), - getObjects('org.apache.qpid.broker', 'exchange') + brokerAgent.getObjects('org.apache.qpid.broker', 'queue'), + brokerAgent.getObjects('org.apache.qpid.broker', 'exchange') ).then(function(objects) { var exchanges = objects.exchange; var queues = objects.queue; @@ -571,14 +579,14 @@ var overview = function() { } console.log(" durable: " + durable); console.log(" non-durable: " + (queues.length - durable)); - messenger.stop(); + brokerAgent.destroy(); }); }; var exchangeList = function(filter) { - correlator.request( + brokerAgent.request( // Send the QMF query requests for the specified classes. - getObjects('org.apache.qpid.broker', 'exchange') + brokerAgent.getObjects('org.apache.qpid.broker', 'exchange') ).then(function(objects) { var exchanges = objects.exchange; var exMap = idMap(exchanges); @@ -630,16 +638,16 @@ var exchangeList = function(filter) { console.log(string); } } - messenger.stop(); + brokerAgent.destroy(); }); }; var exchangeListRecurse = function(filter) { - correlator.request( + brokerAgent.request( // Send the QMF query requests for the specified classes. - getObjects('org.apache.qpid.broker', 'queue'), - getObjects('org.apache.qpid.broker', 'exchange'), - getObjects('org.apache.qpid.broker', 'binding') + brokerAgent.getObjects('org.apache.qpid.broker', 'queue'), + brokerAgent.getObjects('org.apache.qpid.broker', 'exchange'), + brokerAgent.getObjects('org.apache.qpid.broker', 'binding') ).then(function(objects) { var exchanges = objects.exchange; var bindings = objects.binding; @@ -666,15 +674,15 @@ var exchangeListRecurse = function(filte } } } - messenger.stop(); + brokerAgent.destroy(); }); }; var queueList = function(filter) { - correlator.request( + brokerAgent.request( // Send the QMF query requests for the specified classes. - getObjects('org.apache.qpid.broker', 'queue'), - getObjects('org.apache.qpid.broker', 'exchange') + brokerAgent.getObjects('org.apache.qpid.broker', 'queue'), + brokerAgent.getObjects('org.apache.qpid.broker', 'exchange') ).then(function(objects) { var queues = objects.queue; var exMap = idMap(objects.exchange); @@ -763,16 +771,16 @@ var queueList = function(filter) { console.log(string); } } - messenger.stop(); + brokerAgent.destroy(); }); }; var queueListRecurse = function(filter) { - correlator.request( + brokerAgent.request( // Send the QMF query requests for the specified classes. - getObjects('org.apache.qpid.broker', 'queue'), - getObjects('org.apache.qpid.broker', 'exchange'), - getObjects('org.apache.qpid.broker', 'binding') + brokerAgent.getObjects('org.apache.qpid.broker', 'queue'), + brokerAgent.getObjects('org.apache.qpid.broker', 'exchange'), + brokerAgent.getObjects('org.apache.qpid.broker', 'binding') ).then(function(objects) { var queues = objects.queue; var bindings = objects.binding; @@ -806,7 +814,7 @@ var queueListRecurse = function(filter) } } } - messenger.stop(); + brokerAgent.destroy(); }); }; @@ -836,7 +844,6 @@ var queueListRecurse = function(filter) */ var handleMethodResponse = function(response, dontStop) { -console.log("Method result"); if (response._arguments) { //console.log(response._arguments); } if (response._values) { @@ -845,7 +852,7 @@ console.log("Method result"); // Mostly we want to stop the Messenger Event loop and exit when a QMF method // returns, but sometimes we don't, the dontStop flag prevents this behaviour. if (!dontStop) { - messenger.stop(); + brokerAgent.destroy(); } } @@ -885,13 +892,13 @@ var addExchange = function(args) { declArgs[REPLICATE] = config._replicate; } - correlator.request( + brokerAgent.request( // We invoke the CRUD methods on the broker object. - getObjects('org.apache.qpid.broker', 'broker') + brokerAgent.getObjects('org.apache.qpid.broker', 'broker') ).then(function(objects) { var broker = objects.broker[0]; - correlator.request( - invokeMethod(broker, 'create', { + brokerAgent.request( + brokerAgent.invokeMethod(broker, 'create', { "type": "exchange", "name": ename, "properties": declArgs, @@ -907,13 +914,13 @@ var delExchange = function(args) { var ename = args[0]; - correlator.request( + brokerAgent.request( // We invoke the CRUD methods on the broker object. - getObjects('org.apache.qpid.broker', 'broker') + brokerAgent.getObjects('org.apache.qpid.broker', 'broker') ).then(function(objects) { var broker = objects.broker[0]; - correlator.request( - invokeMethod(broker, 'delete', { + brokerAgent.request( + brokerAgent.invokeMethod(broker, 'delete', { "type": "exchange", "name": ename}) ).then(handleMethodResponse); @@ -1009,13 +1016,13 @@ var addQueue = function(args) { // correlator object isn't as good as a real Promise and doesn't support // chaining of "then" calls, so where we have complex dependencies we still // get somewhat into "callback hell". TODO improve the correlator. - correlator.request( + brokerAgent.request( // We invoke the CRUD methods on the broker object. - getObjects('org.apache.qpid.broker', 'broker') + brokerAgent.getObjects('org.apache.qpid.broker', 'broker') ).then(function(objects) { var broker = objects.broker[0]; - correlator.request( - invokeMethod(broker, 'create', { + brokerAgent.request( + brokerAgent.invokeMethod(broker, 'create', { "type": "queue", "name": qname, "properties": declArgs, @@ -1024,18 +1031,18 @@ var addQueue = function(args) { if (config._start_replica) { handleMethodResponse(response, true); // The second parameter prevents exiting. // TODO test this stuff! - correlator.request( - getObjects('org.apache.qpid.ha', 'habroker') // Not sure if this is correct + brokerAgent.request( + brokerAgent.getObjects('org.apache.qpid.ha', 'habroker') // Not sure if this is correct ).then(function(objects) { if (objects.habroker.length > 0) { var habroker = objects.habroker[0]; - correlator.request( - invokeMethod(habroker, 'replicate', { + brokerAgent.request( + brokerAgent.invokeMethod(habroker, 'replicate', { "broker": config._start_replica, "queue": qname}) ).then(handleMethodResponse); } else { - messenger.stop(); + brokerAgent.destroy(); } }); } else { @@ -1052,13 +1059,13 @@ var delQueue = function(args) { var qname = args[0]; - correlator.request( + brokerAgent.request( // We invoke the CRUD methods on the broker object. - getObjects('org.apache.qpid.broker', 'broker') + brokerAgent.getObjects('org.apache.qpid.broker', 'broker') ).then(function(objects) { var broker = objects.broker[0]; - correlator.request( - invokeMethod(broker, 'delete', { + brokerAgent.request( + brokerAgent.invokeMethod(broker, 'delete', { "type": "queue", "name": qname, "options": {"if_empty": config._if_empty, @@ -1092,9 +1099,6 @@ var snarf_header_args = function(args) { }; var bind = function(args) { -console.log("bind"); -console.log(args); - if (args.length < 2) { usage(); } @@ -1107,10 +1111,10 @@ console.log(args); key = args[2]; } - correlator.request( + brokerAgent.request( // We invoke the CRUD methods on the broker object. - getObjects('org.apache.qpid.broker', 'broker'), - getObjects('org.apache.qpid.broker', 'exchange') // Get exchanges to look up exchange type. + brokerAgent.getObjects('org.apache.qpid.broker', 'broker'), + brokerAgent.getObjects('org.apache.qpid.broker', 'exchange') // Get exchanges to look up exchange type. ).then(function(objects) { var exchanges = objects.exchange; @@ -1136,15 +1140,15 @@ console.log(args); } else if (etype === 'headers') { declArgs = snarf_header_args(Array.prototype.slice.apply(args, [3])); } -console.log(declArgs); +//console.log(declArgs); if (typeof declArgs !== 'object') { process.exit(1); } var broker = objects.broker[0]; - correlator.request( - invokeMethod(broker, 'create', { + brokerAgent.request( + brokerAgent.invokeMethod(broker, 'create', { "type": "binding", "name": ename + '/' + qname + '/' + key, "properties": declArgs, @@ -1177,9 +1181,6 @@ console.log(declArgs); }; var unbind = function(args) { -console.log("unbind"); -console.log(args); - if (args.length < 2) { usage(); } @@ -1192,13 +1193,13 @@ console.log(args); key = args[2]; } - correlator.request( + brokerAgent.request( // We invoke the CRUD methods on the broker object. - getObjects('org.apache.qpid.broker', 'broker') + brokerAgent.getObjects('org.apache.qpid.broker', 'broker') ).then(function(objects) { var broker = objects.broker[0]; - correlator.request( - invokeMethod(broker, 'delete', { + brokerAgent.request( + brokerAgent.invokeMethod(broker, 'delete', { "type": "binding", "name": ename + '/' + qname + '/' + key}) ).then(handleMethodResponse); @@ -1216,14 +1217,14 @@ console.log(args); */ var createObject = function(type, name, args) { - correlator.request( + brokerAgent.request( // We invoke the CRUD methods on the broker object. - getObjects('org.apache.qpid.broker', 'broker') + brokerAgent.getObjects('org.apache.qpid.broker', 'broker') ).then(function(objects) { var broker = objects.broker[0]; - correlator.request( + brokerAgent.request( // Create an object of the specified type. - invokeMethod(broker, 'create', { + brokerAgent.invokeMethod(broker, 'create', { "type": type, "name": name, "properties": args, @@ -1233,14 +1234,14 @@ var createObject = function(type, name, }; var deleteObject = function(type, name, args) { - correlator.request( + brokerAgent.request( // We invoke the CRUD methods on the broker object. - getObjects('org.apache.qpid.broker', 'broker') + brokerAgent.getObjects('org.apache.qpid.broker', 'broker') ).then(function(objects) { var broker = objects.broker[0]; - correlator.request( + brokerAgent.request( // Create an object of the specified type and name. - invokeMethod(broker, 'delete', { + brokerAgent.invokeMethod(broker, 'delete', { "type": type, "name": name, "options": args}) @@ -1252,8 +1253,8 @@ var deleteObject = function(type, name, * This is a "generic" mechanism for listing arbitrary Management Objects. */ var listObjects = function(type) { - correlator.request( - getObjects('org.apache.qpid.broker', type) + brokerAgent.request( + brokerAgent.getObjects('org.apache.qpid.broker', type) ).then(function(objects) { // The correlator passes an object containing responses for all of the // supplied requests so we index it by the supplied type to get our response. @@ -1325,23 +1326,23 @@ var listObjects = function(type) { console.log(string); } - messenger.stop(); + brokerAgent.destroy(); }); }; var reloadAcl = function() { - correlator.request( - getObjects('org.apache.qpid.acl', 'acl') + brokerAgent.request( + brokerAgent.getObjects('org.apache.qpid.acl', 'acl') ).then(function(objects) { if (objects.acl.length > 0) { var acl = objects.acl[0]; - correlator.request( + brokerAgent.request( // Create an object of the specified type. - invokeMethod(acl, 'reloadACLFile', {}) + brokerAgent.invokeMethod(acl, 'reloadACLFile', {}) ).then(handleMethodResponse); } else { console.log("Failed: No ACL Loaded in Broker"); - messenger.stop(); + brokerAgent.destroy(); } }); }; @@ -1502,9 +1503,6 @@ if (params.length > 0) { } //console.log(config._host); +brokerAgent.addConnection(config._host, command); -var onSubscription = function() { - command(); -}; - Modified: qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/recv.js URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/recv.js?rev=1624737&r1=1624736&r2=1624737&view=diff ============================================================================== --- qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/recv.js (original) +++ qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/recv.js Sat Sep 13 13:46:29 2014 @@ -19,11 +19,14 @@ * */ -// Check if the environment is Node.js and if so import the required library. -if (typeof exports !== "undefined" && exports !== null) { - proton = require("qpid-proton"); +// Check if the environment is Node.js and if not log an error and exit. +if (!exports) { + console.error("recv.js should be run in Node.js"); + return; } +var proton = require("qpid-proton"); + var address = "amqp://~0.0.0.0"; var message = new proton.Message(); var messenger = new proton.Messenger(); Modified: qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/send.html URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/send.html?rev=1624737&r1=1624736&r2=1624737&view=diff ============================================================================== --- qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/send.html (original) +++ qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/send.html Sat Sep 13 13:46:29 2014 @@ -61,9 +61,9 @@ console.log("body = " + body); messenger.on('error', function(error) { console.log("Received error " + error); -message.free(); +// Error recovery seems to require a new Messenger instance. +messenger.stop(); messenger.free(); -message = new proton.Message(); messenger = new proton.Messenger(); messenger.start(); console.log("Restarted"); Modified: qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/send.js URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/send.js?rev=1624737&r1=1624736&r2=1624737&view=diff ============================================================================== --- qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/send.js (original) +++ qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/send.js Sat Sep 13 13:46:29 2014 @@ -19,11 +19,14 @@ * */ -// Check if the environment is Node.js and if so import the required library. -if (typeof exports !== "undefined" && exports !== null) { - proton = require("qpid-proton"); +// Check if the environment is Node.js and if not log an error and exit. +if (!exports) { + console.error("send.js should be run in Node.js"); + return; } +var proton = require("qpid-proton"); + var address = "amqp://0.0.0.0"; var subject = "UK.WEATHER"; var msgtext = "Hello World!"; Modified: qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/server.js URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/server.js?rev=1624737&r1=1624736&r2=1624737&view=diff ============================================================================== --- qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/server.js (original) +++ qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/server.js Sat Sep 13 13:46:29 2014 @@ -21,11 +21,14 @@ // Simple server for use with client.js illustrating request/response -// Check if the environment is Node.js and if so import the required library. -if (typeof exports !== "undefined" && exports !== null) { - proton = require("qpid-proton"); +// Check if the environment is Node.js and if not log an error and exit. +if (!exports) { + console.error("server.js should be run in Node.js"); + return; } +var proton = require("qpid-proton"); + var address = "amqp://~0.0.0.0"; var message = new proton.Message(); var reply = new proton.Message(); Modified: qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/spout.js URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/spout.js?rev=1624737&r1=1624736&r2=1624737&view=diff ============================================================================== --- qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/spout.js (original) +++ qpid/proton/branches/fadams-javascript-binding/examples/messenger/javascript/spout.js Sat Sep 13 13:46:29 2014 @@ -19,11 +19,14 @@ * */ -// Check if the environment is Node.js and if so import the required library. -if (typeof exports !== "undefined" && exports !== null) { - proton = require("qpid-proton"); +// Check if the environment is Node.js and if not log an error and exit. +if (!exports) { + console.error("spout.js should be run in Node.js"); + return; } +var proton = require("qpid-proton"); + console.log("spout not implemented yet"); process.exit(0); Modified: qpid/proton/branches/fadams-javascript-binding/proton-c/bindings/javascript/binding.js URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/proton-c/bindings/javascript/binding.js?rev=1624737&r1=1624736&r2=1624737&view=diff ============================================================================== --- qpid/proton/branches/fadams-javascript-binding/proton-c/bindings/javascript/binding.js (original) +++ qpid/proton/branches/fadams-javascript-binding/proton-c/bindings/javascript/binding.js Sat Sep 13 13:46:29 2014 @@ -214,9 +214,17 @@ Module['Messenger'] = function(name) { / */ _pn_messenger_set_blocking(this._messenger, false); + // Subscriptions that haven't yet completed, used for managing subscribe events. + this._pendingSubscriptions = []; + // Used in the Event registration mechanism (in the 'on' and 'emit' methods). this._callbacks = {}; + // This call ensures that the emscripten network callback functions are initialised. + Module.EventDispatch.registerMessenger(this); + + + // TODO improve error handling mechanism. /* * The emscripten websocket error event could get triggered by any Messenger * and it's hard to determine which one without knowing which file descriptors @@ -233,10 +241,13 @@ Module['Messenger'] = function(name) { / */ var that = this; Module['websocket']['on']('error', function(error) { + +console.log("Module['websocket']['on'] caller is " + arguments.callee.caller.toString()); + console.log("that._checkErrors = " + that._checkErrors); console.log("error = " + error); if (that._checkErrors) { - that.emit('error', new Module['MessengerError'](error[2])); + that._emit('error', new Module['MessengerError'](error[2])); } }); }; @@ -269,7 +280,7 @@ _Messenger_._check = function(code) { var message = errno ? this['getError']() : Pointer_stringify(_pn_code(code)); if (this._callbacks['error']) { - this.emit('error', new Module['MessengerError'](message)); + this._emit('error', new Module['MessengerError'](message)); } else { throw new Module['MessengerError'](message); } @@ -279,18 +290,48 @@ _Messenger_._check = function(code) { }; /** - * Invokes a callback registered for a specified event. - * @method emit + * Invokes the callbacks registered for a specified event. + * @method _emit * @memberof! proton.Messenger# * @param event the event we want to emit. * @param param the parameter we'd like to pass to the event callback. */ -_Messenger_.emit = function(event, param) { - if ('function' === typeof this._callbacks[event]) { - this._callbacks[event].call(this, param); +_Messenger_._emit = function(event, param) { + var callbacks = this._callbacks[event]; + if (callbacks) { + for (var i = 0; i < callbacks.length; i++) { + var callback = callbacks[i]; + if ('function' === typeof callback) { + callback.call(this, param); + } + } + } +}; + +/** + * Checks any pending subscriptions and when a source address becomes available + * emit a subscription event passing the Subscription that triggered the event. + * Note that this doesn't seem to work for listen/bind style subscriptions, + * that is to say subscriptions of the form amqp://~0.0.0.0 don't know why? + */ +_Messenger_._checkSubscriptions = function() { + // Check for completed subscriptions, and emit subscribe event. + var subscriptions = this._pendingSubscriptions; + if (subscriptions.length) { + var pending = []; // Array of any subscriptions that remain pending. + for (var j = 0; j < subscriptions.length; j++) { + subscription = subscriptions[j]; + if (subscription['getAddress']()) { + this._emit('subscription', subscription); + } else { + pending.push(subscription); + } + } + this._pendingSubscriptions = pending; } }; + // *************************** Public methods ***************************** /** @@ -327,11 +368,11 @@ _Messenger_.emit = function(event, param */ _Messenger_['on'] = function(event, callback) { if ('function' === typeof callback) { - if (event === 'work') { - Module.EventDispatch.addListener(this, callback); - } else { - this._callbacks[event] = callback; + if (!this._callbacks[event]) { + this._callbacks[event] = []; } + + this._callbacks[event].push(callback); } }; @@ -340,15 +381,25 @@ _Messenger_['on'] = function(event, call * @method removeListener * @memberof! proton.Messenger# * @param event the event we want to detach from. - * @param callback the callback function to be remove for the specified event. + * @param callback the callback function to be removed for the specified event. + * if no callback is specified all callbacks are removed for the event. */ _Messenger_['removeListener'] = function(event, callback) { - if ('function' === typeof callback) { - if (event === 'work') { - Module.EventDispatch.removeListener(this, callback); - } else { - this._callbacks[event] = null;//callback; + if (callback) { + var callbacks = this._callbacks[event]; + if ('function' === typeof callback && callbacks) { + // Search for the specified callback. + for (var i = 0; i < callbacks.length; i++) { + if (callback === callbacks[i]) { + // If we find the specified callback delete it and return. + callbacks.splice(i, 1); + return; + } + } } + } else { + // If we call remove with no callback we remove all callbacks. + delete this._callbacks[event]; } }; @@ -392,6 +443,8 @@ _Messenger_['isBlocking'] = function() { * @memberof! proton.Messenger# */ _Messenger_['free'] = function() { + // This call ensures that the emscripten network callback functions are removed. + Module.EventDispatch.unregisterMessenger(this); _pn_messenger_free(this._messenger); }; @@ -477,14 +530,6 @@ _Messenger_['setIncomingWindow'] = funct */ _Messenger_['start'] = function() { this._check(_pn_messenger_start(this._messenger)); - - // This call ensures that the emscripten network callback functions are set - // up even if a client hasn't explicity added a work function via a call to - // messenger.on('work', <work function>); - // Doing this means that pn_messenger_work() will still get called when any - // WebSocket events occur, which keeps things more reliable when things like - // reconnections occur. - Module.EventDispatch.addListener(this); }; /** @@ -534,7 +579,7 @@ _Messenger_['subscribe'] = function(sour this._check(Module['Error']['ARG_ERR']); } var sp = Runtime.stackSave(); - this._checkErrors = true; + this._checkErrors = true; // TODO improve error handling mechanism. var subscription = _pn_messenger_subscribe(this._messenger, allocate(intArrayFromString(source), 'i8', ALLOC_STACK)); Runtime.stackRestore(sp); @@ -542,7 +587,10 @@ _Messenger_['subscribe'] = function(sour if (!subscription) { this._check(Module['Error']['ERR']); } - return new Subscription(subscription); + + subscription = new Subscription(subscription) + this._pendingSubscriptions.push(subscription); + return subscription; }; /** @@ -570,7 +618,7 @@ _Messenger_['subscribe'] = function(sour _Messenger_['put'] = function(message, flush) { flush = flush === false ? false : true; message._preEncode(); - this._checkErrors = true; + this._checkErrors = true; // TODO improve error handling mechanism. this._check(_pn_messenger_put(this._messenger, message._message)); // If flush is set invoke pn_messenger_work. @@ -660,7 +708,7 @@ _Messenger_['work'] = function() { console.log("work = false"); return false; } else { - this._checkErrors = false; + this._checkErrors = false; // TODO improve error handling mechanism. this._check(err); console.log("work = true"); return true; @@ -931,7 +979,7 @@ _Messenger_['rewrite'] = function(patter * @memberof proton */ Module.EventDispatch = new function() { // Note the use of new to create a Singleton. - var _firstCall = true; // Flag used to check the first time addListener is called. + var _firstCall = true; // Flag used to check the first time registerMessenger is called. var _messengers = {}; /** @@ -950,15 +998,15 @@ Module.EventDispatch = new function() { var _pump = function(fd, closing) { for (var i in _messengers) { if (_messengers.hasOwnProperty(i)) { - var current = _messengers[i]; + var messenger = _messengers[i]; if (closing) { - current.invokeCallbacks(); + messenger._emit('work'); } else { - var messenger = current.messenger; while (_pn_messenger_work(messenger._messenger, 0) >= 0) { - messenger._checkErrors = false; - current.invokeCallbacks(); + messenger._checkSubscriptions(); + messenger._checkErrors = false; // TODO improve error handling mechanism. + messenger._emit('work'); } } } @@ -974,64 +1022,33 @@ Module.EventDispatch = new function() { }; /** - * Initialises the emscripten network callback functions. This needs to be - * done the first time we call addListener rather that when we create the - * Singleton because emscripten's socket filesystem has to be mounted before - * we can register listeners for any of these events. + * Register the specified Messenger as being interested in network events. */ - var _init = function() { - Module['websocket']['on']('open', _pump); - Module['websocket']['on']('connection', _pump); - Module['websocket']['on']('message', _pump); - Module['websocket']['on']('close', _close); - }; - - /** - * Add a listener callback for the specified Messenger. Multiple listeners - * are permitted for each Messenger and listeners can be registered for - * multiple Messenger instances. The first time this method is called we - * initialise the emscripten network callback functions. - */ - this.addListener = function(messenger, callback) { + this.registerMessenger = function(messenger) { if (_firstCall) { - _init(); + /** + * Initialises the emscripten network callback functions. This needs + * to be done the first time we call registerMessenger rather than + * when we create the Singleton because emscripten's socket filesystem + * has to be mounted before can listen for any of these events. + */ + Module['websocket']['on']('open', _pump); + Module['websocket']['on']('connection', _pump); + Module['websocket']['on']('message', _pump); + Module['websocket']['on']('close', _close); _firstCall = false; } var name = messenger.getName(); - if (!_messengers[name]) { - _messengers[name] = { - messenger: messenger, - callbacks: [], - invokeCallbacks: function() { - for (var j = 0; j < this.callbacks.length; j++) { - this.callbacks[j](); - } - } - }; - } - - if (callback) { - _messengers[name].callbacks.push(callback); - } + _messengers[name] = messenger; }; /** - * Remove the specified listener callback from the specified Messenger. + * Unregister the specified Messenger from interest in network events. */ - this.removeListener = function(messenger, callback) { + this.unregisterMessenger = function(messenger) { var name = messenger.getName(); - if (_messengers[name]) { - // If we find the registered Messenger search for the specified callback. - var callbacks = _messengers[name].callbacks; - for (var j = 0; j < callbacks.length; j++) { - if (callback === callbacks[j]) { - // If we find the specified callback delete it and return. - callbacks.splice(j, 1); - return; - } - } - } + delete _messengers[name]; }; }; Modified: qpid/proton/branches/fadams-javascript-binding/tests/javascript/codec.js URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/tests/javascript/codec.js?rev=1624737&r1=1624736&r2=1624737&view=diff ============================================================================== --- qpid/proton/branches/fadams-javascript-binding/tests/javascript/codec.js (original) +++ qpid/proton/branches/fadams-javascript-binding/tests/javascript/codec.js Sat Sep 13 13:46:29 2014 @@ -23,13 +23,16 @@ * proton.Data wrapper class. */ -// Check if the environment is Node.js and if so import the required libraries. -if (typeof exports !== "undefined" && exports !== null) { - unittest = require("./unittest.js"); - assert = require("assert"); - proton = require("qpid-proton"); +// Check if the environment is Node.js and if not log an error and exit. +if (!exports) { + console.error("codec.js should be run in Node.js"); + return; } +var unittest = require("./unittest.js"); +var assert = require("assert"); +var proton = require("qpid-proton"); + // Extend TestCase by creating a prototype instance and adding test methods as properties. var DataTest = new unittest.TestCase(); Modified: qpid/proton/branches/fadams-javascript-binding/tests/javascript/message.js URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/tests/javascript/message.js?rev=1624737&r1=1624736&r2=1624737&view=diff ============================================================================== --- qpid/proton/branches/fadams-javascript-binding/tests/javascript/message.js (original) +++ qpid/proton/branches/fadams-javascript-binding/tests/javascript/message.js Sat Sep 13 13:46:29 2014 @@ -23,13 +23,16 @@ * proton.Message wrapper class. */ -// Check if the environment is Node.js and if so import the required libraries. -if (typeof exports !== "undefined" && exports !== null) { - unittest = require("./unittest.js"); - assert = require("assert"); - proton = require("qpid-proton"); +// Check if the environment is Node.js and if not log an error and exit. +if (!exports) { + console.error("message.js should be run in Node.js"); + return; } +var unittest = require("./unittest.js"); +var assert = require("assert"); +var proton = require("qpid-proton"); + /** * JavaScript Implementation of Python's range() function taken from: * http://stackoverflow.com/questions/8273047/javascript-function-similar-to-python-range Modified: qpid/proton/branches/fadams-javascript-binding/tests/javascript/soak.js URL: http://svn.apache.org/viewvc/qpid/proton/branches/fadams-javascript-binding/tests/javascript/soak.js?rev=1624737&r1=1624736&r2=1624737&view=diff ============================================================================== --- qpid/proton/branches/fadams-javascript-binding/tests/javascript/soak.js (original) +++ qpid/proton/branches/fadams-javascript-binding/tests/javascript/soak.js Sat Sep 13 13:46:29 2014 @@ -19,19 +19,20 @@ * */ -// Check if the environment is Node.js and if so import the required library. -if (typeof exports !== "undefined" && exports !== null) { - proton = require("qpid-proton"); +// Check if the environment is Node.js and if not log an error and exit. +if (!exports) { + console.error("soak.js should be run in Node.js"); + return; } +var proton = require("qpid-proton"); + var addr = 'guest:guest@localhost:5673'; //var addr = 'localhost:5673'; var address = 'amqp://' + addr; console.log(address); var subscriptionQueue = ''; -var subscription; -var subscribed = false; var count = 0; var start = 0; // Start Time. @@ -39,16 +40,6 @@ var message = new proton.Message(); var messenger = new proton.Messenger(); var pumpData = function() { - if (!subscribed) { - var subscriptionAddress = subscription.getAddress(); - if (subscriptionAddress) { - subscribed = true; - var splitAddress = subscriptionAddress.split('/'); - subscriptionQueue = splitAddress[splitAddress.length - 1]; - onSubscription(); - } - } - while (messenger.incoming()) { // The second parameter forces Binary payloads to be decoded as strings // this is useful because the broker QMF Agent encodes strings as AMQP @@ -84,16 +75,21 @@ var sendMessage = function() { messenger.on('error', function(error) {console.log(error);}); messenger.on('work', pumpData); +messenger.on('subscription', function(subscription) { + var subscriptionAddress = subscription.getAddress(); + var splitAddress = subscriptionAddress.split('/'); + subscriptionQueue = splitAddress[splitAddress.length - 1]; + + console.log("Subscription Queue: " + subscriptionQueue); + start = +new Date(); + sendMessage(); +}); + //messenger.setOutgoingWindow(1024); messenger.setIncomingWindow(1024); // The Java Broker seems to need this. +messenger.recv(); // Receive as many messages as messenger can buffer. messenger.start(); -subscription = messenger.subscribe('amqp://' + addr + '/#'); -messenger.recv(); // Receive as many messages as messenger can buffer. +messenger.subscribe('amqp://' + addr + '/#'); -var onSubscription = function() { - console.log("Subscription Queue: " + subscriptionQueue); - start = +new Date(); - sendMessage(); -}; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org