Author: hairong Date: Fri Jan 28 22:45:58 2011 New Revision: 1064919 URL: http://svn.apache.org/viewvc?rev=1064919&view=rev Log: HADOOP-6904. Support method based RPC compatiblity. Contributed by Hairong Kuang.
Added: hadoop/common/trunk/src/java/org/apache/hadoop/ipc/ProtocolProxy.java hadoop/common/trunk/src/java/org/apache/hadoop/ipc/ProtocolSignature.java hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestRPCCompatibility.java Modified: hadoop/common/trunk/CHANGES.txt hadoop/common/trunk/src/java/org/apache/hadoop/ipc/AvroRpcEngine.java hadoop/common/trunk/src/java/org/apache/hadoop/ipc/RPC.java hadoop/common/trunk/src/java/org/apache/hadoop/ipc/RpcEngine.java hadoop/common/trunk/src/java/org/apache/hadoop/ipc/VersionedProtocol.java hadoop/common/trunk/src/java/org/apache/hadoop/ipc/WritableRpcEngine.java hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/MiniRPCBenchmark.java hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestRPC.java hadoop/common/trunk/src/test/core/org/apache/hadoop/security/TestDoAsEffectiveUser.java Modified: hadoop/common/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/trunk/CHANGES.txt?rev=1064919&r1=1064918&r2=1064919&view=diff ============================================================================== --- hadoop/common/trunk/CHANGES.txt (original) +++ hadoop/common/trunk/CHANGES.txt Fri Jan 28 22:45:58 2011 @@ -4,6 +4,8 @@ Trunk (unreleased changes) INCOMPATIBLE CHANGES + HADOOP-6904. Support method based RPC compatiblity. (hairong) + NEW FEATURES HADOOP-7023. Add listCorruptFileBlocks to Filesysem. (Patrick Kling Modified: hadoop/common/trunk/src/java/org/apache/hadoop/ipc/AvroRpcEngine.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/ipc/AvroRpcEngine.java?rev=1064919&r1=1064918&r2=1064919&view=diff ============================================================================== --- hadoop/common/trunk/src/java/org/apache/hadoop/ipc/AvroRpcEngine.java (original) +++ hadoop/common/trunk/src/java/org/apache/hadoop/ipc/AvroRpcEngine.java Fri Jan 28 22:45:58 2011 @@ -107,10 +107,9 @@ public class AvroRpcEngine implements Rp Configuration conf, SocketFactory factory, int rpcTimeout) throws IOException { - this.tunnel = - (TunnelProtocol)ENGINE.getProxy(TunnelProtocol.class, VERSION, + this.tunnel = ENGINE.getProxy(TunnelProtocol.class, VERSION, addr, ticket, conf, factory, - rpcTimeout); + rpcTimeout).getProxy(); this.remote = addr; } @@ -135,16 +134,20 @@ public class AvroRpcEngine implements Rp } /** Construct a client-side proxy object that implements the named protocol, - * talking to a server at the named address. */ - public Object getProxy(Class<?> protocol, long clientVersion, + * talking to a server at the named address. + * @param <T>*/ + @SuppressWarnings("unchecked") + public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion, InetSocketAddress addr, UserGroupInformation ticket, Configuration conf, SocketFactory factory, int rpcTimeout) throws IOException { - return Proxy.newProxyInstance - (protocol.getClassLoader(), - new Class[] { protocol }, - new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout)); + return new ProtocolProxy<T>(protocol, + (T)Proxy.newProxyInstance( + protocol.getClassLoader(), + new Class[] { protocol }, + new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout)), + null); } /** Stop this proxy. */ @@ -191,11 +194,19 @@ public class AvroRpcEngine implements Rp responder = createResponder(iface, impl); } + @Override public long getProtocolVersion(String protocol, long version) - throws IOException { + throws IOException { return VERSION; } + @Override + public ProtocolSignature getProtocolSignature( + String protocol, long version, int clientMethodsHashCode) + throws IOException { + return new ProtocolSignature(VERSION, null); + } + public BufferListWritable call(final BufferListWritable request) throws IOException { return new BufferListWritable(responder.respond(request.buffers)); Added: hadoop/common/trunk/src/java/org/apache/hadoop/ipc/ProtocolProxy.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/ipc/ProtocolProxy.java?rev=1064919&view=auto ============================================================================== --- hadoop/common/trunk/src/java/org/apache/hadoop/ipc/ProtocolProxy.java (added) +++ hadoop/common/trunk/src/java/org/apache/hadoop/ipc/ProtocolProxy.java Fri Jan 28 22:45:58 2011 @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ipc; + +import java.io.IOException; +import java.lang.reflect.Method; +import java.util.HashSet; + + +/** + * a class wraps around a server's proxy, + * containing a list of its supported methods. + * + * A list of methods with a value of null indicates that the client and server + * have the same protocol. + */ +public class ProtocolProxy<T> { + private Class<T> protocol; + private T proxy; + private HashSet<Integer> serverMethods = null; + + /** + * Constructor + * + * @param protocol protocol class + * @param proxy its proxy + * @param serverMethods a list of hash codes of the methods that it supports + * @throws ClassNotFoundException + */ + public ProtocolProxy(Class<T> protocol, T proxy, int[] serverMethods) { + this.protocol = protocol; + this.proxy = proxy; + if (serverMethods != null) { + this.serverMethods = new HashSet<Integer>(serverMethods.length); + for (int method : serverMethods) { + this.serverMethods.add(Integer.valueOf(method)); + } + } + } + + /* + * Get the proxy + */ + public T getProxy() { + return proxy; + } + + /** + * Check if a method is supported by the server or not + * + * @param methodName a method's name in String format + * @param parameterTypes a method's parameter types + * @return true if the method is supported by the server + */ + public boolean isMethodSupported(String methodName, + Class<?>... parameterTypes) + throws IOException { + if (serverMethods == null) { // client & server have the same protocol + return true; + } + Method method; + try { + method = protocol.getDeclaredMethod(methodName, parameterTypes); + } catch (SecurityException e) { + throw new IOException(e); + } catch (NoSuchMethodException e) { + throw new IOException(e); + } + return serverMethods.contains( + Integer.valueOf(ProtocolSignature.getFingerprint(method))); + } +} \ No newline at end of file Added: hadoop/common/trunk/src/java/org/apache/hadoop/ipc/ProtocolSignature.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/ipc/ProtocolSignature.java?rev=1064919&view=auto ============================================================================== --- hadoop/common/trunk/src/java/org/apache/hadoop/ipc/ProtocolSignature.java (added) +++ hadoop/common/trunk/src/java/org/apache/hadoop/ipc/ProtocolSignature.java Fri Jan 28 22:45:58 2011 @@ -0,0 +1,241 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ipc; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.lang.reflect.Method; +import java.util.Arrays; +import java.util.HashMap; + +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableFactories; +import org.apache.hadoop.io.WritableFactory; + +public class ProtocolSignature implements Writable { + static { // register a ctor + WritableFactories.setFactory + (ProtocolSignature.class, + new WritableFactory() { + public Writable newInstance() { return new ProtocolSignature(); } + }); + } + + private long version; + private int[] methods = null; // an array of method hash codes + + /** + * default constructor + */ + public ProtocolSignature() { + } + + /** + * Constructor + * + * @param version server version + * @param methodHashcodes hash codes of the methods supported by server + */ + public ProtocolSignature(long version, int[] methodHashcodes) { + this.version = version; + this.methods = methodHashcodes; + } + + public long getVersion() { + return version; + } + + public int[] getMethods() { + return methods; + } + + @Override + public void readFields(DataInput in) throws IOException { + version = in.readLong(); + boolean hasMethods = in.readBoolean(); + if (hasMethods) { + int numMethods = in.readInt(); + methods = new int[numMethods]; + for (int i=0; i<numMethods; i++) { + methods[i] = in.readInt(); + } + } + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeLong(version); + if (methods == null) { + out.writeBoolean(false); + } else { + out.writeBoolean(true); + out.writeInt(methods.length); + for (int method : methods) { + out.writeInt(method); + } + } + } + + /** + * Calculate a method's hash code considering its method + * name, returning type, and its parameter types + * + * @param method a method + * @return its hash code + */ + static int getFingerprint(Method method) { + int hashcode = method.getName().hashCode(); + hashcode = hashcode + 31*method.getReturnType().getName().hashCode(); + for (Class<?> type : method.getParameterTypes()) { + hashcode = 31*hashcode ^ type.getName().hashCode(); + } + return hashcode; + } + + /** + * Convert an array of Method into an array of hash codes + * + * @param methods + * @return array of hash codes + */ + private static int[] getFingerprints(Method[] methods) { + if (methods == null) { + return null; + } + int[] hashCodes = new int[methods.length]; + for (int i = 0; i<methods.length; i++) { + hashCodes[i] = getFingerprint(methods[i]); + } + return hashCodes; + } + + /** + * Get the hash code of an array of methods + * Methods are sorted before hashcode is calculated. + * So the returned value is irrelevant of the method order in the array. + * + * @param methods an array of methods + * @return the hash code + */ + static int getFingerprint(Method[] methods) { + return getFingerprint(getFingerprints(methods)); + } + + /** + * Get the hash code of an array of hashcodes + * Hashcodes are sorted before hashcode is calculated. + * So the returned value is irrelevant of the hashcode order in the array. + * + * @param methods an array of methods + * @return the hash code + */ + static int getFingerprint(int[] hashcodes) { + Arrays.sort(hashcodes); + return Arrays.hashCode(hashcodes); + + } + private static class ProtocolSigFingerprint { + private ProtocolSignature signature; + private int fingerprint; + + ProtocolSigFingerprint(ProtocolSignature sig, int fingerprint) { + this.signature = sig; + this.fingerprint = fingerprint; + } + } + + /** + * A cache that maps a protocol's name to its signature & finger print + */ + final private static HashMap<String, ProtocolSigFingerprint> + PROTOCOL_FINGERPRINT_CACHE = + new HashMap<String, ProtocolSigFingerprint>(); + + /** + * Return a protocol's signature and finger print from cache + * + * @param protocol a protocol class + * @param serverVersion protocol version + * @return its signature and finger print + */ + private static ProtocolSigFingerprint getSigFingerprint( + Class <? extends VersionedProtocol> protocol, long serverVersion) { + String protocolName = protocol.getName(); + synchronized (PROTOCOL_FINGERPRINT_CACHE) { + ProtocolSigFingerprint sig = PROTOCOL_FINGERPRINT_CACHE.get(protocolName); + if (sig == null) { + int[] serverMethodHashcodes = getFingerprints(protocol.getMethods()); + sig = new ProtocolSigFingerprint( + new ProtocolSignature(serverVersion, serverMethodHashcodes), + getFingerprint(serverMethodHashcodes)); + PROTOCOL_FINGERPRINT_CACHE.put(protocolName, sig); + } + return sig; + } + } + + /** + * Get a server protocol's signature + * + * @param clientMethodsHashCode client protocol methods hashcode + * @param serverVersion server protocol version + * @param protocol protocol + * @return the server's protocol signature + */ + static ProtocolSignature getProtocolSignature( + int clientMethodsHashCode, + long serverVersion, + Class<? extends VersionedProtocol> protocol) { + // try to get the finger print & signature from the cache + ProtocolSigFingerprint sig = getSigFingerprint(protocol, serverVersion); + + // check if the client side protocol matches the one on the server side + if (clientMethodsHashCode == sig.fingerprint) { + return new ProtocolSignature(serverVersion, null); // null indicates a match + } + + return sig.signature; + } + + /** + * Get a server protocol's signature + * + * @param server server implementation + * @param protocol server protocol + * @param clientVersion client's version + * @param clientMethodsHash client's protocol's hash code + * @return the server protocol's signature + * @throws IOException if any error occurs + */ + @SuppressWarnings("unchecked") + public static ProtocolSignature getProtocolSigature(VersionedProtocol server, + String protocol, + long clientVersion, int clientMethodsHash) throws IOException { + Class<? extends VersionedProtocol> inter; + try { + inter = (Class<? extends VersionedProtocol>)Class.forName(protocol); + } catch (Exception e) { + throw new IOException(e); + } + long serverVersion = server.getProtocolVersion(protocol, clientVersion); + return ProtocolSignature.getProtocolSignature( + clientMethodsHash, serverVersion, inter); + } +} \ No newline at end of file Modified: hadoop/common/trunk/src/java/org/apache/hadoop/ipc/RPC.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/ipc/RPC.java?rev=1064919&r1=1064918&r2=1064919&view=diff ============================================================================== --- hadoop/common/trunk/src/java/org/apache/hadoop/ipc/RPC.java (original) +++ hadoop/common/trunk/src/java/org/apache/hadoop/ipc/RPC.java Fri Jan 28 22:45:58 2011 @@ -37,12 +37,9 @@ import org.apache.hadoop.io.*; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.SaslRpcServer; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.authorize.AuthorizationException; -import org.apache.hadoop.security.authorize.ServiceAuthorizationManager; import org.apache.hadoop.security.token.SecretManager; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.conf.*; -import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate; import org.apache.hadoop.util.ReflectionUtils; /** A simple RPC mechanism. @@ -64,7 +61,7 @@ import org.apache.hadoop.util.Reflection * the protocol instance is transmitted. */ public class RPC { - private static final Log LOG = LogFactory.getLog(RPC.class); + static final Log LOG = LogFactory.getLog(RPC.class); private RPC() {} // no public ctor @@ -159,18 +156,48 @@ public class RPC { return serverVersion; } } - - public static Object waitForProxy( - Class<?> protocol, + + /** + * Get a proxy connection to a remote server + * + * @param protocol protocol class + * @param clientVersion client version + * @param addr remote address + * @param conf configuration to use + * @return the proxy + * @throws IOException if the far end through a RemoteException + */ + public static <T> T waitForProxy( + Class<T> protocol, long clientVersion, InetSocketAddress addr, Configuration conf ) throws IOException { - return waitForProxy(protocol, clientVersion, addr, conf, Long.MAX_VALUE); + return waitForProtocolProxy(protocol, clientVersion, addr, conf).getProxy(); + } + + /** + * Get a protocol proxy that contains a proxy connection to a remote server + * and a set of methods that are supported by the server + * + * @param protocol protocol class + * @param clientVersion client version + * @param addr remote address + * @param conf configuration to use + * @return the protocol proxy + * @throws IOException if the far end through a RemoteException + */ + public static <T> ProtocolProxy<T> waitForProtocolProxy(Class<T> protocol, + long clientVersion, + InetSocketAddress addr, + Configuration conf) throws IOException { + return waitForProtocolProxy( + protocol, clientVersion, addr, conf, Long.MAX_VALUE); } /** * Get a proxy connection to a remote server + * * @param protocol protocol class * @param clientVersion client version * @param addr remote address @@ -179,23 +206,68 @@ public class RPC { * @return the proxy * @throws IOException if the far end through a RemoteException */ - public static Object waitForProxy(Class<?> protocol, long clientVersion, + public static <T> T waitForProxy(Class<T> protocol, long clientVersion, InetSocketAddress addr, Configuration conf, long connTimeout) throws IOException { - return waitForProxy(protocol, clientVersion, addr, conf, 0, connTimeout); + return waitForProtocolProxy(protocol, clientVersion, addr, + conf, connTimeout).getProxy(); } - /** - * Get a proxy connection to a remote server - * @param protocol protocol class - * @param clientVersion client version - * @param addr remote address - * @param conf configuration to use - * @param rpcTimeout timeout for each RPC - * @param timeout time in milliseconds before giving up - * @return the proxy - * @throws IOException if the far end through a RemoteException - */ - public static Object waitForProxy(Class<?> protocol, long clientVersion, + + /** + * Get a protocol proxy that contains a proxy connection to a remote server + * and a set of methods that are supported by the server + * + * @param protocol protocol class + * @param clientVersion client version + * @param addr remote address + * @param conf configuration to use + * @param connTimeout time in milliseconds before giving up + * @return the protocol proxy + * @throws IOException if the far end through a RemoteException + */ + public static <T> ProtocolProxy<T> waitForProtocolProxy(Class<T> protocol, + long clientVersion, + InetSocketAddress addr, Configuration conf, + long connTimeout) throws IOException { + return waitForProtocolProxy(protocol, clientVersion, addr, conf, 0, connTimeout); + } + + /** + * Get a proxy connection to a remote server + * + * @param protocol protocol class + * @param clientVersion client version + * @param addr remote address + * @param conf configuration to use + * @param rpcTimeout timeout for each RPC + * @param timeout time in milliseconds before giving up + * @return the proxy + * @throws IOException if the far end through a RemoteException + */ + public static <T> T waitForProxy(Class<T> protocol, + long clientVersion, + InetSocketAddress addr, Configuration conf, + int rpcTimeout, + long timeout) throws IOException { + return waitForProtocolProxy(protocol, clientVersion, addr, + conf, rpcTimeout, timeout).getProxy(); + } + + /** + * Get a protocol proxy that contains a proxy connection to a remote server + * and a set of methods that are supported by the server + * + * @param protocol protocol class + * @param clientVersion client version + * @param addr remote address + * @param conf configuration to use + * @param rpcTimeout timeout for each RPC + * @param timeout time in milliseconds before giving up + * @return the proxy + * @throws IOException if the far end through a RemoteException + */ + public static <T> ProtocolProxy<T> waitForProtocolProxy(Class<T> protocol, + long clientVersion, InetSocketAddress addr, Configuration conf, int rpcTimeout, long timeout) throws IOException { @@ -203,7 +275,7 @@ public class RPC { IOException ioe; while (true) { try { - return getProxy(protocol, clientVersion, addr, + return getProtocolProxy(protocol, clientVersion, addr, UserGroupInformation.getCurrentUser(), conf, NetUtils .getDefaultSocketFactory(conf), rpcTimeout); } catch(ConnectException se) { // namenode has not been started @@ -231,27 +303,76 @@ public class RPC { } /** Construct a client-side proxy object that implements the named protocol, - * talking to a server at the named address. */ - public static Object getProxy(Class<?> protocol, long clientVersion, + * talking to a server at the named address. + * @param <T>*/ + public static <T> T getProxy(Class<T> protocol, + long clientVersion, + InetSocketAddress addr, Configuration conf, + SocketFactory factory) throws IOException { + return getProtocolProxy( + protocol, clientVersion, addr, conf, factory).getProxy(); + } + + /** + * Get a protocol proxy that contains a proxy connection to a remote server + * and a set of methods that are supported by the server + * + * @param protocol protocol class + * @param clientVersion client version + * @param addr remote address + * @param conf configuration to use + * @param factory socket factory + * @return the protocol proxy + * @throws IOException if the far end through a RemoteException + */ + public static <T> ProtocolProxy<T> getProtocolProxy(Class<T> protocol, + long clientVersion, InetSocketAddress addr, Configuration conf, SocketFactory factory) throws IOException { UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); - return getProxy(protocol, clientVersion, addr, ugi, conf, factory); + return getProtocolProxy(protocol, clientVersion, addr, ugi, conf, factory); } /** Construct a client-side proxy object that implements the named protocol, - * talking to a server at the named address. */ - public static Object getProxy(Class<?> protocol, long clientVersion, + * talking to a server at the named address. + * @param <T>*/ + public static <T> T getProxy(Class<T> protocol, + long clientVersion, + InetSocketAddress addr, + UserGroupInformation ticket, + Configuration conf, + SocketFactory factory) throws IOException { + return getProtocolProxy( + protocol, clientVersion, addr, ticket, conf, factory).getProxy(); + } + + /** + * Get a protocol proxy that contains a proxy connection to a remote server + * and a set of methods that are supported by the server + * + * @param protocol protocol class + * @param clientVersion client version + * @param addr remote address + * @param ticket user group information + * @param conf configuration to use + * @param factory socket factory + * @return the protocol proxy + * @throws IOException if the far end through a RemoteException + */ + public static <T> ProtocolProxy<T> getProtocolProxy(Class<T> protocol, + long clientVersion, InetSocketAddress addr, UserGroupInformation ticket, Configuration conf, SocketFactory factory) throws IOException { - return getProxy(protocol, clientVersion, addr, ticket, conf, factory, 0); + return getProtocolProxy( + protocol, clientVersion, addr, ticket, conf, factory, 0); } /** * Construct a client-side proxy that implements the named protocol, * talking to a server at the named address. + * @param <T> * * @param protocol protocol * @param clientVersion client's version @@ -263,7 +384,33 @@ public class RPC { * @return the proxy * @throws IOException if any error occurs */ - public static Object getProxy(Class<?> protocol, long clientVersion, + public static <T> T getProxy(Class<T> protocol, + long clientVersion, + InetSocketAddress addr, + UserGroupInformation ticket, + Configuration conf, + SocketFactory factory, + int rpcTimeout) throws IOException { + return getProtocolProxy(protocol, clientVersion, addr, ticket, + conf, factory, rpcTimeout).getProxy(); + } + + /** + * Get a protocol proxy that contains a proxy connection to a remote server + * and a set of methods that are supported by the server + * + * @param protocol protocol + * @param clientVersion client's version + * @param addr server address + * @param ticket security ticket + * @param conf configuration + * @param factory socket factory + * @param rpcTimeout max time for each rpc; 0 means no timeout + * @return the proxy + * @throws IOException if any error occurs + */ + public static <T> ProtocolProxy<T> getProtocolProxy(Class<T> protocol, + long clientVersion, InetSocketAddress addr, UserGroupInformation ticket, Configuration conf, @@ -276,21 +423,42 @@ public class RPC { clientVersion, addr, ticket, conf, factory, rpcTimeout); } + /** + * Construct a client-side proxy object with the default SocketFactory + * @param <T> + * + * @param protocol + * @param clientVersion + * @param addr + * @param conf + * @return a proxy instance + * @throws IOException + */ + public static <T> T getProxy(Class<T> protocol, + long clientVersion, + InetSocketAddress addr, Configuration conf) + throws IOException { + + return getProtocolProxy(protocol, clientVersion, addr, conf).getProxy(); + } + /** - * Construct a client-side proxy object with the default SocketFactory + * Get a protocol proxy that contains a proxy connection to a remote server + * and a set of methods that are supported by the server * * @param protocol * @param clientVersion * @param addr * @param conf - * @return a proxy instance + * @return a protocol proxy * @throws IOException */ - public static Object getProxy(Class<?> protocol, long clientVersion, + public static <T> ProtocolProxy<T> getProtocolProxy(Class<T> protocol, + long clientVersion, InetSocketAddress addr, Configuration conf) throws IOException { - return getProxy(protocol, clientVersion, addr, conf, NetUtils + return getProtocolProxy(protocol, clientVersion, addr, conf, NetUtils .getDefaultSocketFactory(conf)); } Modified: hadoop/common/trunk/src/java/org/apache/hadoop/ipc/RpcEngine.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/ipc/RpcEngine.java?rev=1064919&r1=1064918&r2=1064919&view=diff ============================================================================== --- hadoop/common/trunk/src/java/org/apache/hadoop/ipc/RpcEngine.java (original) +++ hadoop/common/trunk/src/java/org/apache/hadoop/ipc/RpcEngine.java Fri Jan 28 22:45:58 2011 @@ -34,8 +34,9 @@ import org.apache.hadoop.security.token. @InterfaceStability.Evolving public interface RpcEngine { - /** Construct a client-side proxy object. */ - Object getProxy(Class<?> protocol, + /** Construct a client-side proxy object. + * @param <T>*/ + <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion, InetSocketAddress addr, UserGroupInformation ticket, Configuration conf, SocketFactory factory, int rpcTimeout) throws IOException; Modified: hadoop/common/trunk/src/java/org/apache/hadoop/ipc/VersionedProtocol.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/ipc/VersionedProtocol.java?rev=1064919&r1=1064918&r2=1064919&view=diff ============================================================================== --- hadoop/common/trunk/src/java/org/apache/hadoop/ipc/VersionedProtocol.java (original) +++ hadoop/common/trunk/src/java/org/apache/hadoop/ipc/VersionedProtocol.java Fri Jan 28 22:45:58 2011 @@ -32,7 +32,23 @@ public interface VersionedProtocol { * @param protocol The classname of the protocol interface * @param clientVersion The version of the protocol that the client speaks * @return the version that the server will speak + * @throws IOException if any IO error occurs */ - public long getProtocolVersion(String protocol, + @Deprecated + public long getProtocolVersion(String protocol, long clientVersion) throws IOException; + + /** + * Return protocol version corresponding to protocol interface. + * @param protocol The classname of the protocol interface + * @param clientVersion The version of the protocol that the client speaks + * @param clientMethodsHash the hashcode of client protocol methods + * @return the server protocol signature containing its version and + * a list of its supported methods + * @see ProtocolSignature#getProtocolSigature(VersionedProtocol, String, + * long, int) for a default implementation + */ + public ProtocolSignature getProtocolSignature(String protocol, + long clientVersion, + int clientMethodsHash) throws IOException; } Modified: hadoop/common/trunk/src/java/org/apache/hadoop/ipc/WritableRpcEngine.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/ipc/WritableRpcEngine.java?rev=1064919&r1=1064918&r2=1064919&view=diff ============================================================================== --- hadoop/common/trunk/src/java/org/apache/hadoop/ipc/WritableRpcEngine.java (original) +++ hadoop/common/trunk/src/java/org/apache/hadoop/ipc/WritableRpcEngine.java Fri Jan 28 22:45:58 2011 @@ -35,7 +35,6 @@ import org.apache.commons.logging.*; import org.apache.hadoop.io.*; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.authorize.ServiceAuthorizationManager; import org.apache.hadoop.security.token.SecretManager; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.classification.InterfaceAudience; @@ -221,25 +220,32 @@ public class WritableRpcEngine implement } /** Construct a client-side proxy object that implements the named protocol, - * talking to a server at the named address. */ - public Object getProxy(Class<?> protocol, long clientVersion, + * talking to a server at the named address. + * @param <T>*/ + @Override + @SuppressWarnings("unchecked") + public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion, InetSocketAddress addr, UserGroupInformation ticket, Configuration conf, SocketFactory factory, int rpcTimeout) throws IOException { - Object proxy = Proxy.newProxyInstance + T proxy = (T)Proxy.newProxyInstance (protocol.getClassLoader(), new Class[] { protocol }, new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout)); + int[] serverMethods = null; if (proxy instanceof VersionedProtocol) { - long serverVersion = ((VersionedProtocol)proxy) - .getProtocolVersion(protocol.getName(), clientVersion); + ProtocolSignature serverInfo = ((VersionedProtocol)proxy) + .getProtocolSignature(protocol.getName(), clientVersion, + ProtocolSignature.getFingerprint(protocol.getMethods())); + long serverVersion = serverInfo.getVersion(); if (serverVersion != clientVersion) { throw new RPC.VersionMismatch(protocol.getName(), clientVersion, serverVersion); } + serverMethods = serverInfo.getMethods(); } - return proxy; + return new ProtocolProxy<T>(protocol, proxy, serverMethods); } /** Modified: hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/MiniRPCBenchmark.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/MiniRPCBenchmark.java?rev=1064919&r1=1064918&r2=1064919&view=diff ============================================================================== --- hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/MiniRPCBenchmark.java (original) +++ hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/MiniRPCBenchmark.java Fri Jan 28 22:45:58 2011 @@ -132,6 +132,15 @@ public class MiniRPCBenchmark { throw new IOException("Unknown protocol: " + protocol); } + @Override // VersionedProtocol + public ProtocolSignature getProtocolSignature(String protocol, + long clientVersion, + int clientMethodsHashCode) throws IOException { + if (protocol.equals(MiniProtocol.class.getName())) + return new ProtocolSignature(versionID, null); + throw new IOException("Unknown protocol: " + protocol); + } + @Override // MiniProtocol public Token<TestDelegationTokenIdentifier> getDelegationToken(Text renewer) throws IOException { Modified: hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestRPC.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestRPC.java?rev=1064919&r1=1064918&r2=1064919&view=diff ============================================================================== --- hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestRPC.java (original) +++ hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestRPC.java Fri Jan 28 22:45:58 2011 @@ -34,7 +34,6 @@ import org.apache.hadoop.fs.CommonConfig import org.apache.hadoop.io.UTF8; import org.apache.hadoop.io.Writable; -import org.apache.hadoop.metrics.MetricsRecord; import org.apache.hadoop.metrics.spi.NullContext; import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate; import org.apache.hadoop.net.NetUtils; @@ -81,6 +80,11 @@ public class TestRPC extends TestCase { return TestProtocol.versionID; } + public ProtocolSignature getProtocolSignature(String protocol, long clientVersion, + int hashcode) { + return new ProtocolSignature(TestProtocol.versionID, null); + } + public void ping() {} public synchronized void slowPing(boolean shouldSlow) { Added: hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestRPCCompatibility.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestRPCCompatibility.java?rev=1064919&view=auto ============================================================================== --- hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestRPCCompatibility.java (added) +++ hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestRPCCompatibility.java Fri Jan 28 22:45:58 2011 @@ -0,0 +1,250 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ipc; + +import static org.junit.Assert.*; + +import java.io.IOException; +import java.lang.reflect.Method; +import java.net.InetSocketAddress; + +import org.apache.commons.logging.*; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.net.NetUtils; + +import org.junit.After; + +import org.junit.Test; + +/** Unit test for supporting method-name based compatible RPCs. */ +public class TestRPCCompatibility { + private static final String ADDRESS = "0.0.0.0"; + private static InetSocketAddress addr; + private static Server server; + private ProtocolProxy<?> proxy; + + public static final Log LOG = + LogFactory.getLog(TestRPCCompatibility.class); + + private static Configuration conf = new Configuration(); + + public interface TestProtocol0 extends VersionedProtocol { + public static final long versionID = 0L; + void ping() throws IOException; + } + + public interface TestProtocol1 extends TestProtocol0 { + String echo(String value) throws IOException; + } + + public interface TestProtocol2 extends TestProtocol1 { + int echo(int value) throws IOException; + } + + public static class TestImpl0 implements TestProtocol0 { + @Override + public long getProtocolVersion(String protocol, + long clientVersion) throws IOException { + return versionID; + } + + @SuppressWarnings("unchecked") + @Override + public ProtocolSignature getProtocolSignature(String protocol, + long clientVersion, int clientMethodsHashCode) + throws IOException { + Class<? extends VersionedProtocol> inter; + try { + inter = (Class<? extends VersionedProtocol>)getClass().getGenericInterfaces()[0]; + } catch (Exception e) { + throw new IOException(e); + } + return ProtocolSignature.getProtocolSignature(clientMethodsHashCode, + getProtocolVersion(protocol, clientVersion), inter); + } + + @Override + public void ping() { return; } + } + + public static class TestImpl1 extends TestImpl0 implements TestProtocol1 { + @Override + public String echo(String value) { return value; } + } + + public static class TestImpl2 extends TestImpl1 implements TestProtocol2 { + @Override + public int echo(int value) { return value; } + } + + @After + public void tearDown() throws IOException { + if (proxy != null) { + RPC.stopProxy(proxy.getProxy()); + } + if (server != null) { + server.stop(); + } + } + + @Test // old client vs new server + public void testVersion0ClientVersion1Server() throws Exception { + // create a server with two handlers + server = RPC.getServer(TestProtocol1.class, + new TestImpl1(), ADDRESS, 0, 2, false, conf, null); + server.start(); + addr = NetUtils.getConnectAddress(server); + + proxy = RPC.getProtocolProxy( + TestProtocol0.class, TestProtocol0.versionID, addr, conf); + + TestProtocol0 proxy0 = (TestProtocol0)proxy.getProxy(); + proxy0.ping(); + } + + @Test // old client vs new server + public void testVersion1ClientVersion0Server() throws Exception { + // create a server with two handlers + server = RPC.getServer(TestProtocol0.class, + new TestImpl0(), ADDRESS, 0, 2, false, conf, null); + server.start(); + addr = NetUtils.getConnectAddress(server); + + proxy = RPC.getProtocolProxy( + TestProtocol1.class, TestProtocol1.versionID, addr, conf); + + TestProtocol1 proxy1 = (TestProtocol1)proxy.getProxy(); + proxy1.ping(); + try { + proxy1.echo("hello"); + fail("Echo should fail"); + } catch(IOException e) { + } + } + + private class Version2Client { + + private TestProtocol2 proxy2; + private ProtocolProxy<TestProtocol2> serverInfo; + + private Version2Client() throws IOException { + serverInfo = RPC.getProtocolProxy( + TestProtocol2.class, TestProtocol2.versionID, addr, conf); + proxy2 = serverInfo.getProxy(); + } + + public int echo(int value) throws IOException, NumberFormatException { + if (serverInfo.isMethodSupported("echo", int.class)) { + return -value; // use version 3 echo long + } else { // server is version 2 + return Integer.parseInt(proxy2.echo(String.valueOf(value))); + } + } + + public String echo(String value) throws IOException { + return proxy2.echo(value); + } + + public void ping() throws IOException { + proxy2.ping(); + } + } + + @Test // Compatible new client & old server + public void testVersion2ClientVersion1Server() throws Exception { + // create a server with two handlers + server = RPC.getServer(TestProtocol1.class, + new TestImpl1(), ADDRESS, 0, 2, false, conf, null); + server.start(); + addr = NetUtils.getConnectAddress(server); + + + Version2Client client = new Version2Client(); + client.ping(); + assertEquals("hello", client.echo("hello")); + + // echo(int) is not supported by server, so returning 3 + // This verifies that echo(int) and echo(String)'s hash codes are different + assertEquals(3, client.echo(3)); + } + + @Test // equal version client and server + public void testVersion2ClientVersion2Server() throws Exception { + // create a server with two handlers + server = RPC.getServer(TestProtocol2.class, + new TestImpl2(), ADDRESS, 0, 2, false, conf, null); + server.start(); + addr = NetUtils.getConnectAddress(server); + + Version2Client client = new Version2Client(); + + client.ping(); + assertEquals("hello", client.echo("hello")); + + // now that echo(int) is supported by the server, echo(int) should return -3 + assertEquals(-3, client.echo(3)); + } + + public interface TestProtocol3 { + int echo(String value); + int echo(int value); + int echo_alias(int value); + int echo(int value1, int value2); + } + + @Test + public void testHashCode() throws Exception { + // make sure that overriding methods have different hashcodes + Method strMethod = TestProtocol3.class.getMethod("echo", String.class); + int stringEchoHash = ProtocolSignature.getFingerprint(strMethod); + Method intMethod = TestProtocol3.class.getMethod("echo", int.class); + int intEchoHash = ProtocolSignature.getFingerprint(intMethod); + assertFalse(stringEchoHash == intEchoHash); + + // make sure methods with the same signature + // from different declaring classes have the same hash code + int intEchoHash1 = ProtocolSignature.getFingerprint( + TestProtocol2.class.getMethod("echo", int.class)); + assertEquals(intEchoHash, intEchoHash1); + + // Methods with the same name and parameter types but different returning + // types have different hash codes + int stringEchoHash1 = ProtocolSignature.getFingerprint( + TestProtocol2.class.getMethod("echo", String.class)); + assertFalse(stringEchoHash == stringEchoHash1); + + // Make sure that methods with the same returning type and parameter types + // but different method names have different hash code + int intEchoHashAlias = ProtocolSignature.getFingerprint( + TestProtocol3.class.getMethod("echo_alias", int.class)); + assertFalse(intEchoHash == intEchoHashAlias); + + // Make sure that methods with the same returninig type and method name but + // larger number of parameter types have different hash code + int intEchoHash2 = ProtocolSignature.getFingerprint( + TestProtocol3.class.getMethod("echo", int.class, int.class)); + assertFalse(intEchoHash == intEchoHash2); + + // make sure that methods order does not matter for method array hash code + int hash1 = ProtocolSignature.getFingerprint(new Method[] {intMethod, strMethod}); + int hash2 = ProtocolSignature.getFingerprint(new Method[] {strMethod, intMethod}); + assertEquals(hash1, hash2); + } +} \ No newline at end of file Modified: hadoop/common/trunk/src/test/core/org/apache/hadoop/security/TestDoAsEffectiveUser.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/test/core/org/apache/hadoop/security/TestDoAsEffectiveUser.java?rev=1064919&r1=1064918&r2=1064919&view=diff ============================================================================== --- hadoop/common/trunk/src/test/core/org/apache/hadoop/security/TestDoAsEffectiveUser.java (original) +++ hadoop/common/trunk/src/test/core/org/apache/hadoop/security/TestDoAsEffectiveUser.java Fri Jan 28 22:45:58 2011 @@ -30,6 +30,7 @@ import junit.framework.Assert; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.io.Text; +import org.apache.hadoop.ipc.ProtocolSignature; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.Server; import org.apache.hadoop.ipc.VersionedProtocol; @@ -134,9 +135,14 @@ public class TestDoAsEffectiveUser { public long getProtocolVersion(String protocol, long clientVersion) throws IOException { - // TODO Auto-generated method stub return TestProtocol.versionID; } + + @Override + public ProtocolSignature getProtocolSignature(String protocol, + long clientVersion, int clientMethodsHash) throws IOException { + return new ProtocolSignature(TestProtocol.versionID, null); + } } @Test @@ -161,7 +167,7 @@ public class TestDoAsEffectiveUser { String retVal = proxyUserUgi .doAs(new PrivilegedExceptionAction<String>() { public String run() throws IOException { - proxy = (TestProtocol) RPC.getProxy(TestProtocol.class, + proxy = RPC.getProxy(TestProtocol.class, TestProtocol.versionID, addr, conf); String ret = proxy.aMethod(); return ret; @@ -203,7 +209,7 @@ public class TestDoAsEffectiveUser { String retVal = proxyUserUgi .doAs(new PrivilegedExceptionAction<String>() { public String run() throws IOException { - proxy = (TestProtocol) RPC.getProxy(TestProtocol.class, + proxy = RPC.getProxy(TestProtocol.class, TestProtocol.versionID, addr, conf); String ret = proxy.aMethod(); return ret; @@ -250,7 +256,7 @@ public class TestDoAsEffectiveUser { String retVal = proxyUserUgi .doAs(new PrivilegedExceptionAction<String>() { public String run() throws IOException { - proxy = (TestProtocol) RPC.getProxy(TestProtocol.class, + proxy = RPC.getProxy(TestProtocol.class, TestProtocol.versionID, addr, conf); String ret = proxy.aMethod(); return ret; @@ -289,7 +295,7 @@ public class TestDoAsEffectiveUser { String retVal = proxyUserUgi .doAs(new PrivilegedExceptionAction<String>() { public String run() throws IOException { - proxy = (TestProtocol) RPC.getProxy(TestProtocol.class, + proxy = RPC.getProxy(TestProtocol.class, TestProtocol.versionID, addr, conf); String ret = proxy.aMethod(); return ret; @@ -368,7 +374,7 @@ public class TestDoAsEffectiveUser { String retVal = proxyUserUgi .doAs(new PrivilegedExceptionAction<String>() { public String run() throws IOException { - proxy = (TestProtocol) RPC.getProxy(TestProtocol.class, + proxy = RPC.getProxy(TestProtocol.class, TestProtocol.versionID, addr, conf); String ret = proxy.aMethod(); return ret; @@ -424,7 +430,7 @@ public class TestDoAsEffectiveUser { @Override public String run() throws Exception { try { - proxy = (TestProtocol) RPC.getProxy(TestProtocol.class, + proxy = RPC.getProxy(TestProtocol.class, TestProtocol.versionID, addr, conf); String ret = proxy.aMethod(); return ret; @@ -477,7 +483,7 @@ public class TestDoAsEffectiveUser { @Override public String run() throws Exception { try { - proxy = (TestProtocol) RPC.getProxy(TestProtocol.class, + proxy = RPC.getProxy(TestProtocol.class, TestProtocol.versionID, addr, newConf); String ret = proxy.aMethod(); return ret;