Author: cutting
Date: Thu Sep 29 19:03:10 2011
New Revision: 1177399

URL: http://svn.apache.org/viewvc?rev=1177399&view=rev
Log:
HADOOP-7693. Enhance AvroRpcEngine to support the new #addProtocol introduced 
in HADOOP-724.

Modified:
    hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt
    
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AvroRpcEngine.java
    
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
    
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
    
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAvroRpc.java

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1177399&r1=1177398&r2=1177399&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt 
(original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt Thu Sep 
29 19:03:10 2011
@@ -413,6 +413,9 @@ Release 0.23.0 - Unreleased
     HADOOP-7575. Enhanced LocalDirAllocator to support fully-qualified
     paths. (Jonathan Eagles via vinodkv)
 
+    HADOOP-7693. Enhance AvroRpcEngine to support the new #addProtocol
+    interface introduced in HADOOP-7524.  (cutting)
+
   OPTIMIZATIONS
   
     HADOOP-7333. Performance improvement in PureJavaCrc32. (Eric Caspole

Modified: 
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AvroRpcEngine.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AvroRpcEngine.java?rev=1177399&r1=1177398&r2=1177399&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AvroRpcEngine.java
 (original)
+++ 
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AvroRpcEngine.java
 Thu Sep 29 19:03:10 2011
@@ -29,6 +29,8 @@ import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
 
 import javax.net.SocketFactory;
 
@@ -54,7 +56,7 @@ import org.apache.hadoop.security.token.
 public class AvroRpcEngine implements RpcEngine {
   private static final Log LOG = LogFactory.getLog(RPC.class);
 
-  private static int VERSION = 0;
+  private static int VERSION = 1;
 
   // the implementation we tunnel through
   private static final RpcEngine ENGINE = new WritableRpcEngine();
@@ -62,9 +64,10 @@ public class AvroRpcEngine implements Rp
   /** Tunnel an Avro RPC request and response through Hadoop's RPC. */
   private static interface TunnelProtocol extends VersionedProtocol {
     //WritableRpcEngine expects a versionID in every protocol.
-    public static final long versionID = 0L;
+    public static final long versionID = VERSION;
     /** All Avro methods and responses go through this. */
-    BufferListWritable call(BufferListWritable request) throws IOException;
+    BufferListWritable call(String protocol, BufferListWritable request)
+      throws IOException;
   }
 
   /** A Writable that holds a List<ByteBuffer>, The Avro RPC Transceiver's
@@ -103,23 +106,25 @@ public class AvroRpcEngine implements Rp
   private static class ClientTransceiver extends Transceiver {
     private TunnelProtocol tunnel;
     private InetSocketAddress remote;
+    private String protocol;
   
     public ClientTransceiver(InetSocketAddress addr,
                              UserGroupInformation ticket,
                              Configuration conf, SocketFactory factory,
-                             int rpcTimeout)
+                             int rpcTimeout, String protocol)
       throws IOException {
       this.tunnel = ENGINE.getProxy(TunnelProtocol.class, VERSION,
                                         addr, ticket, conf, factory,
                                         rpcTimeout).getProxy();
       this.remote = addr;
+      this.protocol = protocol;
     }
 
     public String getRemoteName() { return remote.toString(); }
 
     public List<ByteBuffer> transceive(List<ByteBuffer> request)
       throws IOException {
-      return tunnel.call(new BufferListWritable(request)).buffers;
+      return tunnel.call(protocol, new BufferListWritable(request)).buffers;
     }
 
     public List<ByteBuffer> readBuffers() throws IOException {
@@ -159,7 +164,8 @@ public class AvroRpcEngine implements Rp
                    UserGroupInformation ticket, Configuration conf,
                    SocketFactory factory,
                    int rpcTimeout) throws IOException {
-      this.tx = new ClientTransceiver(addr, ticket, conf, factory, rpcTimeout);
+      this.tx = new ClientTransceiver(addr, ticket, conf, factory, rpcTimeout,
+                                      protocol.getName());
       this.requestor = createRequestor(protocol, tx);
     }
     @Override public Object invoke(Object proxy, Method method, Object[] args) 
@@ -182,9 +188,11 @@ public class AvroRpcEngine implements Rp
 
   /** An Avro RPC Responder that can process requests passed via Hadoop RPC. */
   private class TunnelResponder implements TunnelProtocol {
-    private Responder responder;
-    public TunnelResponder(Class<?> iface, Object impl) {
-      responder = createResponder(iface, impl);
+    private Map<String, Responder> responders =
+      new HashMap<String, Responder>();
+
+    public void addProtocol(Class<?> iface, Object impl) {
+      responders.put(iface.getName(), createResponder(iface, impl));
     }
 
     @Override
@@ -197,13 +205,18 @@ public class AvroRpcEngine implements Rp
     public ProtocolSignature getProtocolSignature(
         String protocol, long version, int clientMethodsHashCode)
       throws IOException {
-      return new ProtocolSignature(VERSION, null);
+      return ProtocolSignature.getProtocolSignature
+        (clientMethodsHashCode, VERSION, TunnelProtocol.class);
     }
 
-    public BufferListWritable call(final BufferListWritable request)
+    public BufferListWritable call(String protocol, BufferListWritable request)
       throws IOException {
+      Responder responder = responders.get(protocol);
+      if (responder == null)
+        throw new IOException("No responder for: "+protocol);
       return new BufferListWritable(responder.respond(request.buffers));
     }
+
   }
 
   public Object[] call(Method method, Object[][] params,
@@ -212,6 +225,32 @@ public class AvroRpcEngine implements Rp
     throw new UnsupportedOperationException();
   }
 
+  private class Server extends WritableRpcEngine.Server {
+    private TunnelResponder responder = new TunnelResponder();
+
+    public Server(Class<?> iface, Object impl, String bindAddress,
+                  int port, int numHandlers, int numReaders,
+                  int queueSizePerHandler, boolean verbose,
+                  Configuration conf, 
+                  SecretManager<? extends TokenIdentifier> secretManager
+                  ) throws IOException {
+      super((Class)null, new Object(), conf,
+            bindAddress, port, numHandlers, numReaders,
+            queueSizePerHandler, verbose, secretManager);
+      super.addProtocol(TunnelProtocol.class, responder);
+      responder.addProtocol(iface, impl);
+    }
+
+
+    @Override
+    public <PROTO, IMPL extends PROTO> Server
+      addProtocol(Class<PROTO> protocolClass, IMPL protocolImpl)
+        throws IOException {
+      responder.addProtocol(protocolClass, protocolImpl);
+      return this;
+    }
+  }
+
   /** Construct a server for a protocol implementation instance listening on a
    * port and address. */
   public RPC.Server getServer(Class<?> iface, Object impl, String bindAddress,
@@ -220,10 +259,9 @@ public class AvroRpcEngine implements Rp
                               Configuration conf, 
                        SecretManager<? extends TokenIdentifier> secretManager
                               ) throws IOException {
-    return ENGINE.getServer(TunnelProtocol.class,
-                            new TunnelResponder(iface, impl),
-                            bindAddress, port, numHandlers, numReaders,
-                            queueSizePerHandler, verbose, conf, secretManager);
+    return new Server
+      (iface, impl, bindAddress, port, numHandlers, numReaders,
+       queueSizePerHandler, verbose, conf, secretManager);
   }
 
 }

Modified: 
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java?rev=1177399&r1=1177398&r2=1177399&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
 (original)
+++ 
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
 Thu Sep 29 19:03:10 2011
@@ -605,7 +605,7 @@ public class RPC {
      * @param protocolImpl - the impl of the protocol that will be called
      * @return the server (for convenience)
      */
-    public <PROTO extends VersionedProtocol, IMPL extends PROTO>
+    public <PROTO, IMPL extends PROTO>
       Server addProtocol(Class<PROTO> protocolClass, IMPL protocolImpl
     ) throws IOException {
       throw new IOException("addProtocol Not Implemented");

Modified: 
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java?rev=1177399&r1=1177398&r2=1177399&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
 (original)
+++ 
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
 Thu Sep 29 19:03:10 2011
@@ -555,7 +555,7 @@ public class WritableRpcEngine implement
 
  
     @Override
-    public <PROTO extends VersionedProtocol, IMPL extends PROTO> Server
+    public <PROTO, IMPL extends PROTO> Server
       addProtocol(
         Class<PROTO> protocolClass, IMPL protocolImpl) throws IOException {
       registerProtocolAndImpl(protocolClass, protocolImpl);

Modified: 
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAvroRpc.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAvroRpc.java?rev=1177399&r1=1177398&r2=1177399&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAvroRpc.java
 (original)
+++ 
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAvroRpc.java
 Thu Sep 29 19:03:10 2011
@@ -43,6 +43,7 @@ import org.apache.hadoop.security.Securi
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 
 /** Unit tests for AvroRpc. */
 public class TestAvroRpc extends TestCase {
@@ -56,6 +57,9 @@ public class TestAvroRpc extends TestCas
 
   public TestAvroRpc(String name) { super(name); }
        
+  public static interface EmptyProtocol {}
+  public static class EmptyImpl implements EmptyProtocol {}
+
   public static class TestImpl implements AvroTestProtocol {
 
     public void ping() {}
@@ -93,10 +97,12 @@ public class TestAvroRpc extends TestCas
       sm = new TestTokenSecretManager();
     }
     UserGroupInformation.setConfiguration(conf);
+    RPC.setProtocolEngine(conf, EmptyProtocol.class, AvroRpcEngine.class);
     RPC.setProtocolEngine(conf, AvroTestProtocol.class, AvroRpcEngine.class);
-    Server server = RPC.getServer(AvroTestProtocol.class,
-                                  new TestImpl(), ADDRESS, 0, 5, true, 
-                                  conf, sm);
+    RPC.Server server = RPC.getServer(EmptyProtocol.class, new EmptyImpl(),
+                                      ADDRESS, 0, 5, true, conf, sm);
+    server.addProtocol(AvroTestProtocol.class, new TestImpl());
+
     try {
       server.start();
       InetSocketAddress addr = NetUtils.getConnectAddress(server);


Reply via email to