Repository: avro Updated Branches: refs/heads/master a50068f11 -> 9101a42ba
http://git-wip-us.apache.org/repos/asf/avro/blob/9101a42b/lang/js/test/test_protocols.js ---------------------------------------------------------------------- diff --git a/lang/js/test/test_protocols.js b/lang/js/test/test_protocols.js new file mode 100644 index 0000000..08ef784 --- /dev/null +++ b/lang/js/test/test_protocols.js @@ -0,0 +1,1392 @@ +/* jshint node: true, mocha: true */ + +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +'use strict'; + +var protocols = require('../lib/protocols'), + utils = require('../lib/utils'), + assert = require('assert'), + stream = require('stream'), + util = require('util'); + + +var HANDSHAKE_REQUEST_TYPE = protocols.HANDSHAKE_REQUEST_TYPE; +var HANDSHAKE_RESPONSE_TYPE = protocols.HANDSHAKE_RESPONSE_TYPE; +var createProtocol = protocols.createProtocol; + + +suite('protocols', function () { + + suite('Protocol', function () { + + test('get name and types', function () { + var p = createProtocol({ + namespace: 'foo', + protocol: 'HelloWorld', + types: [ + { + name: 'Greeting', + type: 'record', + fields: [{name: 'message', type: 'string'}] + }, + { + name: 'Curse', + type: 'error', + fields: [{name: 'message', type: 'string'}] + } + ], + messages: { + hello: { + request: [{name: 'greeting', type: 'Greeting'}], + response: 'Greeting', + errors: ['Curse'] + }, + hi: { + request: [{name: 'hey', type: 'string'}], + response: 'null', + 'one-way': true + } + } + }); + assert.equal(p.getName(), 'foo.HelloWorld'); + assert.equal(p.getType('foo.Greeting').getName(true), 'record'); + }); + + test('missing message', function () { + var ptcl = createProtocol({namespace: 'com.acme', protocol: 'Hello'}); + assert.throws(function () { + ptcl.on('add', function () {}); + }, /unknown/); + }); + + test('missing name', function () { + assert.throws(function () { + createProtocol({namespace: 'com.acme', messages: {}}); + }); + }); + + test('missing type', function () { + assert.throws(function () { + createProtocol({ + namespace: 'com.acme', + protocol: 'HelloWorld', + messages: { + hello: { + request: [{name: 'greeting', type: 'Greeting'}], + response: 'Greeting' + } + } + }); + }); + }); + + test('get messages', function () { + var ptcl; + ptcl = createProtocol({protocol: 'Empty'}); + assert.deepEqual(ptcl.getMessages(), {}); + ptcl = createProtocol({ + protocol: 'Ping', + messages: { + ping: { + request: [], + response: 'string' + } + } + }); + var messages = ptcl.getMessages(); + assert.equal(Object.keys(messages).length, 1); + assert(messages.ping !== undefined); + }); + + test('create listener', function (done) { + var ptcl = createProtocol({protocol: 'Empty'}); + var transport = new stream.PassThrough(); + var ee = ptcl.createListener(transport, function (pending) { + assert.equal(pending, 0); + done(); + }); + ee.destroy(); + }); + + test('subprotocol', function () { + var ptcl = createProtocol({namespace: 'com.acme', protocol: 'Hello'}); + var subptcl = ptcl.subprotocol(); + assert.strictEqual(subptcl._emitterResolvers, ptcl._emitterResolvers); + assert.strictEqual(subptcl._listenerResolvers, ptcl._listenerResolvers); + }); + + test('invalid emitter', function (done) { + var ptcl = createProtocol({protocol: 'Empty'}); + ptcl.emit('hi', {}, null, function (err) { + assert(/invalid emitter/.test(err.string)); + done(); + }); + }); + + test('inspect', function () { + var p = createProtocol({ + namespace: 'hello', + protocol: 'World', + }); + assert.equal(p.inspect(), '<Protocol "hello.World">'); + }); + + }); + + suite('Message', function () { + + var Message = protocols.Message; + + test('empty errors', function () { + var m = new Message('Hi', { + request: [{name: 'greeting', type: 'string'}], + response: 'int' + }); + assert.deepEqual(m.errorType.toString(), '["string"]'); + }); + + test('missing response', function () { + assert.throws(function () { + new Message('Hi', { + request: [{name: 'greeting', type: 'string'}] + }); + }); + }); + + test('invalid one-way', function () { + // Non-null response. + assert.throws(function () { + new Message('Hi', { + request: [{name: 'greeting', type: 'string'}], + response: 'string', + 'one-way': true + }); + }); + // Non-empty errors. + assert.throws(function () { + new Message('Hi', { + request: [{name: 'greeting', type: 'string'}], + response: 'null', + errors: ['int'], + 'one-way': true + }); + }); + }); + + }); + + suite('MessageDecoder', function () { + + var MessageDecoder = protocols.streams.MessageDecoder; + + test('ok', function (done) { + var parts = [ + new Buffer([0, 1]), + new Buffer([2]), + new Buffer([]), + new Buffer([3, 4, 5]), + new Buffer([]) + ]; + var messages = []; + var readable = createReadableStream(parts.map(frame), true); + var writable = createWritableStream(messages, true) + .on('finish', function () { + assert.deepEqual( + messages, + [new Buffer([0, 1, 2]), new Buffer([3, 4, 5])] + ); + done(); + }); + readable.pipe(new MessageDecoder()).pipe(writable); + }); + + test('trailing data', function (done) { + var parts = [ + new Buffer([0, 1]), + new Buffer([2]), + new Buffer([]), + new Buffer([3]) + ]; + var messages = []; + var readable = createReadableStream(parts.map(frame), true); + var writable = createWritableStream(messages, true); + readable + .pipe(new MessageDecoder()) + .on('error', function () { + assert.deepEqual(messages, [new Buffer([0, 1, 2])]); + done(); + }) + .pipe(writable); + }); + + test('empty', function (done) { + var readable = createReadableStream([], true); + readable + .pipe(new MessageDecoder(true)) + .on('error', function () { done(); }); + }); + + }); + + suite('MessageEncoder', function () { + + var MessageEncoder = protocols.streams.MessageEncoder; + + test('invalid frame size', function () { + assert.throws(function () { new MessageEncoder(); }); + }); + + test('ok', function (done) { + var messages = [ + new Buffer([0, 1]), + new Buffer([2]) + ]; + var frames = []; + var readable = createReadableStream(messages, true); + var writable = createWritableStream(frames, true); + readable + .pipe(new MessageEncoder(64)) + .pipe(writable) + .on('finish', function () { + assert.deepEqual( + frames, + [ + new Buffer([0, 0, 0, 2, 0, 1, 0, 0, 0, 0]), + new Buffer([0, 0, 0, 1, 2, 0, 0, 0, 0]) + ] + ); + done(); + }); + }); + + test('all zeros', function (done) { + var messages = [new Buffer([0, 0, 0, 0])]; + var frames = []; + var readable = createReadableStream(messages, true); + var writable = createWritableStream(frames, true); + readable + .pipe(new MessageEncoder(64)) + .pipe(writable) + .on('finish', function () { + assert.deepEqual( + frames, + [new Buffer([0, 0, 0, 4, 0, 0, 0, 0, 0, 0, 0, 0])] + ); + done(); + }); + }); + + test('short frame size', function (done) { + var messages = [ + new Buffer([0, 1, 2]), + new Buffer([2]) + ]; + var frames = []; + var readable = createReadableStream(messages, true); + var writable = createWritableStream(frames, true); + readable + .pipe(new MessageEncoder(2)) + .pipe(writable) + .on('finish', function () { + assert.deepEqual( + frames, + [ + new Buffer([0, 0, 0, 2, 0, 1, 0, 0, 0, 1, 2, 0, 0, 0, 0]), + new Buffer([0, 0, 0, 1, 2, 0, 0, 0, 0]) + ] + ); + done(); + }); + }); + + }); + + suite('StatefulEmitter', function () { + + test('ok handshake', function (done) { + var buf = HANDSHAKE_RESPONSE_TYPE.toBuffer({match: 'BOTH'}); + var bufs = []; + var ptcl = createProtocol({protocol: 'Empty'}); + var handshake = false; + ptcl.createEmitter(createTransport([buf], bufs)) + .on('handshake', function (req, res) { + handshake = true; + assert(res.match === 'BOTH'); + assert.deepEqual( + Buffer.concat(bufs), + HANDSHAKE_REQUEST_TYPE.toBuffer({ + clientHash: new Buffer(ptcl._hashString, 'binary'), + serverHash: new Buffer(ptcl._hashString, 'binary') + }) + ); + this.destroy(); + }) + .on('eot', function () { + assert(handshake); + done(); + }); + }); + + test('no server match handshake', function (done) { + var ptcl = createProtocol({protocol: 'Empty'}); + var resBufs = [ + { + match: 'NONE', + serverHash: {'org.apache.avro.ipc.MD5': new Buffer(16)}, + serverProtocol: {string: ptcl.toString()}, + }, + {match: 'BOTH'} + ].map(function (val) { return HANDSHAKE_RESPONSE_TYPE.toBuffer(val); }); + var reqBufs = []; + var handshakes = 0; + ptcl.createEmitter(createTransport(resBufs, reqBufs)) + .on('handshake', function (req, res) { + if (handshakes++) { + assert(res.match === 'BOTH'); + this.destroy(); + } else { + assert(res.match === 'NONE'); + } + }) + .on('eot', function () { + assert.equal(handshakes, 2); + done(); + }); + }); + + test('incompatible protocol', function (done) { + var ptcl = createProtocol({protocol: 'Empty'}); + var hash = new Buffer(16); // Pretend the hash was different. + var resBufs = [ + { + match: 'NONE', + serverHash: {'org.apache.avro.ipc.MD5': hash}, + serverProtocol: {string: ptcl.toString()}, + }, + { + match: 'NONE', + serverHash: {'org.apache.avro.ipc.MD5': hash}, + serverProtocol: {string: ptcl.toString()}, + meta: {map: {error: new Buffer('abcd')}} + } + ].map(function (val) { return HANDSHAKE_RESPONSE_TYPE.toBuffer(val); }); + var error = false; + ptcl.createEmitter(createTransport(resBufs, [])) + .on('error', function (err) { + error = true; + assert.equal(err.message, 'abcd'); + }) + .on('eot', function () { + assert(error); + done(); + }); + }); + + test('handshake error', function (done) { + var resBufs = [ + new Buffer([4, 0, 0]), // Invalid handshakes. + new Buffer([4, 0, 0]) + ]; + var ptcl = createProtocol({protocol: 'Empty'}); + var error = false; + ptcl.createEmitter(createTransport(resBufs, [])) + .on('error', function (err) { + error = true; + assert.equal(err.message, 'handshake error'); + }) + .on('eot', function () { + assert(error); + done(); + }); + }); + + test('orphan response', function (done) { + var ptcl = createProtocol({protocol: 'Empty'}); + var idType = protocols.IdType.createMetadataType(); + var resBufs = [ + new Buffer([0, 0, 0]), // OK handshake. + idType.toBuffer(23) + ]; + var error = false; + ptcl.createEmitter(createTransport(resBufs, [])) + .on('error', function (err) { + error = true; + assert(/orphan response:/.test(err.message)); + }) + .on('eot', function () { + assert(error); + done(); + }); + }); + + test('ended readable', function (done) { + var bufs = []; + var ptcl = createProtocol({protocol: 'Empty'}); + ptcl.createEmitter(createTransport([], bufs)) + .on('eot', function () { + assert.equal(bufs.length, 1); // A single handshake was sent. + done(); + }); + }); + + test('interrupted', function (done) { + var ptcl = createProtocol({ + protocol: 'Empty', + messages: { + id: {request: [{name: 'id', type: 'int'}], response: 'int'} + } + }); + var resBufs = [ + new Buffer([0, 0, 0]), // OK handshake. + ]; + var interrupted = 0; + var transport = createTransport(resBufs, []); + var ee = ptcl.createEmitter(transport, function () { + assert.equal(interrupted, 2); + done(); + }); + + ptcl.emit('id', {id: 123}, ee, cb); + ptcl.emit('id', {id: 123}, ee, cb); + + function cb(err) { + assert.deepEqual(err, {string: 'interrupted'}); + interrupted++; + } + }); + + test('missing client message', function (done) { + var ptcl1 = createProtocol({ + protocol: 'Ping', + messages: { + ping: {request: [], response: 'string'} + } + }); + var ptcl2 = createProtocol({ + protocol: 'Ping', + messages: { + ping: {request: [], response: 'string'}, + pong: {request: [], response: 'string'} + } + }).on('ping', function (req, ee, cb) { cb(null, 'ok'); }); + var transports = createPassthroughTransports(); + ptcl2.createListener(transports[1]); + var ee = ptcl1.createEmitter(transports[0]); + ptcl1.emit('ping', {}, ee, function (err, res) { + assert.equal(res, 'ok'); + done(); + }); + }); + + test('missing server message', function (done) { + var ptcl1 = createProtocol({ + protocol: 'Ping', + messages: { + ping: {request: [], response: 'string'} + } + }); + var ptcl2 = createProtocol({protocol: 'Empty'}); + var transports = createPassthroughTransports(); + ptcl2.createListener(transports[1]); + ptcl1.createEmitter(transports[0]) + .on('error', function (err) { + assert(/missing server message: ping/.test(err.message)); + done(); + }); + }); + + test('trailing data', function (done) { + var ptcl = createProtocol({ + protocol: 'Ping', + messages: { + ping: {request: [], response: 'string'} + } + }); + var transports = createPassthroughTransports(); + ptcl.createEmitter(transports[0]) + .on('error', function (err) { + assert(/trailing data/.test(err.message)); + done(); + }); + transports[0].readable.end(new Buffer([2, 3])); + }); + + test('invalid metadata', function (done) { + var ptcl = createProtocol({ + protocol: 'Ping', + messages: { + ping: {request: [], response: 'string'} + } + }); + var transports = createPassthroughTransports(); + ptcl.createListener(transports[1]); + ptcl.createEmitter(transports[0]) + .on('error', function (err) { + assert(/invalid metadata:/.test(err.message)); + done(); + }) + .on('handshake', function () { + transports[0].readable.write(frame(new Buffer([2, 3]))); + transports[0].readable.write(frame(new Buffer(0))); + }); + }); + + test('invalid response', function (done) { + var ptcl = createProtocol({ + protocol: 'Ping', + messages: { + ping: {request: [], response: 'string'} + } + }); + var transports = createPassthroughTransports(); + var ml = ptcl.createListener(transports[1]); + var me = ptcl.createEmitter(transports[0]) + .on('handshake', function () { + ml.destroy(); + + ptcl.emit('ping', {}, me, function (err) { + assert(/invalid response:/.test(err.string)); + done(); + }); + + var idType = protocols.IdType.createMetadataType(); + var bufs = [ + idType.toBuffer(1), // Metadata. + new Buffer([3]) // Invalid response. + ]; + transports[0].readable.write(frame(Buffer.concat(bufs))); + transports[0].readable.write(frame(new Buffer(0))); + }); + }); + + test('one way', function (done) { + var beats = 0; + var ptcl = createProtocol({ + protocol: 'Heartbeat', + messages: { + beat: {request: [], response: 'null', 'one-way': true} + } + }).on('beat', function (req, ee, cb) { + assert.strictEqual(cb, undefined); + if (++beats === 2) { + done(); + } + }); + var transports = createPassthroughTransports(); + ptcl.createListener(transports[1]); + var ee = ptcl.createEmitter(transports[0]); + ptcl.emit('beat', {}, ee); + ptcl.emit('beat', {}, ee); + }); + + }); + + suite('StatelessEmitter', function () { + + test('interrupted before response data', function (done) { + var ptcl = createProtocol({ + protocol: 'Ping', + messages: {ping: {request: [], response: 'boolean'}} + }); + var readable = stream.PassThrough() + .on('end', done); + var writable = createWritableStream([]); + var ee = ptcl.createEmitter(function (cb) { + cb(readable); + return writable; + }); + ptcl.emit('ping', {}, ee, function (err) { + assert(/interrupted/.test(err.string)); + readable.write(frame(new Buffer(2))); + readable.end(frame(new Buffer(0))); + }); + ee.destroy(true); + }); + + }); + + suite('StatefulListener', function () { + + test('end readable', function (done) { + var ptcl = createProtocol({protocol: 'Empty'}); + var transports = createPassthroughTransports(); + ptcl.createListener(transports[0]) + .on('eot', function (pending) { + assert.equal(pending, 0); + done(); + }); + transports[0].readable.end(); + }); + + test('finish writable', function (done) { + var ptcl = createProtocol({protocol: 'Empty'}); + var transports = createPassthroughTransports(); + ptcl.createListener(transports[0]) + .on('eot', function (pending) { + assert.equal(pending, 0); + done(); + }); + transports[0].writable.end(); + }); + + test('invalid handshake', function (done) { + var ptcl = createProtocol({protocol: 'Empty'}); + var transport = createTransport( + [new Buffer([4])], // Invalid handshake. + [] + ); + ptcl.createListener(transport) + .on('handshake', function (req, res) { + assert(!req.$isValid()); + assert.equal(res.match, 'NONE'); + done(); + }); + }); + + test('missing server message', function (done) { + var ptcl1 = createProtocol({protocol: 'Empty'}); + var ptcl2 = createProtocol({ + protocol: 'Heartbeat', + messages: {beat: {request: [], response: 'boolean'}} + }); + var hash = new Buffer(ptcl2._hashString, 'binary'); + var req = { + clientHash: hash, + clientProtocol: {string: ptcl2.toString()}, + serverHash: hash + }; + var transport = createTransport( + [HANDSHAKE_REQUEST_TYPE.toBuffer(req)], + [] + ); + ptcl1.createListener(transport) + .on('handshake', function (req, res) { + assert(req.$isValid()); + assert.equal(res.match, 'NONE'); + var msg = res.meta.map.error.toString(); + assert(/missing server message/.test(msg)); + done(); + }); + }); + + test('invalid metadata', function (done) { + var ptcl = createProtocol({ + protocol: 'Heartbeat', + messages: {beat: {request: [], response: 'boolean'}} + }); + var transports = createPassthroughTransports(); + ptcl.createListener(transports[1]) + .on('error', function (err) { + assert(/invalid metadata/.test(err.message)); + done(); + }); + ptcl.createEmitter(transports[0]) + .on('handshake', function () { + // Handshake is complete now. + var writable = transports[0].writable; + writable.write(frame(new Buffer([0]))); // Empty metadata. + writable.write(frame(new Buffer(0))); + }); + }); + + test('unknown message', function (done) { + var ptcl = createProtocol({ + protocol: 'Heartbeat', + messages: {beat: {request: [], response: 'boolean'}} + }); + var transports = createPassthroughTransports(); + var ee = ptcl.createListener(transports[1]) + .on('eot', function () { + transports[1].writable.end(); + }); + ptcl.createEmitter(transports[0]) + .on('handshake', function () { + // Handshake is complete now. + this.destroy(); + var idType = ee._idType; + var bufs = []; + transports[0].readable + .pipe(new protocols.streams.MessageDecoder()) + .on('data', function (buf) { bufs.push(buf); }) + .on('end', function () { + assert.equal(bufs.length, 1); + var tap = new utils.Tap(bufs[0]); + idType._read(tap); + assert(tap.buf[tap.pos++]); // Error byte. + tap.pos++; // Union marker. + assert(/unknown message/.test(tap.readString())); + done(); + }); + [ + idType.toBuffer(-1), + new Buffer([4, 104, 105]), // `hi` message. + new Buffer(0) // End of frame. + ].forEach(function (buf) { + transports[0].writable.write(frame(buf)); + }); + transports[0].writable.end(); + }); + }); + + test('invalid request', function (done) { + var ptcl = createProtocol({ + protocol: 'Heartbeat', + messages: {beat: { + request: [{name: 'id', type: 'string'}], + response: 'boolean' + }} + }); + var transports = createPassthroughTransports(); + var ee = ptcl.createListener(transports[1]) + .on('eot', function () { transports[1].writable.end(); }); + ptcl.createEmitter(transports[0]) + .on('handshake', function () { + // Handshake is complete now. + this.destroy(); + var idType = ee._idType; + var bufs = []; + transports[0].readable + .pipe(new protocols.streams.MessageDecoder()) + .on('data', function (buf) { bufs.push(buf); }) + .on('end', function () { + assert.equal(bufs.length, 1); + var tap = new utils.Tap(bufs[0]); + idType._read(tap); + assert.equal(tap.buf[tap.pos++], 1); // Error byte. + assert.equal(tap.buf[tap.pos++], 0); // Union marker. + assert(/invalid request/.test(tap.readString())); + done(); + }); + [ + idType.toBuffer(-1), + new Buffer([8, 98, 101, 97, 116]), // `beat` message. + new Buffer([8]), // Invalid Avro string encoding. + new Buffer(0) // End of frame. + ].forEach(function (buf) { + transports[0].writable.write(frame(buf)); + }); + transports[0].writable.end(); + }); + }); + + test('destroy', function (done) { + var ptcl = createProtocol({ + protocol: 'Heartbeat', + messages: {beat: {request: [], response: 'boolean'}} + }).on('beat', function (req, ee, cb) { + ee.destroy(); + setTimeout(function () { cb(null, true); }, 10); + }); + var transports = createPassthroughTransports(); + var responded = false; + ptcl.createListener(transports[1]) + .on('eot', function () { + assert(responded); // Works because the transport is sync. + done(); + }); + ptcl.emit('beat', {}, ptcl.createEmitter(transports[0]), function () { + responded = true; + }); + }); + + }); + + suite('StatelessListener', function () { + + test('unknown message', function (done) { + var ptcl = createProtocol({ + protocol: 'Heartbeat', + messages: {beat: {request: [], response: 'boolean'}} + }); + var readable = new stream.PassThrough(); + var writable = new stream.PassThrough(); + var ee = ptcl.createListener(function (cb) { + cb(writable); + return readable; + }); + var bufs = []; + writable.pipe(new protocols.streams.MessageDecoder()) + .on('data', function (buf) { bufs.push(buf); }) + .on('end', function () { + assert.equal(bufs.length, 1); + var tap = new utils.Tap(bufs[0]); + tap.pos = 4; // Skip handshake response. + ee._idType._read(tap); // Skip metadata. + assert.equal(tap.buf[tap.pos++], 1); // Error. + assert.equal(tap.buf[tap.pos++], 0); // Union flag. + assert(/unknown message/.test(tap.readString())); + done(); + }); + var hash = new Buffer(ptcl._hashString, 'binary'); + var req = { + clientHash: hash, + clientProtocol: null, + serverHash: hash + }; + var encoder = new protocols.streams.MessageEncoder(64); + encoder.pipe(readable); + encoder.end(Buffer.concat([ + HANDSHAKE_REQUEST_TYPE.toBuffer(req), + new Buffer([0]), // Empty metadata. + new Buffer([4, 104, 105]) // `id` message. + ])); + }); + + test('late writable', function (done) { + var ptcl = createProtocol({ + protocol: 'Heartbeat', + messages: {beat: {request: [], response: 'boolean'}} + }).on('beat', function (req, ee, cb) { + cb(null, true); + }); + var readable = new stream.PassThrough(); + var writable = new stream.PassThrough(); + ptcl.createListener(function (cb) { + setTimeout(function () { cb(readable); }, 10); + return writable; + }); + var ee = ptcl.createEmitter(function (cb) { + cb(readable); + return writable; + }); + ptcl.emit('beat', {}, ee, function (err, res) { + assert.strictEqual(err, null); + assert.equal(res, true); + done(); + }); + }); + + }); + + suite('emit', function () { + + suite('stateful', function () { + + run(function (emitterPtcl, listenerPtcl, cb) { + var pt1 = new stream.PassThrough(); + var pt2 = new stream.PassThrough(); + var opts = {bufferSize: 48}; + cb( + emitterPtcl.createEmitter({readable: pt1, writable: pt2}, opts), + listenerPtcl.createListener({readable: pt2, writable: pt1}, opts) + ); + }); + + }); + + suite('stateless', function () { + + run(function (emitterPtcl, listenerPtcl, cb) { + cb(emitterPtcl.createEmitter(writableFactory)); + + function writableFactory(emitterCb) { + var reqPt = new stream.PassThrough() + .on('finish', function () { + listenerPtcl.createListener(function (listenerCb) { + var resPt = new stream.PassThrough() + .on('finish', function () { emitterCb(resPt); }); + listenerCb(resPt); + return reqPt; + }); + }); + return reqPt; + } + }); + + }); + + function run(setupFn) { + + test('single', function (done) { + var ptcl = createProtocol({ + protocol: 'Math', + messages: { + negate: { + request: [{name: 'n', type: 'int'}], + response: 'int' + } + } + }); + setupFn(ptcl, ptcl, function (ee) { + ee.on('eot', function () { done(); }); + ptcl.on('negate', function (req, ee, cb) { cb(null, -req.n); }); + ptcl.emit('negate', {n: 20}, ee, function (err, res) { + assert.equal(this, ptcl); + assert.strictEqual(err, null); + assert.equal(res, -20); + this.emit('negate', {n: 'hi'}, ee, function (err) { + assert(/invalid "int"/.test(err.string)); + ee.destroy(); + }); + }); + }); + }); + + test('invalid request', function (done) { + var ptcl = createProtocol({ + protocol: 'Math', + messages: { + negate: { + request: [{name: 'n', type: 'int'}], + response: 'int' + } + } + }).on('negate', function () { assert(false); }); + setupFn(ptcl, ptcl, function (ee) { + ee.on('eot', function () { done(); }); + ptcl.emit('negate', {n: 'a'}, ee, function (err) { + assert(/invalid "int"/.test(err.string), null); + ee.destroy(); + }); + }); + }); + + test('error response', function (done) { + var msg = 'must be non-negative'; + var ptcl = createProtocol({ + protocol: 'Math', + messages: { + sqrt: { + request: [{name: 'n', type: 'float'}], + response: 'float' + } + } + }).on('sqrt', function (req, ee, cb) { + var n = req.n; + if (n < 0) { + cb({string: msg}); + } else { + cb(null, Math.sqrt(n)); + } + }); + setupFn(ptcl, ptcl, function (ee) { + ptcl.emit('sqrt', {n: 100}, ee, function (err, res) { + assert(Math.abs(res - 10) < 1e-5); + ptcl.emit('sqrt', {n: - 10}, ee, function (err) { + assert.equal(this, ptcl); + assert.equal(err.string, msg); + done(); + }); + }); + }); + }); + + test('invalid response', function (done) { + var ptcl = createProtocol({ + protocol: 'Math', + messages: { + sqrt: { + request: [{name: 'n', type: 'float'}], + response: 'float' + } + } + }).on('sqrt', function (req, ee, cb) { + var n = req.n; + if (n < 0) { + cb(null, 'complex'); // Invalid response. + } else { + cb(null, Math.sqrt(n)); + } + }); + setupFn(ptcl, ptcl, function (ee) { + ptcl.emit('sqrt', {n: - 10}, ee, function (err) { + // The server error message is propagated to the client. + assert(/invalid "float"/.test(err.string)); + ptcl.emit('sqrt', {n: 100}, ee, function (err, res) { + // And the server doesn't die (we can make a new request). + assert(Math.abs(res - 10) < 1e-5); + done(); + }); + }); + }); + }); + + test('invalid error', function (done) { + var ptcl = createProtocol({ + protocol: 'Math', + messages: { + sqrt: { + request: [{name: 'n', type: 'float'}], + response: 'float' + } + } + }).on('sqrt', function (req, ee, cb) { + var n = req.n; + if (n < 0) { + cb({error: 'complex'}); // Invalid error. + } else { + cb(null, Math.sqrt(n)); + } + }); + setupFn(ptcl, ptcl, function (ee) { + ptcl.emit('sqrt', {n: - 10}, ee, function (err) { + assert(/invalid \["string"\]/.test(err.string)); + ptcl.emit('sqrt', {n: 100}, ee, function (err, res) { + // The server still doesn't die (we can make a new request). + assert(Math.abs(res - 10) < 1e-5); + done(); + }); + }); + }); + }); + + test('out of order', function (done) { + var ptcl = createProtocol({ + protocol: 'Delay', + messages: { + wait: { + request: [ + {name: 'ms', type: 'float'}, + {name: 'id', type: 'string'} + ], + response: 'string' + } + } + }).on('wait', function (req, ee, cb) { + var delay = req.ms; + if (delay < 0) { + cb(new Error('delay must be non-negative')); + return; + } + setTimeout(function () { cb(null, req.id); }, delay); + }); + var ids = []; + setupFn(ptcl, ptcl, function (ee) { + ee.on('eot', function (pending) { + assert.equal(pending, 0); + assert.deepEqual(ids, [null, 'b', 'a']); + done(); + }); + ptcl.emit('wait', {ms: 100, id: 'a'}, ee, function (err, res) { + assert.strictEqual(err, null); + ids.push(res); + }); + ptcl.emit('wait', {ms: 10, id: 'b'}, ee, function (err, res) { + assert.strictEqual(err, null); + ids.push(res); + ee.destroy(); + }); + ptcl.emit('wait', {ms: -100, id: 'c'}, ee, function (err, res) { + assert(/non-negative/.test(err.string)); + ids.push(res); + }); + }); + }); + + test('compatible protocols', function (done) { + var emitterPtcl = createProtocol({ + protocol: 'emitterProtocol', + messages: { + age: { + request: [{name: 'name', type: 'string'}], + response: 'long' + } + } + }); + var listenerPtcl = createProtocol({ + protocol: 'serverProtocol', + messages: { + age: { + request: [ + {name: 'name', type: 'string'}, + {name: 'address', type: ['null', 'string'], 'default': null} + ], + response: 'int' + }, + id: { + request: [{name: 'name', type: 'string'}], + response: 'long' + } + } + }); + setupFn( + emitterPtcl, + listenerPtcl, + function (ee) { + listenerPtcl.on('age', function (req, ee, cb) { + assert.equal(req.name, 'Ann'); + cb(null, 23); + }); + emitterPtcl.emit('age', {name: 'Ann'}, ee, function (err, res) { + assert.strictEqual(err, null); + assert.equal(res, 23); + done(); + }); + } + ); + }); + + test('cached compatible protocols', function (done) { + var ptcl1 = createProtocol({ + protocol: 'emitterProtocol', + messages: { + age: { + request: [{name: 'name', type: 'string'}], + response: 'long' + } + } + }); + var ptcl2 = createProtocol({ + protocol: 'serverProtocol', + messages: { + age: { + request: [ + {name: 'name', type: 'string'}, + {name: 'address', type: ['null', 'string'], 'default': null} + ], + response: 'int' + }, + id: { + request: [{name: 'name', type: 'string'}], + response: 'long' + } + } + }).on('age', function (req, ee, cb) { cb(null, 48); }); + setupFn( + ptcl1, + ptcl2, + function (ee1) { + ptcl1.emit('age', {name: 'Ann'}, ee1, function (err, res) { + assert.equal(res, 48); + setupFn( + ptcl1, + ptcl2, + function (ee2) { // ee2 has the server's protocol. + ptcl1.emit('age', {name: 'Bob'}, ee2, function (err, res) { + assert.equal(res, 48); + done(); + }); + } + ); + }); + } + ); + }); + + test('incompatible protocols', function (done) { + var emitterPtcl = createProtocol({ + protocol: 'emitterProtocol', + messages: { + age: {request: [{name: 'name', type: 'string'}], response: 'long'} + } + }); + var listenerPtcl = createProtocol({ + protocol: 'serverProtocol', + messages: { + age: {request: [{name: 'name', type: 'int'}], response: 'long'} + } + }).on('age', function (req, ee, cb) { cb(null, 0); }); + setupFn( + emitterPtcl, + listenerPtcl, + function (ee) { + ee.on('error', function () {}); // For stateful protocols. + emitterPtcl.emit('age', {name: 'Ann'}, ee, function (err) { + assert(err); + done(); + }); + } + ); + }); + + test('unknown message', function (done) { + var ptcl = createProtocol({protocol: 'Empty'}); + setupFn(ptcl, ptcl, function (ee) { + ptcl.emit('echo', {}, ee, function (err) { + assert(/unknown/.test(err.string)); + done(); + }); + }); + }); + + test('unsupported message', function (done) { + var ptcl = createProtocol({ + protocol: 'Echo', + messages: { + echo: { + request: [{name: 'id', type: 'string'}], + response: 'string' + } + } + }); + setupFn(ptcl, ptcl, function (ee) { + ptcl.emit('echo', {id: ''}, ee, function (err) { + assert(/unsupported/.test(err.string)); + done(); + }); + }); + }); + + test('destroy emitter noWait', function (done) { + var ptcl = createProtocol({ + protocol: 'Delay', + messages: { + wait: { + request: [{name: 'ms', type: 'int'}], + response: 'string' + } + } + }).on('wait', function (req, ee, cb) { + setTimeout(function () { cb(null, 'ok'); }, req.ms); + }); + var interrupted = 0; + var eoted = false; + setupFn(ptcl, ptcl, function (ee) { + ee.on('eot', function (pending) { + eoted = true; + assert.equal(interrupted, 2); + assert.equal(pending, 2); + done(); + }); + ptcl.emit('wait', {ms: 75}, ee, interruptedCb); + ptcl.emit('wait', {ms: 50}, ee, interruptedCb); + ptcl.emit('wait', {ms: 10}, ee, function (err, res) { + assert.equal(res, 'ok'); + ee.destroy(true); + }); + + function interruptedCb(err) { + assert(/interrupted/.test(err.string)); + interrupted++; + } + }); + }); + + test('destroy emitter', function (done) { + var ptcl = createProtocol({ + protocol: 'Math', + messages: { + negate: { + request: [{name: 'n', type: 'int'}], + response: 'int' + } + } + }); + setupFn(ptcl, ptcl, function (ee) { + ptcl.on('negate', function (req, ee, cb) { cb(null, -req.n); }); + ptcl.emit('negate', {n: 20}, ee, function (err, res) { + assert.strictEqual(err, null); + assert.equal(res, -20); + ee.destroy(); + this.emit('negate', {n: 'hi'}, ee, function (err) { + assert(/destroyed/.test(err.string)); + done(); + }); + }); + }); + }); + + } + + }); + + test('throw error', function () { + assert(!tryCatch(null)); + assert.equal(tryCatch(new Error('hi')), 'hi'); + assert.equal(tryCatch('hi'), 'hi'); + assert.equal(tryCatch({string: 'hi'}), 'hi'); + + function tryCatch(err) { + try { + protocols.throwError(err); + } catch (err_) { + return err_.message; + } + } + }); + +}); + +// Helpers. + +// Message framing. +function frame(buf) { + var framed = new Buffer(buf.length + 4); + framed.writeInt32BE(buf.length); + buf.copy(framed, 4); + return framed; +} + +function createReadableTransport(bufs, frameSize) { + return createReadableStream(bufs) + .pipe(new protocols.streams.MessageEncoder(frameSize || 64)); +} + +function createWritableTransport(bufs) { + var decoder = new protocols.streams.MessageDecoder(); + decoder.pipe(createWritableStream(bufs)); + return decoder; +} + +function createTransport(readBufs, writeBufs) { + return toDuplex( + createReadableTransport(readBufs), + createWritableTransport(writeBufs) + ); +} + +function createPassthroughTransports() { + var pt1 = stream.PassThrough(); + var pt2 = stream.PassThrough(); + return [{readable: pt1, writable: pt2}, {readable: pt2, writable: pt1}]; +} + +// Simplified stream constructor API isn't available in earlier node versions. + +function createReadableStream(bufs) { + var n = 0; + function Stream() { stream.Readable.call(this); } + util.inherits(Stream, stream.Readable); + Stream.prototype._read = function () { + this.push(bufs[n++] || null); + }; + var readable = new Stream(); + return readable; +} + +function createWritableStream(bufs) { + function Stream() { stream.Writable.call(this); } + util.inherits(Stream, stream.Writable); + Stream.prototype._write = function (buf, encoding, cb) { + bufs.push(buf); + cb(); + }; + return new Stream(); +} + +// Combine two (binary) streams into a single duplex one. This is very basic +// and doesn't handle a lot of cases (e.g. where `_read` doesn't return +// something). +function toDuplex(readable, writable) { + function Stream() { + stream.Duplex.call(this); + this.on('finish', function () { writable.end(); }); + } + util.inherits(Stream, stream.Duplex); + Stream.prototype._read = function () { + this.push(readable.read()); + }; + Stream.prototype._write = function (buf, encoding, cb) { + writable.write(buf); + cb(); + }; + return new Stream(); +} http://git-wip-us.apache.org/repos/asf/avro/blob/9101a42b/lang/js/test/test_schemas.js ---------------------------------------------------------------------- diff --git a/lang/js/test/test_schemas.js b/lang/js/test/test_schemas.js index cd9a727..b6d197b 100644 --- a/lang/js/test/test_schemas.js +++ b/lang/js/test/test_schemas.js @@ -1926,6 +1926,17 @@ suite('types', function () { assert.deepEqual(person.age, 12); assert.deepEqual(person.time, date); assert.throws(function () { derived.toBuffer({age: -1, date: date}); }); + + var invalid = {age: -1, time: date}; + assert.throws(function () { derived.toBuffer(invalid); }); + var hasError = false; + derived.isValid(invalid, {errorHook: function (path, any, type) { + hasError = true; + assert.deepEqual(path, ['age']); + assert.equal(any, -1); + assert(type instanceof AgeType); + }}); + assert(hasError); }); test('recursive', function () {
