Author: cutting
Date: Thu Dec 31 21:52:35 2009
New Revision: 894946
URL: http://svn.apache.org/viewvc?rev=894946&view=rev
Log:
AVRO-267. Add two new avroj commands: rpcsend and rpcreceive. Contributed by
Philip Zeyliger.
Added:
hadoop/avro/trunk/src/java/org/apache/avro/tool/RpcReceiveTool.java
hadoop/avro/trunk/src/java/org/apache/avro/tool/RpcSendTool.java
hadoop/avro/trunk/src/test/java/org/apache/avro/tool/TestRpcReceiveAndSendTools.java
Modified:
hadoop/avro/trunk/CHANGES.txt
hadoop/avro/trunk/build.xml
hadoop/avro/trunk/src/java/org/apache/avro/io/JsonDecoder.java
hadoop/avro/trunk/src/java/org/apache/avro/tool/Main.java
hadoop/avro/trunk/src/java/org/apache/avro/tool/Util.java
Modified: hadoop/avro/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/CHANGES.txt?rev=894946&r1=894945&r2=894946&view=diff
==============================================================================
--- hadoop/avro/trunk/CHANGES.txt (original)
+++ hadoop/avro/trunk/CHANGES.txt Thu Dec 31 21:52:35 2009
@@ -45,6 +45,9 @@
AVRO-258. Add GenAvro language tool. (Todd Lipcon via cutting)
+ AVRO-267. Add two new avroj commands: rpcsend and rpcreceive.
+ (Philip Zeyliger via cutting)
+
IMPROVEMENTS
AVRO-157. Changes from code review comments for C++. (sbanacho)
Modified: hadoop/avro/trunk/build.xml
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/build.xml?rev=894946&r1=894945&r2=894946&view=diff
==============================================================================
--- hadoop/avro/trunk/build.xml (original)
+++ hadoop/avro/trunk/build.xml Thu Dec 31 21:52:35 2009
@@ -156,7 +156,8 @@
</target>
<target name="compile-java" depends="javacc,schemata,ivy-retrieve">
- <java-compiler excludes="**/ipc/** **/*Requestor.java **/*Responder.java" >
+ <java-compiler
+ excludes="**/ipc/** **/*Requestor.java **/*Responder.java **/tool/**">
<src path="${build.dir}/src"/>
<src path="${java.src.dir}"/>
</java-compiler>
Modified: hadoop/avro/trunk/src/java/org/apache/avro/io/JsonDecoder.java
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/io/JsonDecoder.java?rev=894946&r1=894945&r2=894946&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/io/JsonDecoder.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/io/JsonDecoder.java Thu Dec 31 21:52:35 2009
@@ -43,10 +43,21 @@
super(root);
init(in);
}
+
+ JsonDecoder(Symbol root, String in) throws IOException {
+ super(root);
+ init(in);
+ }
+ /** Creates a new JsonDecoder based on an InputStream. */
public JsonDecoder(Schema schema, InputStream in) throws IOException {
this(new JsonGrammarGenerator().generate(schema), in);
}
+
+ /** Creates a new JsonDecoder based on a String input. */
+ public JsonDecoder(Schema schema, String in) throws IOException {
+ this(new JsonGrammarGenerator().generate(schema), in);
+ }
private void advance(Symbol symbol) throws IOException {
if (in.getCurrentToken() == null && this.parser.depth() == 1)
@@ -60,6 +71,13 @@
this.in = new JsonFactory().createJsonParser(in);
this.in.nextToken();
}
+
+ /** Re-initializes to start reading from a new String input. */
+ public void init(String in) throws IOException {
+ parser.reset();
+ this.in = new JsonFactory().createJsonParser(in);
+ this.in.nextToken();
+ }
@Override
public void readNull() throws IOException {
Modified: hadoop/avro/trunk/src/java/org/apache/avro/tool/Main.java
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/tool/Main.java?rev=894946&r1=894945&r2=894946&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/tool/Main.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/tool/Main.java Thu Dec 31 21:52:35 2009
@@ -43,7 +43,9 @@
new DataFileReadTool(),
new DataFileWriteTool(),
new DataFileGetSchemaTool(),
- new GenAvroTool()
+ new GenAvroTool(),
+ new RpcReceiveTool(),
+ new RpcSendTool()
}) {
Tool prev = tools.put(tool.getName(), tool);
if (prev != null) {
Added: hadoop/avro/trunk/src/java/org/apache/avro/tool/RpcReceiveTool.java
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/tool/RpcReceiveTool.java?rev=894946&view=auto
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/tool/RpcReceiveTool.java (added)
+++ hadoop/avro/trunk/src/java/org/apache/avro/tool/RpcReceiveTool.java Thu Dec 31 21:52:35 2009
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.tool;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PrintStream;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.avro.Protocol;
+import org.apache.avro.Protocol.Message;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericResponder;
+import org.apache.avro.io.JsonEncoder;
+import org.apache.avro.ipc.AvroRemoteException;
+import org.apache.avro.ipc.HttpServer;
+import org.codehaus.jackson.JsonEncoding;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonGenerator;
+
+/**
+ * Receives one RPC call and responds. (The moral equivalent
+ * of "netcat".)
+ */
+public class RpcReceiveTool implements Tool {
+ private PrintStream out;
+ private Object response;
+ /** Used to communicate between server thread (responder) and run() */
+ private CountDownLatch latch;
+ private Message expectedMessage;
+ HttpServer server;
+
+ @Override
+ public String getName() {
+ return "rpcreceive";
+ }
+
+ @Override
+ public String getShortDescription() {
+ return "Opens an HTTP RPC Server and listens for one message.";
+ }
+
+ private class SinkResponder extends GenericResponder {
+
+ public SinkResponder(Protocol local) {
+ super(local);
+ }
+
+ @Override
+ public Object respond(Message message, Object request)
+ throws AvroRemoteException {
+ if (!message.equals(expectedMessage)) {
+ out.println(String.format("Expected message '%s' but received '%s'.",
+ expectedMessage.getName(), message.getName()));
+ latch.countDown();
+ throw new IllegalArgumentException("Unexpected message.");
+ }
+ out.print(message.getName());
+ out.print("\t");
+ try {
+ JsonGenerator jsonGenerator = new JsonFactory().createJsonGenerator(
+ out, JsonEncoding.UTF8);
+ JsonEncoder jsonEncoder = new JsonEncoder(message.getRequest(), jsonGenerator);
+
+ GenericDatumWriter<Object> writer = new GenericDatumWriter<Object>(
+ message.getRequest());
+ writer.write(request, jsonEncoder);
+ jsonGenerator.flush();
+ jsonEncoder.flush();
+ out.flush();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ out.println();
+ latch.countDown();
+ return response;
+ }
+ }
+
+ @Override
+ public int run(InputStream in, PrintStream out, PrintStream err,
+ List<String> args) throws Exception {
+ // Split up into two functions for easier testing.
+ int r = run1(in, out, err, args);
+ if (r != 0) {
+ return r;
+ }
+ return run2(err);
+ }
+
+ int run1(InputStream in, PrintStream out, PrintStream err,
+ List<String> args) throws Exception {
+ if (args.size() != 4) {
+ err.println("Expected four arguments: protocol port message_name json_response");
+ return 1;
+ }
+ Protocol protocol = Protocol.parse(args.get(0));
+ int port = Integer.parseInt(args.get(1));
+ String messageName = args.get(2);
+ expectedMessage = protocol.getMessages().get(messageName);
+ if (expectedMessage == null) {
+ err.println(String.format("No message named '%s' found in protocol '%s'.",
+ messageName, protocol));
+ return 1;
+ }
+ String jsonData = args.get(3);
+ this.out = out;
+
+ this.response = Util.jsonToGenericDatum(expectedMessage.getResponse(), jsonData);
+
+ latch = new CountDownLatch(1);
+ server = new HttpServer(new SinkResponder(protocol), port);
+ err.println("Listening on port " + server.getPort());
+ return 0;
+ }
+
+ int run2(PrintStream err) throws InterruptedException {
+ latch.await();
+ err.println("Closing server.");
+ server.close();
+ return 0;
+ }
+
+}
Added: hadoop/avro/trunk/src/java/org/apache/avro/tool/RpcSendTool.java
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/tool/RpcSendTool.java?rev=894946&view=auto
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/tool/RpcSendTool.java (added)
+++ hadoop/avro/trunk/src/java/org/apache/avro/tool/RpcSendTool.java Thu Dec 31 21:52:35 2009
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.tool;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PrintStream;
+import java.net.URL;
+import java.util.List;
+
+import org.apache.avro.Protocol;
+import org.apache.avro.Schema;
+import org.apache.avro.Protocol.Message;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRequestor;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.JsonEncoder;
+import org.apache.avro.ipc.HttpTransceiver;
+import org.codehaus.jackson.JsonEncoding;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonGenerator;
+
+/**
+ * Sends a single RPC message.
+ */
+public class RpcSendTool implements Tool {
+ @Override
+ public String getName() {
+ return "rpcsend";
+ }
+
+ @Override
+ public String getShortDescription() {
+ return "Sends a single RPC message.";
+ }
+
+ @Override
+ public int run(InputStream in, PrintStream out, PrintStream err,
+ List<String> args) throws Exception {
+ if (args.size() != 5) {
+ err.println(
+ "Expected 5 arguments: protocol message_name host port json_data");
+ return 1;
+ }
+ Protocol protocol = Protocol.parse(args.get(0));
+ String messageName = args.get(1);
+ Message message = protocol.getMessages().get(messageName);
+ if (message == null) {
+ err.println(String.format("No message named '%s' found in protocol '%s'.",
+ messageName, protocol));
+ return 1;
+ }
+ String host = args.get(2);
+ int port = Integer.parseInt(args.get(3));
+ String jsonData = args.get(4);
+
+ Object datum = Util.jsonToGenericDatum(message.getRequest(), jsonData);
+ GenericRequestor client = makeClient(protocol, host, port);
+ Object response = client.request(message.getName(), datum);
+ dumpJson(out, message.getResponse(), response);
+ return 0;
+ }
+
+ private void dumpJson(PrintStream out, Schema schema, Object datum)
+ throws IOException {
+ DatumWriter<Object> writer = new GenericDatumWriter<Object>(schema);
+ JsonGenerator g =
+ new JsonFactory().createJsonGenerator(out, JsonEncoding.UTF8);
+ g.useDefaultPrettyPrinter();
+ writer.write(datum, new JsonEncoder(schema, g));
+ g.flush();
+ out.println();
+ out.flush();
+ }
+
+ private GenericRequestor makeClient(Protocol protocol, String host, int port)
+ throws IOException {
+ HttpTransceiver transceiver =
+ new HttpTransceiver(new URL("http", host, port, "/"));
+ GenericRequestor requestor = new GenericRequestor(protocol, transceiver);
+ return requestor;
+ }
+}
Modified: hadoop/avro/trunk/src/java/org/apache/avro/tool/Util.java
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/tool/Util.java?rev=894946&r1=894945&r2=894946&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/tool/Util.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/tool/Util.java Thu Dec 31 21:52:35 2009
@@ -20,8 +20,13 @@
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
+import java.io.IOException;
import java.io.InputStream;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.io.JsonDecoder;
+
/** Static utility methods for tools. */
class Util {
/**
@@ -36,4 +41,17 @@
return new FileInputStream(new File(filename));
}
}
+
+ /**
+ * Converts a String JSON object into a generic datum.
+ *
+ * This is inefficient (creates extra objects), so should be used
+ * sparingly.
+ */
+ static Object jsonToGenericDatum(Schema schema, String jsonData) throws IOException {
+ GenericDatumReader<Object> reader =
+ new GenericDatumReader<Object>(schema);
+ Object datum = reader.read(null, new JsonDecoder(schema, jsonData));
+ return datum;
+ }
}
Added: hadoop/avro/trunk/src/test/java/org/apache/avro/tool/TestRpcReceiveAndSendTools.java
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/java/org/apache/avro/tool/TestRpcReceiveAndSendTools.java?rev=894946&view=auto
==============================================================================
--- hadoop/avro/trunk/src/test/java/org/apache/avro/tool/TestRpcReceiveAndSendTools.java (added)
+++ hadoop/avro/trunk/src/test/java/org/apache/avro/tool/TestRpcReceiveAndSendTools.java Thu Dec 31 21:52:35 2009
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.tool;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.util.Arrays;
+
+import org.apache.avro.Protocol;
+import org.junit.Test;
+
+public class TestRpcReceiveAndSendTools {
+
+ /**
+ * Starts a server (using the tool) and sends a single message to it.
+ */
+ @Test
+ public void testServeAndSend() throws Exception {
+ Protocol protocol = Protocol.parse("" +
+ "{\"protocol\": \"Minimal\", " +
+ "\"messages\": { \"sink\": {" +
+ " \"request\": [{\"name\": \"a\", \"type\": \"string\"}], " +
+ " \"response\": \"string\"} } }");
+ ByteArrayOutputStream baos1 = new ByteArrayOutputStream();
+ PrintStream p1 = new PrintStream(baos1);
+ RpcReceiveTool receive = new RpcReceiveTool();
+ receive.run1(null, p1, System.err,
+ Arrays.asList(protocol.toString(), "0", "sink", "\"omega\""));
+ int port = receive.server.getPort();
+ ByteArrayOutputStream baos2 = new ByteArrayOutputStream();
+ PrintStream p2 = new PrintStream(baos2);
+ RpcSendTool send = new RpcSendTool();
+ send.run(null, p2, System.err,
+ Arrays.asList(protocol.toString(), "sink", "localhost",
+ Integer.toString(port), "{ \"a\": \"alpha\" }"));
+ receive.run2(System.err);
+
+ assertEquals("sink\t{\"a\":\"alpha\"}\n", baos1.toString("UTF-8"));
+ assertEquals("\"omega\"\n", baos2.toString("UTF-8"));
+ }
+}