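Fixed bug APLO-355 (https://issues.apache.org/jira/browse/APLO-355): WebSocket text messages are now converted to and from bytes as UTF-8 instead of ASCII in WebSocketTransportFactory, and the bundled stomp.js example now computes content-length from the UTF-8 size of the body.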
Project: http://git-wip-us.apache.org/repos/asf/activemq-apollo/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-apollo/commit/1e3eadff Tree: http://git-wip-us.apache.org/repos/asf/activemq-apollo/tree/1e3eadff Diff: http://git-wip-us.apache.org/repos/asf/activemq-apollo/diff/1e3eadff Branch: refs/heads/trunk Commit: 1e3eadff46fa4b3bb2ca2ae7ec905e831b83818c Parents: 35c3c79 Author: leiqin <leiqin2...@gmail.com> Authored: Mon Apr 28 16:29:26 2014 +0800 Committer: leiqin <leiqin2...@gmail.com> Committed: Mon Apr 28 16:29:26 2014 +0800 ---------------------------------------------------------------------- .../jetty/WebSocketTransportFactory.scala | 8 +- .../examples/stomp/websocket/js/stomp.js | 207 ++++++++++++++----- 2 files changed, 155 insertions(+), 60 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-apollo/blob/1e3eadff/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/WebSocketTransportFactory.scala ---------------------------------------------------------------------- diff --git a/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/WebSocketTransportFactory.scala b/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/WebSocketTransportFactory.scala index d3fb727..df921f4 100644 --- a/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/WebSocketTransportFactory.scala +++ b/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/jetty/WebSocketTransportFactory.scala @@ -36,7 +36,7 @@ import java.nio.channels._ import scala.collection.mutable.ListBuffer import java.util.concurrent.{ExecutorService, Executor, ArrayBlockingQueue} import org.fusesource.hawtdispatch.transport.ProtocolCodec.BufferState -import org.fusesource.hawtbuf.{AsciiBuffer, Buffer} +import org.fusesource.hawtbuf.{UTF8Buffer, AsciiBuffer, Buffer} import java.io.{EOFException, IOException} import 
java.security.cert.X509Certificate import org.apache.activemq.apollo.broker.web.AllowAnyOriginFilter @@ -366,7 +366,7 @@ object WebSocketTransportFactory extends TransportFactory.Provider with Log { first_message = false } // Convert string messages to bytes messages.. our codecs just work with bytes.. - var buffer = new AsciiBuffer(str) + var buffer = new UTF8Buffer(str) onMessage(buffer.data, buffer.offset, buffer.length) } @@ -597,7 +597,7 @@ object WebSocketTransportFactory extends TransportFactory.Provider with Log { if( service_state.is_starting_or_started || !write_failed) { try { if (!binary_transfers) { - connection.sendMessage(buffer.ascii().toString) + connection.sendMessage(buffer.utf8().toString) } else { connection.sendMessage(buffer.data, buffer.offset, buffer.length) } @@ -622,4 +622,4 @@ object WebSocketTransportFactory extends TransportFactory.Provider with Log { } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/activemq-apollo/blob/1e3eadff/apollo-distro/src/main/release/examples/stomp/websocket/js/stomp.js ---------------------------------------------------------------------- diff --git a/apollo-distro/src/main/release/examples/stomp/websocket/js/stomp.js b/apollo-distro/src/main/release/examples/stomp/websocket/js/stomp.js index 3e237f1..9299345 100644 --- a/apollo-distro/src/main/release/examples/stomp/websocket/js/stomp.js +++ b/apollo-distro/src/main/release/examples/stomp/websocket/js/stomp.js @@ -1,16 +1,24 @@ -// Generated by CoffeeScript 1.3.3 +// Generated by CoffeeScript 1.6.3 +/* + Stomp Over WebSocket http://www.jmesnil.net/stomp-websocket/doc/ | Apache License V2.0 + + Copyright (C) 2010-2013 [Jeff Mesnil](http://jmesnil.net/) + Copyright (C) 2012 [FuseSource, Inc.](http://fusesource.com) +*/ + + (function() { var Byte, Client, Frame, Stomp, - __hasProp = {}.hasOwnProperty; - - var MAX_FRAME_SIZE=16*1024;; - + __hasProp = {}.hasOwnProperty, + __slice = [].slice; + Byte = { LF: '\x0A', NULL: '\x00' }; Frame = 
(function() { + var unmarshallSingle; function Frame(command, headers, body) { this.command = command; @@ -28,14 +36,22 @@ lines.push("" + name + ":" + value); } if (this.body) { - lines.push("content-length:" + ('' + this.body).length); + lines.push("content-length:" + (Frame.sizeOfUTF8(this.body))); } lines.push(Byte.LF + this.body); return lines.join(Byte.LF); }; - Frame._unmarshallSingle = function(data) { - var body, chr, command, divider, headerLines, headers, i, idx, len, line, start, trim, _i, _j, _ref, _ref1; + Frame.sizeOfUTF8 = function(s) { + if (s) { + return encodeURI(s).split(/%..|./).length - 1; + } else { + return 0; + } + }; + + unmarshallSingle = function(data) { + var body, chr, command, divider, headerLines, headers, i, idx, len, line, start, trim, _i, _j, _len, _ref, _ref1; divider = data.search(RegExp("" + Byte.LF + Byte.LF)); headerLines = data.substring(0, divider).split(Byte.LF); command = headerLines.shift(); @@ -43,9 +59,9 @@ trim = function(str) { return str.replace(/^\s+|\s+$/g, ''); }; - line = idx = null; - for (i = _i = 0, _ref = headerLines.length; 0 <= _ref ? _i < _ref : _i > _ref; i = 0 <= _ref ? ++_i : --_i) { - line = headerLines[i]; + _ref = headerLines.reverse(); + for (_i = 0, _len = _ref.length; _i < _len; _i++) { + line = _ref[_i]; idx = line.indexOf(':'); headers[trim(line.substring(0, idx))] = trim(line.substring(idx + 1)); } @@ -76,7 +92,7 @@ for (_i = 0, _len = _ref.length; _i < _len; _i++) { data = _ref[_i]; if ((data != null ? data.length : void 0) > 0) { - _results.push(Frame._unmarshallSingle(data)); + _results.push(unmarshallSingle(data)); } } return _results; @@ -94,6 +110,7 @@ })(); Client = (function() { + var now; function Client(ws) { this.ws = ws; @@ -104,19 +121,36 @@ outgoing: 10000, incoming: 10000 }; + this.maxWebSocketFrameSize = 16 * 1024; this.subscriptions = {}; } + Client.prototype.debug = function(message) { + var _ref; + return typeof window !== "undefined" && window !== null ? 
(_ref = window.console) != null ? _ref.log(message) : void 0 : void 0; + }; + + now = function() { + if (Date.now) { + return Date.now(); + } else { + return new Date().valueOf; + } + }; + Client.prototype._transmit = function(command, headers, body) { var out; out = Frame.marshall(command, headers, body); if (typeof this.debug === "function") { this.debug(">>> " + out); } - while( true) { - if( out.length > MAX_FRAME_SIZE ) { - this.ws.send(out.substring(0, MAX_FRAME_SIZE)); - out = out.substring(MAX_FRAME_SIZE); + while (true) { + if (out.length > this.maxWebSocketFrameSize) { + this.ws.send(out.substring(0, this.maxWebSocketFrameSize)); + out = out.substring(this.maxWebSocketFrameSize); + if (typeof this.debug === "function") { + this.debug("remaining = " + out.length); + } } else { return this.ws.send(out); } @@ -144,37 +178,64 @@ if (typeof this.debug === "function") { this.debug("send PING every " + ttl + "ms"); } - this.pinger = typeof window !== "undefined" && window !== null ? window.setInterval(function() { + this.pinger = Stomp.setInterval(ttl, function() { _this.ws.send(Byte.LF); return typeof _this.debug === "function" ? _this.debug(">>> PING") : void 0; - }, ttl) : void 0; + }); } if (!(this.heartbeat.incoming === 0 || serverOutgoing === 0)) { ttl = Math.max(this.heartbeat.incoming, serverOutgoing); if (typeof this.debug === "function") { this.debug("check PONG every " + ttl + "ms"); } - return this.ponger = typeof window !== "undefined" && window !== null ? 
window.setInterval(function() { + return this.ponger = Stomp.setInterval(ttl, function() { var delta; - delta = Date.now() - _this.serverActivity; + delta = now() - _this.serverActivity; if (delta > ttl * 2) { if (typeof _this.debug === "function") { _this.debug("did not receive server activity for the last " + delta + "ms"); } - return _this._cleanUp(); + return _this.ws.close(); + } + }); + } + }; + + Client.prototype._parseConnect = function() { + var args, connectCallback, errorCallback, headers; + args = 1 <= arguments.length ? __slice.call(arguments, 0) : []; + headers = {}; + switch (args.length) { + case 2: + headers = args[0], connectCallback = args[1]; + break; + case 3: + if (args[1] instanceof Function) { + headers = args[0], connectCallback = args[1], errorCallback = args[2]; + } else { + headers.login = args[0], headers.passcode = args[1], connectCallback = args[2]; } - }, ttl) : void 0; + break; + case 4: + headers.login = args[0], headers.passcode = args[1], connectCallback = args[2], errorCallback = args[3]; + break; + default: + headers.login = args[0], headers.passcode = args[1], connectCallback = args[2], errorCallback = args[3], headers.host = args[4]; } + return [headers, connectCallback, errorCallback]; }; - Client.prototype.connect = function(login, passcode, connectCallback, errorCallback, vhost) { - var _this = this; - this.connectCallback = connectCallback; + Client.prototype.connect = function() { + var args, errorCallback, headers, out, + _this = this; + args = 1 <= arguments.length ? 
__slice.call(arguments, 0) : []; + out = this._parseConnect.apply(this, args); + headers = out[0], this.connectCallback = out[1], errorCallback = out[2]; if (typeof this.debug === "function") { this.debug("Opening Web Socket..."); } this.ws.onmessage = function(evt) { - var arr, c, data, frame, onreceive, _i, _len, _ref, _results; + var arr, c, client, data, frame, messageID, onreceive, subscription, _i, _len, _ref, _results; data = typeof ArrayBuffer !== 'undefined' && evt.data instanceof ArrayBuffer ? (arr = new Uint8Array(evt.data), typeof _this.debug === "function" ? _this.debug("--- got data length: " + arr.length) : void 0, ((function() { var _i, _len, _results; _results = []; @@ -184,7 +245,7 @@ } return _results; })()).join('')) : evt.data; - _this.serverActivity = Date.now(); + _this.serverActivity = now(); if (data === Byte.LF) { if (typeof _this.debug === "function") { _this.debug("<<< PONG"); @@ -208,8 +269,27 @@ _results.push(typeof _this.connectCallback === "function" ? _this.connectCallback(frame) : void 0); break; case "MESSAGE": - onreceive = _this.subscriptions[frame.headers.subscription]; - _results.push(typeof onreceive === "function" ? onreceive(frame) : void 0); + subscription = frame.headers.subscription; + onreceive = _this.subscriptions[subscription] || _this.onreceive; + if (onreceive) { + client = _this; + messageID = frame.headers["message-id"]; + frame.ack = function(headers) { + if (headers == null) { + headers = {}; + } + return client.ack(messageID, subscription, headers); + }; + frame.nack = function(headers) { + if (headers == null) { + headers = {}; + } + return client.nack(messageID, subscription, headers); + }; + _results.push(onreceive(frame)); + } else { + _results.push(typeof _this.debug === "function" ? _this.debug("Unhandled received MESSAGE: " + frame) : void 0); + } break; case "RECEIPT": _results.push(typeof _this.onreceipt === "function" ? 
_this.onreceipt(frame) : void 0); @@ -229,47 +309,37 @@ if (typeof _this.debug === "function") { _this.debug(msg); } + _this._cleanUp(); return typeof errorCallback === "function" ? errorCallback(msg) : void 0; }; return this.ws.onopen = function() { - var headers; if (typeof _this.debug === "function") { _this.debug('Web Socket Opened...'); } - headers = { - "accept-version": Stomp.VERSIONS.supportedVersions(), - "heart-beat": [_this.heartbeat.outgoing, _this.heartbeat.incoming].join(',') - }; - if (vhost) { - headers.host = vhost; - } - if (login) { - headers.login = login; - } - if (passcode) { - headers.passcode = passcode; - } + headers["accept-version"] = Stomp.VERSIONS.supportedVersions(); + headers["heart-beat"] = [_this.heartbeat.outgoing, _this.heartbeat.incoming].join(','); return _this._transmit("CONNECT", headers); }; }; - Client.prototype.disconnect = function(disconnectCallback) { - this._transmit("DISCONNECT"); + Client.prototype.disconnect = function(disconnectCallback, headers) { + if (headers == null) { + headers = {}; + } + this._transmit("DISCONNECT", headers); this.ws.onclose = null; + this.ws.close(); this._cleanUp(); return typeof disconnectCallback === "function" ? disconnectCallback() : void 0; }; Client.prototype._cleanUp = function() { - this.ws.close(); this.connected = false; if (this.pinger) { - if (typeof window !== "undefined" && window !== null) { - window.clearInterval(this.pinger); - } + Stomp.clearInterval(this.pinger); } if (this.ponger) { - return typeof window !== "undefined" && window !== null ? 
window.clearInterval(this.ponger) : void 0; + return Stomp.clearInterval(this.ponger); } }; @@ -285,6 +355,7 @@ }; Client.prototype.subscribe = function(destination, callback, headers) { + var client; if (headers == null) { headers = {}; } @@ -294,7 +365,13 @@ headers.destination = destination; this.subscriptions[headers.id] = callback; this._transmit("SUBSCRIBE", headers); - return headers.id; + client = this; + return { + id: headers.id, + unsubscribe: function() { + return client.unsubscribe(headers.id); + } + }; }; Client.prototype.unsubscribe = function(id) { @@ -305,9 +382,21 @@ }; Client.prototype.begin = function(transaction) { - return this._transmit("BEGIN", { - transaction: transaction + var client, txid; + txid = transaction || "tx-" + this.counter++; + this._transmit("BEGIN", { + transaction: txid }); + client = this; + return { + id: txid, + commit: function() { + return client.commit(txid); + }, + abort: function() { + return client.abort(txid); + } + }; }; Client.prototype.commit = function(transaction) { @@ -345,7 +434,6 @@ })(); Stomp = { - libVersion: "2.0.0-next", VERSIONS: { V1_0: '1.0', V1_1: '1.1', @@ -370,10 +458,17 @@ }; if (typeof window !== "undefined" && window !== null) { + Stomp.setInterval = function(interval, f) { + return window.setInterval(f, interval); + }; + Stomp.clearInterval = function(id) { + return window.clearInterval(id); + }; window.Stomp = Stomp; - } else { + } else if (typeof exports !== "undefined" && exports !== null) { exports.Stomp = Stomp; - Stomp.WebSocketClass = require('./test/server.mock.js').StompServerMock; + } else { + self.Stomp = Stomp; } }).call(this);