Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java Tue Aug 19 23:49:39 2014 @@ -26,7 +26,6 @@ import static org.apache.hadoop.hdfs.pro import java.io.IOException; import java.net.InetSocketAddress; import java.net.URI; -import java.net.URISyntaxException; import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -38,11 +37,13 @@ import org.apache.hadoop.HadoopIllegalAr import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.NameNodeProxies.ProxyAndInfo; import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector; import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.hdfs.server.namenode.ha.AbstractNNFailoverProxyProvider; import org.apache.hadoop.io.Text; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RemoteException; @@ -205,26 +206,62 @@ public class HAUtil { /** * @return true if the given nameNodeUri appears to be a logical URI. - * This is the case if there is a failover proxy provider configured - * for it in the given configuration. */ public static boolean isLogicalUri( Configuration conf, URI nameNodeUri) { String host = nameNodeUri.getHost(); + // A logical name must be one of the service IDs. + return DFSUtil.getNameServiceIds(conf).contains(host); + } + + /** + * Check whether the client has a failover proxy provider configured + * for the namenode/nameservice. + * + * @param conf Configuration + * @param nameNodeUri The URI of namenode + * @return true if failover is configured. + */ + public static boolean isClientFailoverConfigured( + Configuration conf, URI nameNodeUri) { + String host = nameNodeUri.getHost(); String configKey = DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX + "." + host; return conf.get(configKey) != null; } /** + * Check whether logical URI is needed for the namenode and + * the corresponding failover proxy provider in the config. + * + * @param conf Configuration + * @param nameNodeUri The URI of namenode + * @return true if logical URI is needed. false, if not needed. + * @throws IOException most likely due to misconfiguration. + */ + public static boolean useLogicalUri(Configuration conf, URI nameNodeUri) + throws IOException { + // Create the proxy provider. Actual proxy is not created. + AbstractNNFailoverProxyProvider<ClientProtocol> provider = NameNodeProxies + .createFailoverProxyProvider(conf, nameNodeUri, ClientProtocol.class, + false); + + // No need to use logical URI since failover is not configured. + if (provider == null) { + return false; + } + // Check whether the failover proxy provider uses logical URI. + return provider.useLogicalURI(); + } + + /** * Parse the file system URI out of the provided token. */ - public static URI getServiceUriFromToken(final String scheme, - Token<?> token) { + public static URI getServiceUriFromToken(final String scheme, Token<?> token) { String tokStr = token.getService().toString(); - - if (tokStr.startsWith(HA_DT_SERVICE_PREFIX)) { - tokStr = tokStr.replaceFirst(HA_DT_SERVICE_PREFIX, ""); + final String prefix = buildTokenServicePrefixForLogicalUri(scheme); + if (tokStr.startsWith(prefix)) { + tokStr = tokStr.replaceFirst(prefix, ""); } return URI.create(scheme + "://" + tokStr); } @@ -233,10 +270,13 @@ public class HAUtil { * Get the service name used in the delegation token for the given logical * HA service. * @param uri the logical URI of the cluster + * @param scheme the scheme of the corresponding FileSystem * @return the service name */ - public static Text buildTokenServiceForLogicalUri(URI uri) { - return new Text(HA_DT_SERVICE_PREFIX + uri.getHost()); + public static Text buildTokenServiceForLogicalUri(final URI uri, + final String scheme) { + return new Text(buildTokenServicePrefixForLogicalUri(scheme) + + uri.getHost()); } /** @@ -246,7 +286,11 @@ public class HAUtil { public static boolean isTokenForLogicalUri(Token<?> token) { return token.getService().toString().startsWith(HA_DT_SERVICE_PREFIX); } - + + public static String buildTokenServicePrefixForLogicalUri(String scheme) { + return HA_DT_SERVICE_PREFIX + scheme + ":"; + } + /** * Locate a delegation token associated with the given HA cluster URI, and if * one is found, clone it to also represent the underlying namenode address. @@ -258,7 +302,9 @@ public class HAUtil { public static void cloneDelegationTokenForLogicalUri( UserGroupInformation ugi, URI haUri, Collection<InetSocketAddress> nnAddrs) { - Text haService = HAUtil.buildTokenServiceForLogicalUri(haUri); + // this cloning logic is only used by hdfs + Text haService = HAUtil.buildTokenServiceForLogicalUri(haUri, + HdfsConstants.HDFS_URI_SCHEME); Token<DelegationTokenIdentifier> haToken = tokenSelector.selectToken(haService, ugi.getTokens()); if (haToken != null) { @@ -269,8 +315,9 @@ public class HAUtil { Token<DelegationTokenIdentifier> specificToken = new Token.PrivateToken<DelegationTokenIdentifier>(haToken); SecurityUtil.setTokenService(specificToken, singleNNAddr); - Text alias = - new Text(HA_DT_SERVICE_PREFIX + "//" + specificToken.getService()); + Text alias = new Text( + buildTokenServicePrefixForLogicalUri(HdfsConstants.HDFS_URI_SCHEME) + + "//" + specificToken.getService()); ugi.addToken(alias, specificToken); LOG.debug("Mapped HA service delegation token for logical URI " + haUri + " to namenode " + singleNNAddr); @@ -314,18 +361,42 @@ public class HAUtil { */ public static List<ClientProtocol> getProxiesForAllNameNodesInNameservice( Configuration conf, String nsId) throws IOException { + List<ProxyAndInfo<ClientProtocol>> proxies = + getProxiesForAllNameNodesInNameservice(conf, nsId, ClientProtocol.class); + + List<ClientProtocol> namenodes = new ArrayList<ClientProtocol>( + proxies.size()); + for (ProxyAndInfo<ClientProtocol> proxy : proxies) { + namenodes.add(proxy.getProxy()); + } + return namenodes; + } + + /** + * Get an RPC proxy for each NN in an HA nameservice. Used when a given RPC + * call should be made on every NN in an HA nameservice, not just the active. + * + * @param conf configuration + * @param nsId the nameservice to get all of the proxies for. + * @param xface the protocol class. + * @return a list of RPC proxies for each NN in the nameservice. + * @throws IOException in the event of error. + */ + public static <T> List<ProxyAndInfo<T>> getProxiesForAllNameNodesInNameservice( + Configuration conf, String nsId, Class<T> xface) throws IOException { Map<String, InetSocketAddress> nnAddresses = DFSUtil.getRpcAddressesForNameserviceId(conf, nsId, null); - List<ClientProtocol> namenodes = new ArrayList<ClientProtocol>(); + List<ProxyAndInfo<T>> proxies = new ArrayList<ProxyAndInfo<T>>( + nnAddresses.size()); for (InetSocketAddress nnAddress : nnAddresses.values()) { - NameNodeProxies.ProxyAndInfo<ClientProtocol> proxyInfo = null; + NameNodeProxies.ProxyAndInfo<T> proxyInfo = null; proxyInfo = NameNodeProxies.createNonHAProxy(conf, - nnAddress, ClientProtocol.class, + nnAddress, xface, UserGroupInformation.getCurrentUser(), false); - namenodes.add(proxyInfo.getProxy()); + proxies.add(proxyInfo); } - return namenodes; + return proxies; } /**
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java Tue Aug 19 23:49:39 2014 @@ -32,6 +32,8 @@ import org.apache.hadoop.security.author import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol; import org.apache.hadoop.security.authorize.Service; import org.apache.hadoop.tools.GetUserMappingsProtocol; +import org.apache.hadoop.ipc.RefreshCallQueueProtocol; +import org.apache.hadoop.ipc.GenericRefreshProtocol; /** * {@link PolicyProvider} for HDFS protocols. @@ -64,7 +66,13 @@ public class HDFSPolicyProvider extends RefreshUserMappingsProtocol.class), new Service( CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_AUTHORIZATION_GET_USER_MAPPINGS, - GetUserMappingsProtocol.class) + GetUserMappingsProtocol.class), + new Service( + CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_AUTHORIZATION_REFRESH_CALLQUEUE, + RefreshCallQueueProtocol.class), + new Service( + CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_AUTHORIZATION_GENERIC_REFRESH, + GenericRefreshProtocol.class) }; @Override Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/LeaseRenewer.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/LeaseRenewer.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/LeaseRenewer.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/LeaseRenewer.java Tue Aug 19 23:49:39 2014 @@ -281,7 +281,7 @@ class LeaseRenewer { && Time.now() - emptyTime > gracePeriod; } - synchronized void put(final String src, final DFSOutputStream out, + synchronized void put(final long inodeId, final DFSOutputStream out, final DFSClient dfsc) { if (dfsc.isClientRunning()) { if (!isRunning() || isRenewerExpired()) { @@ -319,7 +319,7 @@ class LeaseRenewer { }); daemon.start(); } - dfsc.putFileBeingWritten(src, out); + dfsc.putFileBeingWritten(inodeId, out); emptyTime = Long.MAX_VALUE; } } @@ -330,8 +330,8 @@ class LeaseRenewer { } /** Close a file. */ - void closeFile(final String src, final DFSClient dfsc) { - dfsc.removeFileBeingWritten(src); + void closeFile(final long inodeId, final DFSClient dfsc) { + dfsc.removeFileBeingWritten(inodeId); synchronized(this) { if (dfsc.isFilesBeingWrittenEmpty()) { Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java Tue Aug 19 23:49:39 2014 @@ -50,6 +50,8 @@ import org.apache.hadoop.hdfs.protocolPB import org.apache.hadoop.hdfs.protocolPB.JournalProtocolTranslatorPB; import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB; import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolTranslatorPB; +import org.apache.hadoop.hdfs.server.namenode.ha.AbstractNNFailoverProxyProvider; +import org.apache.hadoop.hdfs.server.namenode.ha.WrappedFailoverProxyProvider; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.SafeModeException; import org.apache.hadoop.hdfs.server.protocol.JournalProtocol; @@ -104,10 +106,13 @@ public class NameNodeProxies { public static class ProxyAndInfo<PROXYTYPE> { private final PROXYTYPE proxy; private final Text dtService; + private final InetSocketAddress address; - public ProxyAndInfo(PROXYTYPE proxy, Text dtService) { + public ProxyAndInfo(PROXYTYPE proxy, Text dtService, + InetSocketAddress address) { this.proxy = proxy; this.dtService = dtService; + this.address = address; } public PROXYTYPE getProxy() { @@ -117,6 +122,10 @@ public class NameNodeProxies { public Text getDelegationTokenService() { return dtService; } + + public InetSocketAddress getAddress() { + return address; + } } /** @@ -136,27 +145,32 @@ public class NameNodeProxies { @SuppressWarnings("unchecked") public static <T> ProxyAndInfo<T> createProxy(Configuration conf, URI nameNodeUri, Class<T> xface) throws IOException { - Class<FailoverProxyProvider<T>> failoverProxyProviderClass = - getFailoverProxyProviderClass(conf, nameNodeUri, xface); + AbstractNNFailoverProxyProvider<T> failoverProxyProvider = + createFailoverProxyProvider(conf, nameNodeUri, xface, true); - if (failoverProxyProviderClass == null) { + if (failoverProxyProvider == null) { // Non-HA case return createNonHAProxy(conf, NameNode.getAddress(nameNodeUri), xface, UserGroupInformation.getCurrentUser(), true); } else { // HA case - FailoverProxyProvider<T> failoverProxyProvider = NameNodeProxies - .createFailoverProxyProvider(conf, failoverProxyProviderClass, xface, - nameNodeUri); Conf config = new Conf(conf); T proxy = (T) RetryProxy.create(xface, failoverProxyProvider, RetryPolicies.failoverOnNetworkException( RetryPolicies.TRY_ONCE_THEN_FAIL, config.maxFailoverAttempts, config.maxRetryAttempts, config.failoverSleepBaseMillis, config.failoverSleepMaxMillis)); - - Text dtService = HAUtil.buildTokenServiceForLogicalUri(nameNodeUri); - return new ProxyAndInfo<T>(proxy, dtService); + + Text dtService; + if (failoverProxyProvider.useLogicalURI()) { + dtService = HAUtil.buildTokenServiceForLogicalUri(nameNodeUri, + HdfsConstants.HDFS_URI_SCHEME); + } else { + dtService = SecurityUtil.buildTokenService( + NameNode.getAddress(nameNodeUri)); + } + return new ProxyAndInfo<T>(proxy, dtService, + NameNode.getAddress(nameNodeUri)); } } @@ -183,12 +197,10 @@ public class NameNodeProxies { Configuration config, URI nameNodeUri, Class<T> xface, int numResponseToDrop) throws IOException { Preconditions.checkArgument(numResponseToDrop > 0); - Class<FailoverProxyProvider<T>> failoverProxyProviderClass = - getFailoverProxyProviderClass(config, nameNodeUri, xface); - if (failoverProxyProviderClass != null) { // HA case - FailoverProxyProvider<T> failoverProxyProvider = - createFailoverProxyProvider(config, failoverProxyProviderClass, - xface, nameNodeUri); + AbstractNNFailoverProxyProvider<T> failoverProxyProvider = + createFailoverProxyProvider(config, nameNodeUri, xface, true); + + if (failoverProxyProvider != null) { // HA case int delay = config.getInt( DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_KEY, DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_DEFAULT); @@ -211,8 +223,16 @@ public class NameNodeProxies { T proxy = (T) Proxy.newProxyInstance( failoverProxyProvider.getInterface().getClassLoader(), new Class[] { xface }, dummyHandler); - Text dtService = HAUtil.buildTokenServiceForLogicalUri(nameNodeUri); - return new ProxyAndInfo<T>(proxy, dtService); + Text dtService; + if (failoverProxyProvider.useLogicalURI()) { + dtService = HAUtil.buildTokenServiceForLogicalUri(nameNodeUri, + HdfsConstants.HDFS_URI_SCHEME); + } else { + dtService = SecurityUtil.buildTokenService( + NameNode.getAddress(nameNodeUri)); + } + return new ProxyAndInfo<T>(proxy, dtService, + NameNode.getAddress(nameNodeUri)); } else { LOG.warn("Currently creating proxy using " + "LossyRetryInvocationHandler requires NN HA setup"); @@ -265,7 +285,7 @@ public class NameNodeProxies { throw new IllegalStateException(message); } - return new ProxyAndInfo<T>(proxy, dtService); + return new ProxyAndInfo<T>(proxy, dtService, nnAddr); } private static JournalProtocol createNNProxyWithJournalProtocol( @@ -315,19 +335,18 @@ public class NameNodeProxies { address, conf, ugi, NamenodeProtocolPB.class, 0); if (withRetries) { // create the proxy with retries RetryPolicy timeoutPolicy = RetryPolicies.exponentialBackoffRetry(5, 200, - TimeUnit.MILLISECONDS); - Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap - = new HashMap<Class<? extends Exception>, RetryPolicy>(); - RetryPolicy methodPolicy = RetryPolicies.retryByException(timeoutPolicy, - exceptionToPolicyMap); - Map<String, RetryPolicy> methodNameToPolicyMap - = new HashMap<String, RetryPolicy>(); - methodNameToPolicyMap.put("getBlocks", methodPolicy); - methodNameToPolicyMap.put("getAccessKeys", methodPolicy); - proxy = (NamenodeProtocolPB) RetryProxy.create(NamenodeProtocolPB.class, - proxy, methodNameToPolicyMap); + TimeUnit.MILLISECONDS); + Map<String, RetryPolicy> methodNameToPolicyMap + = new HashMap<String, RetryPolicy>(); + methodNameToPolicyMap.put("getBlocks", timeoutPolicy); + methodNameToPolicyMap.put("getAccessKeys", timeoutPolicy); + NamenodeProtocol translatorProxy = + new NamenodeProtocolTranslatorPB(proxy); + return (NamenodeProtocol) RetryProxy.create( + NamenodeProtocol.class, translatorProxy, methodNameToPolicyMap); + } else { + return new NamenodeProtocolTranslatorPB(proxy); } - return new NamenodeProtocolTranslatorPB(proxy); } private static ClientProtocol createNNProxyWithClientProtocol( @@ -361,29 +380,27 @@ public class NameNodeProxies { = new HashMap<Class<? extends Exception>, RetryPolicy>(); remoteExceptionToPolicyMap.put(AlreadyBeingCreatedException.class, createPolicy); - - Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap - = new HashMap<Class<? extends Exception>, RetryPolicy>(); - exceptionToPolicyMap.put(RemoteException.class, RetryPolicies - .retryByRemoteException(defaultPolicy, - remoteExceptionToPolicyMap)); - RetryPolicy methodPolicy = RetryPolicies.retryByException( - defaultPolicy, exceptionToPolicyMap); + + RetryPolicy methodPolicy = RetryPolicies.retryByRemoteException( + defaultPolicy, remoteExceptionToPolicyMap); Map<String, RetryPolicy> methodNameToPolicyMap = new HashMap<String, RetryPolicy>(); methodNameToPolicyMap.put("create", methodPolicy); - - proxy = (ClientNamenodeProtocolPB) RetryProxy.create( - ClientNamenodeProtocolPB.class, - new DefaultFailoverProxyProvider<ClientNamenodeProtocolPB>( - ClientNamenodeProtocolPB.class, proxy), + + ClientProtocol translatorProxy = + new ClientNamenodeProtocolTranslatorPB(proxy); + return (ClientProtocol) RetryProxy.create( + ClientProtocol.class, + new DefaultFailoverProxyProvider<ClientProtocol>( + ClientProtocol.class, translatorProxy), methodNameToPolicyMap, defaultPolicy); + } else { + return new ClientNamenodeProtocolTranslatorPB(proxy); } - return new ClientNamenodeProtocolTranslatorPB(proxy); } - + private static Object createNameNodeProxy(InetSocketAddress address, Configuration conf, UserGroupInformation ugi, Class<?> xface, int rpcTimeout) throws IOException { @@ -396,7 +413,7 @@ public class NameNodeProxies { /** Gets the configured Failover proxy provider's class */ @VisibleForTesting public static <T> Class<FailoverProxyProvider<T>> getFailoverProxyProviderClass( - Configuration conf, URI nameNodeUri, Class<T> xface) throws IOException { + Configuration conf, URI nameNodeUri) throws IOException { if (nameNodeUri == null) { return null; } @@ -408,17 +425,6 @@ public class NameNodeProxies { @SuppressWarnings("unchecked") Class<FailoverProxyProvider<T>> ret = (Class<FailoverProxyProvider<T>>) conf .getClass(configKey, null, FailoverProxyProvider.class); - if (ret != null) { - // If we found a proxy provider, then this URI should be a logical NN. - // Given that, it shouldn't have a non-default port number. - int port = nameNodeUri.getPort(); - if (port > 0 && port != NameNode.DEFAULT_PORT) { - throw new IOException("Port " + port + " specified in URI " - + nameNodeUri + " but host '" + host - + "' is a logical (HA) namenode" - + " and does not use port information."); - } - } return ret; } catch (RuntimeException e) { if (e.getCause() instanceof ClassNotFoundException) { @@ -433,18 +439,33 @@ public class NameNodeProxies { /** Creates the Failover proxy provider instance*/ @VisibleForTesting - public static <T> FailoverProxyProvider<T> createFailoverProxyProvider( - Configuration conf, Class<FailoverProxyProvider<T>> failoverProxyProviderClass, - Class<T> xface, URI nameNodeUri) throws IOException { + public static <T> AbstractNNFailoverProxyProvider<T> createFailoverProxyProvider( + Configuration conf, URI nameNodeUri, Class<T> xface, boolean checkPort) + throws IOException { + Class<FailoverProxyProvider<T>> failoverProxyProviderClass = null; + AbstractNNFailoverProxyProvider<T> providerNN; Preconditions.checkArgument( xface.isAssignableFrom(NamenodeProtocols.class), "Interface %s is not a NameNode protocol", xface); try { + // Obtain the class of the proxy provider + failoverProxyProviderClass = getFailoverProxyProviderClass(conf, + nameNodeUri); + if (failoverProxyProviderClass == null) { + return null; + } + // Create a proxy provider instance. Constructor<FailoverProxyProvider<T>> ctor = failoverProxyProviderClass .getConstructor(Configuration.class, URI.class, Class.class); FailoverProxyProvider<T> provider = ctor.newInstance(conf, nameNodeUri, xface); - return provider; + + // If the proxy provider is of an old implementation, wrap it. + if (!(provider instanceof AbstractNNFailoverProxyProvider)) { + providerNN = new WrappedFailoverProxyProvider<T>(provider); + } else { + providerNN = (AbstractNNFailoverProxyProvider<T>)provider; + } } catch (Exception e) { String message = "Couldn't create proxy provider " + failoverProxyProviderClass; if (LOG.isDebugEnabled()) { @@ -456,6 +477,20 @@ public class NameNodeProxies { throw new IOException(message, e); } } + + // Check the port in the URI, if it is logical. + if (checkPort && providerNN.useLogicalURI()) { + int port = nameNodeUri.getPort(); + if (port > 0 && port != NameNode.DEFAULT_PORT) { + // Throwing here without any cleanup is fine since we have not + // actually created the underlying proxies yet. + throw new IOException("Port " + port + " specified in URI " + + nameNodeUri + " but host '" + nameNodeUri.getHost() + + "' is a logical (HA) namenode" + + " and does not use port information."); + } + } + return providerNN; } } Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java Tue Aug 19 23:49:39 2014 @@ -26,6 +26,7 @@ import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.ReadableByteChannel; import java.util.EnumSet; +import java.util.UUID; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -133,9 +134,22 @@ public class RemoteBlockReader2 impleme public synchronized int read(byte[] buf, int off, int len) throws IOException { + UUID randomId = null; + if (LOG.isTraceEnabled()) { + randomId = UUID.randomUUID(); + LOG.trace(String.format("Starting read #%s file %s from datanode %s", + randomId.toString(), this.filename, + this.datanodeID.getHostName())); + } + if (curDataSlice == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) { readNextPacket(); } + + if (LOG.isTraceEnabled()) { + LOG.trace(String.format("Finishing read #" + randomId)); + } + if (curDataSlice.remaining() == 0) { // we're at EOF now return -1; Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemotePeerFactory.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemotePeerFactory.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemotePeerFactory.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemotePeerFactory.java Tue Aug 19 23:49:39 2014 @@ -21,15 +21,21 @@ import java.io.IOException; import java.net.InetSocketAddress; import org.apache.hadoop.hdfs.net.Peer; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.security.token.Token; public interface RemotePeerFactory { /** * @param addr The address to connect to. - * + * @param blockToken Token used during optional SASL negotiation + * @param datanodeId ID of destination DataNode * @return A new Peer connected to the address. * * @throws IOException If there was an error connecting or creating * the remote socket, encrypted stream, etc. */ - Peer newConnectedPeer(InetSocketAddress addr) throws IOException; + Peer newConnectedPeer(InetSocketAddress addr, + Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId) + throws IOException; } Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StorageType.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StorageType.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StorageType.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StorageType.java Tue Aug 19 23:49:39 2014 @@ -18,6 +18,9 @@ package org.apache.hadoop.hdfs; +import java.util.Arrays; +import java.util.List; + import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -32,4 +35,11 @@ public enum StorageType { SSD; public static final StorageType DEFAULT = DISK; + public static final StorageType[] EMPTY_ARRAY = {}; + + private static final StorageType[] VALUES = values(); + + public static List<StorageType> asList() { + return Arrays.asList(VALUES); + } } \ No newline at end of file Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/EncryptedPeer.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/EncryptedPeer.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/EncryptedPeer.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/EncryptedPeer.java Tue Aug 19 23:49:39 2014 @@ -19,9 +19,7 @@ package org.apache.hadoop.hdfs.net; import java.io.IOException; import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor; import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; -import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; import org.apache.hadoop.net.unix.DomainSocket; import java.io.InputStream; @@ -51,11 +49,8 @@ public class EncryptedPeer implements Pe */ private final ReadableByteChannel channel; - public EncryptedPeer(Peer enclosedPeer, DataEncryptionKey key) - throws IOException { + public EncryptedPeer(Peer enclosedPeer, IOStreamPair ios) { this.enclosedPeer = enclosedPeer; - IOStreamPair ios = DataTransferEncryptor.getEncryptedStreams( - enclosedPeer.getOutputStream(), enclosedPeer.getInputStream(), key); this.in = ios.in; this.out = ios.out; this.channel = ios.in instanceof ReadableByteChannel ? Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/TcpPeerServer.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/TcpPeerServer.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/TcpPeerServer.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/TcpPeerServer.java Tue Aug 19 23:49:39 2014 @@ -28,10 +28,14 @@ import java.nio.channels.SocketChannel; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory; +import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.security.token.Token; @InterfaceAudience.Private public class TcpPeerServer implements PeerServer { @@ -74,15 +78,16 @@ public class TcpPeerServer implements Pe } } - public static Peer peerFromSocketAndKey(Socket s, - DataEncryptionKey key) throws IOException { + public static Peer peerFromSocketAndKey( + SaslDataTransferClient saslClient, Socket s, + DataEncryptionKeyFactory keyFactory, + Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId) + throws IOException { Peer peer = null; boolean success = false; try { - peer = peerFromSocket(s); - if (key != null) { - peer = new EncryptedPeer(peer, key); - } + peer = peerFromSocket(s); + peer = saslClient.peerSend(peer, keyFactory, blockToken, datanodeId); success = true; return peer; } finally { Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/Block.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/Block.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/Block.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/Block.java Tue Aug 19 23:49:39 2014 @@ -50,6 +50,9 @@ public class Block implements Writable, public static final Pattern metaFilePattern = Pattern .compile(BLOCK_FILE_PREFIX + "(-??\\d++)_(\\d++)\\" + METADATA_EXTENSION + "$"); + public static final Pattern metaOrBlockFilePattern = Pattern + .compile(BLOCK_FILE_PREFIX + "(-??\\d++)(_(\\d++)\\" + METADATA_EXTENSION + + ")?$"); public static boolean isBlockFilename(File f) { String name = f.getName(); @@ -65,6 +68,11 @@ public class Block implements Writable, return metaFilePattern.matcher(name).matches(); } + public static File metaToBlockFile(File metaFile) { + return new File(metaFile.getParent(), metaFile.getName().substring( + 0, metaFile.getName().lastIndexOf('_'))); + } + /** * Get generation stamp from the name of the metafile name */ @@ -75,10 +83,10 @@ public class Block implements Writable, } /** - * Get the blockId from the name of the metafile name + * Get the blockId from the name of the meta or block file */ - public static long getBlockId(String metaFile) { - Matcher m = metaFilePattern.matcher(metaFile); + public static long getBlockId(String metaOrBlockFile) { + Matcher m = metaOrBlockFilePattern.matcher(metaOrBlockFile); return m.matches() ? Long.parseLong(m.group(1)) : 0; } Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CacheDirectiveIterator.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CacheDirectiveIterator.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CacheDirectiveIterator.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CacheDirectiveIterator.java Tue Aug 19 23:49:39 2014 @@ -23,6 +23,10 @@ import java.io.IOException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.BatchedRemoteIterator; +import org.apache.hadoop.fs.InvalidRequestException; +import org.apache.hadoop.ipc.RemoteException; + +import com.google.common.base.Preconditions; /** * CacheDirectiveIterator is a remote iterator that iterates cache directives. @@ -33,7 +37,7 @@ import org.apache.hadoop.fs.BatchedRemot public class CacheDirectiveIterator extends BatchedRemoteIterator<Long, CacheDirectiveEntry> { - private final CacheDirectiveInfo filter; + private CacheDirectiveInfo filter; private final ClientProtocol namenode; public CacheDirectiveIterator(ClientProtocol namenode, @@ -43,10 +47,72 @@ public class CacheDirectiveIterator this.filter = filter; } + private static CacheDirectiveInfo removeIdFromFilter(CacheDirectiveInfo filter) { + CacheDirectiveInfo.Builder builder = new CacheDirectiveInfo.Builder(filter); + builder.setId(null); + return builder.build(); + } + + /** + * Used for compatibility when communicating with a server version that + * does not support filtering directives by ID. + */ + private static class SingleEntry implements + BatchedEntries<CacheDirectiveEntry> { + + private final CacheDirectiveEntry entry; + + public SingleEntry(final CacheDirectiveEntry entry) { + this.entry = entry; + } + + @Override + public CacheDirectiveEntry get(int i) { + if (i > 0) { + return null; + } + return entry; + } + + @Override + public int size() { + return 1; + } + + @Override + public boolean hasMore() { + return false; + } + } + @Override public BatchedEntries<CacheDirectiveEntry> makeRequest(Long prevKey) throws IOException { - return namenode.listCacheDirectives(prevKey, filter); + BatchedEntries<CacheDirectiveEntry> entries = null; + try { + entries = namenode.listCacheDirectives(prevKey, filter); + } catch (IOException e) { + if (e.getMessage().contains("Filtering by ID is unsupported")) { + // Retry case for old servers, do the filtering client-side + long id = filter.getId(); + filter = removeIdFromFilter(filter); + // Using id - 1 as prevId should get us a window containing the id + // This is somewhat brittle, since it depends on directives being + // returned in order of ascending ID. + entries = namenode.listCacheDirectives(id - 1, filter); + for (int i=0; i<entries.size(); i++) { + CacheDirectiveEntry entry = entries.get(i); + if (entry.getInfo().getId().equals((Long)id)) { + return new SingleEntry(entry); + } + } + throw new RemoteException(InvalidRequestException.class.getName(), + "Did not find requested id " + id); + } + throw e; + } + Preconditions.checkNotNull(entries); + return entries; } @Override Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java Tue Aug 19 23:49:39 2014 @@ -110,8 +110,9 @@ public interface ClientDatanodeProtocol /** * Retrieves volume location information about a list of blocks on a datanode. - * This is in the form of an opaque {@link VolumeId} for each configured - * data directory, which is not guaranteed to be the same across DN restarts. + * This is in the form of an opaque {@link org.apache.hadoop.fs.VolumeId} + * for each configured data directory, which is not guaranteed to be + * the same across DN restarts. * * @param blockPoolId the pool to query * @param blockIds Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java Tue Aug 19 23:49:39 2014 @@ -24,6 +24,7 @@ import java.util.List; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries; import org.apache.hadoop.fs.CacheFlag; import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.CreateFlag; @@ -31,12 +32,14 @@ import org.apache.hadoop.fs.FileAlreadyE import org.apache.hadoop.fs.FsServerDefaults; import org.apache.hadoop.fs.InvalidPathException; import org.apache.hadoop.fs.Options; -import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries; import org.apache.hadoop.fs.Options.Rename; import org.apache.hadoop.fs.ParentNotDirectoryException; import org.apache.hadoop.fs.UnresolvedLinkException; +import org.apache.hadoop.fs.XAttr; +import org.apache.hadoop.fs.XAttrSetFlag; import org.apache.hadoop.fs.permission.AclEntry; import org.apache.hadoop.fs.permission.AclStatus; +import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction; @@ -45,6 +48,7 @@ import org.apache.hadoop.hdfs.security.t import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector; import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException; import org.apache.hadoop.hdfs.server.namenode.SafeModeException; +import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; import org.apache.hadoop.io.EnumSetWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.retry.AtMostOnce; @@ -268,7 +272,7 @@ public interface ClientProtocol { /** * Set Owner of a path (i.e. a file or a directory). * The parameters username and groupname cannot both be null. - * @param src + * @param src file path * @param username If it is null, the original username remains unchanged. * @param groupname If it is null, the original groupname remains unchanged. * @@ -290,13 +294,20 @@ public interface ClientProtocol { * file. * Any partial writes to the block will be discarded. * + * @param b Block to abandon + * @param fileId The id of the file where the block resides. Older clients + * will pass GRANDFATHER_INODE_ID here. + * @param src The path of the file where the block resides. + * @param holder Lease holder. + * * @throws AccessControlException If access is denied * @throws FileNotFoundException file <code>src</code> is not found * @throws UnresolvedLinkException If <code>src</code> contains a symlink * @throws IOException If an I/O error occurred */ @Idempotent - public void abandonBlock(ExtendedBlock b, String src, String holder) + public void abandonBlock(ExtendedBlock b, long fileId, + String src, String holder) throws AccessControlException, FileNotFoundException, UnresolvedLinkException, IOException; @@ -344,6 +355,7 @@ public interface ClientProtocol { * Get a datanode for an existing pipeline. * * @param src the file being written + * @param fileId the ID of the file being written * @param blk the block being written * @param existings the existing nodes in the pipeline * @param excludes the excluded nodes @@ -359,8 +371,10 @@ public interface ClientProtocol { * @throws IOException If an I/O error occurred */ @Idempotent - public LocatedBlock getAdditionalDatanode(final String src, final ExtendedBlock blk, - final DatanodeInfo[] existings, final String[] existingStorageIDs, + public LocatedBlock getAdditionalDatanode(final String src, + final long fileId, final ExtendedBlock blk, + final DatanodeInfo[] existings, + final String[] existingStorageIDs, final DatanodeInfo[] excludes, final int numAdditionalNodes, final String clientName ) throws AccessControlException, FileNotFoundException, @@ -643,6 +657,13 @@ public interface ClientProtocol { throws IOException; /** + * Get a report on the current datanode storages. + */ + @Idempotent + public DatanodeStorageReport[] getDatanodeStorageReport( + HdfsConstants.DatanodeReportType type) throws IOException; + + /** * Get the block size for the given file. * @param filename The name of the file * @return The number of bytes in each block @@ -896,6 +917,8 @@ public interface ClientProtocol { * Write all metadata for this file into persistent storage. * The file must be currently open for writing. * @param src The string representation of the path + * @param inodeId The inode ID, or GRANDFATHER_INODE_ID if the client is + * too old to support fsync with inode IDs. * @param client The string representation of the client * @param lastBlockLength The length of the last block (under construction) * to be reported to NameNode @@ -905,7 +928,8 @@ public interface ClientProtocol { * @throws IOException If an I/O error occurred */ @Idempotent - public void fsync(String src, String client, long lastBlockLength) + public void fsync(String src, long inodeId, String client, + long lastBlockLength) throws AccessControlException, FileNotFoundException, UnresolvedLinkException, IOException; @@ -1126,7 +1150,6 @@ public interface ClientProtocol { /** * Modify a CacheDirective in the CacheManager. * - * @return directive The directive to modify. Must contain a directive ID. * @param flags {@link CacheFlag}s to use for this operation. * @throws IOException if the directive could not be modified */ @@ -1242,4 +1265,85 @@ public interface ClientProtocol { */ @Idempotent public AclStatus getAclStatus(String src) throws IOException; + + /** + * Set xattr of a file or directory. + * The name must be prefixed with the namespace followed by ".". For example, + * "user.attr". + * <p/> + * Refer to the HDFS extended attributes user documentation for details. + * + * @param src file or directory + * @param xAttr <code>XAttr</code> to set + * @param flag set flag + * @throws IOException + */ + @AtMostOnce + public void setXAttr(String src, XAttr xAttr, EnumSet<XAttrSetFlag> flag) + throws IOException; + + /** + * Get xattrs of a file or directory. Values in xAttrs parameter are ignored. + * If xAttrs is null or empty, this is the same as getting all xattrs of the + * file or directory. Only those xattrs for which the logged-in user has + * permissions to view are returned. + * <p/> + * Refer to the HDFS extended attributes user documentation for details. + * + * @param src file or directory + * @param xAttrs xAttrs to get + * @return List<XAttr> <code>XAttr</code> list + * @throws IOException + */ + @Idempotent + public List<XAttr> getXAttrs(String src, List<XAttr> xAttrs) + throws IOException; + + /** + * List the xattrs names for a file or directory. + * Only the xattr names for which the logged in user has the permissions to + * access will be returned. + * <p/> + * Refer to the HDFS extended attributes user documentation for details. + * + * @param src file or directory + * @param xAttrs xAttrs to get + * @return List<XAttr> <code>XAttr</code> list + * @throws IOException + */ + @Idempotent + public List<XAttr> listXAttrs(String src) + throws IOException; + + /** + * Remove xattr of a file or directory.Value in xAttr parameter is ignored. + * The name must be prefixed with the namespace followed by ".". For example, + * "user.attr". + * <p/> + * Refer to the HDFS extended attributes user documentation for details. + * + * @param src file or directory + * @param xAttr <code>XAttr</code> to remove + * @throws IOException + */ + @AtMostOnce + public void removeXAttr(String src, XAttr xAttr) throws IOException; + + /** + * Checks if the user can access a path. The mode specifies which access + * checks to perform. If the requested permissions are granted, then the + * method returns normally. If access is denied, then the method throws an + * {@link AccessControlException}. + * In general, applications should avoid using this method, due to the risk of + * time-of-check/time-of-use race conditions. The permissions on a file may + * change immediately after the access call returns. + * + * @param path Path to check + * @param mode type of access to check + * @throws AccessControlException if access is denied + * @throws FileNotFoundException if the path does not exist + * @throws IOException see specific implementation + */ + @Idempotent + public void checkAccess(String path, FsAction mode) throws IOException; } Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java Tue Aug 19 23:49:39 2014 @@ -29,6 +29,8 @@ import org.apache.hadoop.util.StringUtil import org.apache.hadoop.util.Time; import java.util.Date; +import java.util.LinkedList; +import java.util.List; import static org.apache.hadoop.hdfs.DFSUtil.percent2String; @@ -50,6 +52,8 @@ public class DatanodeInfo extends Datano private int xceiverCount; private String location = NetworkTopology.DEFAULT_RACK; private String softwareVersion; + private List<String> dependentHostNames = new LinkedList<String>(); + // Datanode administrative states public enum AdminStates { @@ -274,6 +278,21 @@ public class DatanodeInfo extends Datano public synchronized void setNetworkLocation(String location) { this.location = NodeBase.normalize(location); } + + /** Add a hostname to a list of network dependencies */ + public void addDependentHostName(String hostname) { + dependentHostNames.add(hostname); + } + + /** List of Network dependencies */ + public List<String> getDependentHostNames() { + return dependentHostNames; + } + + /** Sets the network dependencies */ + public void setDependentHostNames(List<String> dependencyList) { + dependentHostNames = dependencyList; + } /** A formatted string for reporting the status of the DataNode. */ public String getDatanodeReport() { @@ -320,7 +339,7 @@ public class DatanodeInfo extends Datano buffer.append("Cache Remaining: " +cr+ " ("+StringUtils.byteDesc(cr)+")"+"\n"); buffer.append("Cache Used%: "+percent2String(cacheUsedPercent) + "\n"); buffer.append("Cache Remaining%: "+percent2String(cacheRemainingPercent) + "\n"); - + buffer.append("Xceivers: "+getXceiverCount()+"\n"); buffer.append("Last contact: "+new Date(lastUpdate)+"\n"); return buffer.toString(); } Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ExtendedBlock.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ExtendedBlock.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ExtendedBlock.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ExtendedBlock.java Tue Aug 19 23:49:39 2014 @@ -112,7 +112,8 @@ public class ExtendedBlock { @Override // Object public int hashCode() { - return block.hashCode(); + int result = 31 + poolId.hashCode(); + return (31 * result + block.hashCode()); } @Override // Object Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java Tue Aug 19 23:49:39 2014 @@ -104,7 +104,7 @@ public class HdfsConstants { // type of the datanode report public static enum DatanodeReportType { - ALL, LIVE, DEAD + ALL, LIVE, DEAD, DECOMMISSIONING } // An invalid transaction ID that will never be seen in a real namesystem. @@ -124,7 +124,7 @@ public class HdfsConstants { * of a delgation token, indicating that the URI is a logical (HA) * URI. */ - public static final String HA_DT_SERVICE_PREFIX = "ha-hdfs:"; + public static final String HA_DT_SERVICE_PREFIX = "ha-"; /** Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsFileStatus.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsFileStatus.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsFileStatus.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsFileStatus.java Tue Aug 19 23:49:39 2014 @@ -91,7 +91,7 @@ public class HdfsFileStatus { * Get the length of this file, in bytes. * @return the length of this file, in bytes. */ - final public long getLen() { + public final long getLen() { return length; } @@ -99,7 +99,7 @@ public class HdfsFileStatus { * Is this a directory? * @return true if this is a directory */ - final public boolean isDir() { + public final boolean isDir() { return isdir; } @@ -115,7 +115,7 @@ public class HdfsFileStatus { * Get the block size of the file. * @return the number of bytes */ - final public long getBlockSize() { + public final long getBlockSize() { return blocksize; } @@ -123,7 +123,7 @@ public class HdfsFileStatus { * Get the replication factor of a file. * @return the replication factor of a file. */ - final public short getReplication() { + public final short getReplication() { return block_replication; } @@ -131,7 +131,7 @@ public class HdfsFileStatus { * Get the modification time of the file. * @return the modification time of file in milliseconds since January 1, 1970 UTC. */ - final public long getModificationTime() { + public final long getModificationTime() { return modification_time; } @@ -139,7 +139,7 @@ public class HdfsFileStatus { * Get the access time of the file. * @return the access time of file in milliseconds since January 1, 1970 UTC. */ - final public long getAccessTime() { + public final long getAccessTime() { return access_time; } @@ -147,7 +147,7 @@ public class HdfsFileStatus { * Get FsPermission associated with the file. * @return permssion */ - final public FsPermission getPermission() { + public final FsPermission getPermission() { return permission; } @@ -155,7 +155,7 @@ public class HdfsFileStatus { * Get the owner of the file. * @return owner of the file */ - final public String getOwner() { + public final String getOwner() { return owner; } @@ -163,7 +163,7 @@ public class HdfsFileStatus { * Get the group associated with the file. * @return group for the file. */ - final public String getGroup() { + public final String getGroup() { return group; } @@ -171,7 +171,7 @@ public class HdfsFileStatus { * Check if the local name is empty * @return true if the name is empty */ - final public boolean isEmptyLocalName() { + public final boolean isEmptyLocalName() { return path.length == 0; } @@ -179,7 +179,7 @@ public class HdfsFileStatus { * Get the string representation of the local name * @return the local name in string */ - final public String getLocalName() { + public final String getLocalName() { return DFSUtil.bytes2String(path); } @@ -187,7 +187,7 @@ public class HdfsFileStatus { * Get the Java UTF8 representation of the local name * @return the local name in java UTF8 */ - final public byte[] getLocalNameInBytes() { + public final byte[] getLocalNameInBytes() { return path; } @@ -196,7 +196,7 @@ public class HdfsFileStatus { * @param parent the parent path * @return the full path in string */ - final public String getFullName(final String parent) { + public final String getFullName(final String parent) { if (isEmptyLocalName()) { return parent; } @@ -214,7 +214,7 @@ public class HdfsFileStatus { * @param parent the parent path * @return the full path */ - final public Path getFullPath(final Path parent) { + public final Path getFullPath(final Path parent) { if (isEmptyLocalName()) { return parent; } @@ -226,23 +226,23 @@ public class HdfsFileStatus { * Get the string representation of the symlink. * @return the symlink as a string. */ - final public String getSymlink() { + public final String getSymlink() { return DFSUtil.bytes2String(symlink); } - final public byte[] getSymlinkInBytes() { + public final byte[] getSymlinkInBytes() { return symlink; } - final public long getFileId() { + public final long getFileId() { return fileId; } - final public int getChildrenNum() { + public final int getChildrenNum() { return childrenNum; } - final public FileStatus makeQualified(URI defaultUri, Path path) { + public final FileStatus makeQualified(URI defaultUri, Path path) { return new FileStatus(getLen(), isDir(), getReplication(), getBlockSize(), getModificationTime(), getAccessTime(), Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsLocatedFileStatus.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsLocatedFileStatus.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsLocatedFileStatus.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsLocatedFileStatus.java Tue Aug 19 23:49:39 2014 @@ -67,7 +67,7 @@ public class HdfsLocatedFileStatus exten return locations; } - final public LocatedFileStatus makeQualifiedLocated(URI defaultUri, + public final LocatedFileStatus makeQualifiedLocated(URI defaultUri, Path path) { return new LocatedFileStatus(getLen(), isDir(), getReplication(), getBlockSize(), getModificationTime(), Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java Tue Aug 19 23:49:39 2014 @@ -26,7 +26,6 @@ import org.apache.hadoop.hdfs.security.t import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo; import org.apache.hadoop.security.token.Token; -import com.google.common.base.Preconditions; import com.google.common.collect.Lists; /** Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/SnapshotDiffReport.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/SnapshotDiffReport.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/SnapshotDiffReport.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/SnapshotDiffReport.java Tue Aug 19 23:49:39 2014 @@ -23,14 +23,14 @@ import java.util.List; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DFSUtil; -import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshottable.SnapshotDiffInfo; + +import com.google.common.base.Objects; /** * This class represents to end users the difference between two snapshots of * the same directory, or the difference between a snapshot of the directory and - * its current state. Instead of capturing all the details of the diff, which - * is stored in {@link SnapshotDiffInfo}, this class only lists where the - * changes happened and their types. + * its current state. Instead of capturing all the details of the diff, this + * class only lists where the changes happened and their types. */ public class SnapshotDiffReport { private final static String LINE_SEPARATOR = System.getProperty( @@ -79,43 +79,64 @@ public class SnapshotDiffReport { /** The type of the difference. */ private final DiffType type; /** - * The relative path (related to the snapshot root) of the file/directory - * where changes have happened + * The relative path (related to the snapshot root) of 1) the file/directory + * where changes have happened, or 2) the source file/dir of a rename op. */ - private final byte[] relativePath; + private final byte[] sourcePath; + private final byte[] targetPath; + + public DiffReportEntry(DiffType type, byte[] sourcePath) { + this(type, sourcePath, null); + } + + public DiffReportEntry(DiffType type, byte[][] sourcePathComponents) { + this(type, sourcePathComponents, null); + } - public DiffReportEntry(DiffType type, byte[] path) { + public DiffReportEntry(DiffType type, byte[] sourcePath, byte[] targetPath) { this.type = type; - this.relativePath = path; + this.sourcePath = sourcePath; + this.targetPath = targetPath; } - public DiffReportEntry(DiffType type, byte[][] pathComponents) { + public DiffReportEntry(DiffType type, byte[][] sourcePathComponents, + byte[][] targetPathComponents) { this.type = type; - this.relativePath = DFSUtil.byteArray2bytes(pathComponents); + this.sourcePath = DFSUtil.byteArray2bytes(sourcePathComponents); + this.targetPath = targetPathComponents == null ? null : DFSUtil + .byteArray2bytes(targetPathComponents); } @Override public String toString() { - return type.getLabel() + "\t" + getRelativePathString(); + String str = type.getLabel() + "\t" + getPathString(sourcePath); + if (type == DiffType.RENAME) { + str += " -> " + getPathString(targetPath); + } + return str; } public DiffType getType() { return type; } - public String getRelativePathString() { - String path = DFSUtil.bytes2String(relativePath); - if (path.isEmpty()) { + static String getPathString(byte[] path) { + String pathStr = DFSUtil.bytes2String(path); + if (pathStr.isEmpty()) { return Path.CUR_DIR; } else { - return Path.CUR_DIR + Path.SEPARATOR + path; + return Path.CUR_DIR + Path.SEPARATOR + pathStr; } } - public byte[] getRelativePath() { - return relativePath; + public byte[] getSourcePath() { + return sourcePath; } - + + public byte[] getTargetPath() { + return targetPath; + } + @Override public boolean equals(Object other) { if (this == other) { @@ -124,14 +145,15 @@ public class SnapshotDiffReport { if (other != null && other instanceof DiffReportEntry) { DiffReportEntry entry = (DiffReportEntry) other; return type.equals(entry.getType()) - && Arrays.equals(relativePath, entry.getRelativePath()); + && Arrays.equals(sourcePath, entry.getSourcePath()) + && Arrays.equals(targetPath, entry.getTargetPath()); } return false; } @Override public int hashCode() { - return Arrays.hashCode(relativePath); + return Objects.hashCode(getSourcePath(), getTargetPath()); } } Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java Tue Aug 19 23:49:39 2014 @@ -23,6 +23,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; @@ -71,11 +72,20 @@ public interface DataTransferProtocol { /** * Write a block to a datanode pipeline. - * + * The receiver datanode of this call is the next datanode in the pipeline. + * The other downstream datanodes are specified by the targets parameter. + * Note that the receiver {@link DatanodeInfo} is not required in the + * parameter list since the receiver datanode knows its info. However, the + * {@link StorageType} for storing the replica in the receiver datanode is a + * parameter since the receiver datanode may support multiple storage types. + * * @param blk the block being written. + * @param storageType for storing the replica in the receiver datanode. * @param blockToken security token for accessing the block. * @param clientName client's name. - * @param targets target datanodes in the pipeline. + * @param targets other downstream datanodes in the pipeline. + * @param targetStorageTypes target {@link StorageType}s corresponding + * to the target datanodes. * @param source source datanode. * @param stage pipeline stage. * @param pipelineSize the size of the pipeline. @@ -84,9 +94,11 @@ public interface DataTransferProtocol { * @param latestGenerationStamp the latest generation stamp of the block. */ public void writeBlock(final ExtendedBlock blk, + final StorageType storageType, final Token<BlockTokenIdentifier> blockToken, final String clientName, final DatanodeInfo[] targets, + final StorageType[] targetStorageTypes, final DatanodeInfo source, final BlockConstructionStage stage, final int pipelineSize, @@ -110,7 +122,8 @@ public interface DataTransferProtocol { public void transferBlock(final ExtendedBlock blk, final Token<BlockTokenIdentifier> blockToken, final String clientName, - final DatanodeInfo[] targets) throws IOException; + final DatanodeInfo[] targets, + final StorageType[] targetStorageTypes) throws IOException; /** * Request short circuit access file descriptors from a DataNode. @@ -136,7 +149,7 @@ public interface DataTransferProtocol { /** * Request a short circuit shared memory area from a DataNode. * - * @pram clientName The name of the client. + * @param clientName The name of the client. */ public void requestShortCircuitShm(String clientName) throws IOException; @@ -148,11 +161,13 @@ public interface DataTransferProtocol { * It is used for balancing purpose. * * @param blk the block being replaced. + * @param storageType the {@link StorageType} for storing the block. * @param blockToken security token for accessing the block. * @param delHint the hint for deleting the block in the original datanode. * @param source the source datanode for receiving the block. */ public void replaceBlock(final ExtendedBlock blk, + final StorageType storageType, final Token<BlockTokenIdentifier> blockToken, final String delHint, final DatanodeInfo source) throws IOException; Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java Tue Aug 19 23:49:39 2014 @@ -27,7 +27,7 @@ import java.nio.channels.ReadableByteCha import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.hdfs.util.DirectBufferPool; +import org.apache.hadoop.util.DirectBufferPool; import org.apache.hadoop.io.IOUtils; import com.google.common.base.Preconditions; Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java Tue Aug 19 23:49:39 2014 @@ -25,6 +25,7 @@ import java.io.IOException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto; @@ -121,10 +122,13 @@ public abstract class Receiver implement /** Receive OP_WRITE_BLOCK */ private void opWriteBlock(DataInputStream in) throws IOException { final OpWriteBlockProto proto = OpWriteBlockProto.parseFrom(vintPrefixed(in)); + final DatanodeInfo[] targets = PBHelper.convert(proto.getTargetsList()); writeBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()), + PBHelper.convertStorageType(proto.getStorageType()), PBHelper.convert(proto.getHeader().getBaseHeader().getToken()), proto.getHeader().getClientName(), - PBHelper.convert(proto.getTargetsList()), + targets, + PBHelper.convertStorageTypes(proto.getTargetStorageTypesList(), targets.length), PBHelper.convert(proto.getSource()), fromProto(proto.getStage()), proto.getPipelineSize(), @@ -140,10 +144,12 @@ public abstract class Receiver implement private void opTransferBlock(DataInputStream in) throws IOException { final OpTransferBlockProto proto = OpTransferBlockProto.parseFrom(vintPrefixed(in)); + final DatanodeInfo[] targets = PBHelper.convert(proto.getTargetsList()); transferBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()), PBHelper.convert(proto.getHeader().getBaseHeader().getToken()), proto.getHeader().getClientName(), - PBHelper.convert(proto.getTargetsList())); + targets, + PBHelper.convertStorageTypes(proto.getTargetStorageTypesList(), targets.length)); } /** Receive {@link Op#REQUEST_SHORT_CIRCUIT_FDS} */ @@ -176,6 +182,7 @@ public abstract class Receiver implement private void opReplaceBlock(DataInputStream in) throws IOException { OpReplaceBlockProto proto = OpReplaceBlockProto.parseFrom(vintPrefixed(in)); replaceBlock(PBHelper.convert(proto.getHeader().getBlock()), + PBHelper.convertStorageType(proto.getStorageType()), PBHelper.convert(proto.getHeader().getToken()), proto.getDelHint(), PBHelper.convert(proto.getSource())); Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java Tue Aug 19 23:49:39 2014 @@ -25,6 +25,7 @@ import java.io.IOException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto; @@ -111,9 +112,11 @@ public class Sender implements DataTrans @Override public void writeBlock(final ExtendedBlock blk, + final StorageType storageType, final Token<BlockTokenIdentifier> blockToken, final String clientName, final DatanodeInfo[] targets, + final StorageType[] targetStorageTypes, final DatanodeInfo source, final BlockConstructionStage stage, final int pipelineSize, @@ -130,7 +133,9 @@ public class Sender implements DataTrans OpWriteBlockProto.Builder proto = OpWriteBlockProto.newBuilder() .setHeader(header) + .setStorageType(PBHelper.convertStorageType(storageType)) .addAllTargets(PBHelper.convert(targets, 1)) + .addAllTargetStorageTypes(PBHelper.convertStorageTypes(targetStorageTypes, 1)) .setStage(toProto(stage)) .setPipelineSize(pipelineSize) .setMinBytesRcvd(minBytesRcvd) @@ -150,12 +155,14 @@ public class Sender implements DataTrans public void transferBlock(final ExtendedBlock blk, final Token<BlockTokenIdentifier> blockToken, final String clientName, - final DatanodeInfo[] targets) throws IOException { + final DatanodeInfo[] targets, + final StorageType[] targetStorageTypes) throws IOException { OpTransferBlockProto proto = OpTransferBlockProto.newBuilder() .setHeader(DataTransferProtoUtil.buildClientHeader( blk, clientName, blockToken)) .addAllTargets(PBHelper.convert(targets)) + .addAllTargetStorageTypes(PBHelper.convertStorageTypes(targetStorageTypes)) .build(); send(out, Op.TRANSFER_BLOCK, proto); @@ -196,11 +203,13 @@ public class Sender implements DataTrans @Override public void replaceBlock(final ExtendedBlock blk, + final StorageType storageType, final Token<BlockTokenIdentifier> blockToken, final String delHint, final DatanodeInfo source) throws IOException { OpReplaceBlockProto proto = OpReplaceBlockProto.newBuilder() .setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken)) + .setStorageType(PBHelper.convertStorageType(storageType)) .setDelHint(delHint) .setSource(PBHelper.convertDatanodeInfo(source)) .build(); Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java Tue Aug 19 23:49:39 2014 @@ -72,6 +72,7 @@ import org.apache.hadoop.hdfs.protocol.p import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CreateSnapshotResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CreateSymlinkRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CreateSymlinkResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.DatanodeStorageReportProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.DeleteRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.DeleteResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.DeleteSnapshotRequestProto; @@ -93,6 +94,8 @@ import org.apache.hadoop.hdfs.protocol.p import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDataEncryptionKeyResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDatanodeReportRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDatanodeReportResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDatanodeStorageReportRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDatanodeStorageReportResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileInfoRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileInfoResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileLinkInfoRequestProto; @@ -171,9 +174,19 @@ import org.apache.hadoop.hdfs.protocol.p import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdateBlockForPipelineResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdatePipelineRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdatePipelineResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CheckAccessRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CheckAccessResponseProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfoProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto; +import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.GetXAttrsRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.GetXAttrsResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.ListXAttrsRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.ListXAttrsResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.RemoveXAttrRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.RemoveXAttrResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.SetXAttrRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.SetXAttrResponseProto; import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.hdfs.server.namenode.INodeId; @@ -302,6 +315,15 @@ public class ClientNamenodeProtocolServe private static final RemoveAclResponseProto VOID_REMOVEACL_RESPONSE = RemoveAclResponseProto.getDefaultInstance(); + + private static final SetXAttrResponseProto + VOID_SETXATTR_RESPONSE = SetXAttrResponseProto.getDefaultInstance(); + + private static final RemoveXAttrResponseProto + VOID_REMOVEXATTR_RESPONSE = RemoveXAttrResponseProto.getDefaultInstance(); + + private static final CheckAccessResponseProto + VOID_CHECKACCESS_RESPONSE = CheckAccessResponseProto.getDefaultInstance(); /** * Constructor @@ -422,8 +444,8 @@ public class ClientNamenodeProtocolServe public AbandonBlockResponseProto abandonBlock(RpcController controller, AbandonBlockRequestProto req) throws ServiceException { try { - server.abandonBlock(PBHelper.convert(req.getB()), req.getSrc(), - req.getHolder()); + server.abandonBlock(PBHelper.convert(req.getB()), req.getFileId(), + req.getSrc(), req.getHolder()); } catch (IOException e) { throw new ServiceException(e); } @@ -461,7 +483,7 @@ public class ClientNamenodeProtocolServe List<String> existingStorageIDsList = req.getExistingStorageUuidsList(); List<DatanodeInfoProto> excludesList = req.getExcludesList(); LocatedBlock result = server.getAdditionalDatanode(req.getSrc(), - PBHelper.convert(req.getBlk()), + req.getFileId(), PBHelper.convert(req.getBlk()), PBHelper.convert(existingList.toArray( new DatanodeInfoProto[existingList.size()])), existingStorageIDsList.toArray( @@ -641,6 +663,21 @@ public class ClientNamenodeProtocolServe } @Override + public GetDatanodeStorageReportResponseProto getDatanodeStorageReport( + RpcController controller, GetDatanodeStorageReportRequestProto req) + throws ServiceException { + try { + List<DatanodeStorageReportProto> reports = PBHelper.convertDatanodeStorageReports( + server.getDatanodeStorageReport(PBHelper.convert(req.getType()))); + return GetDatanodeStorageReportResponseProto.newBuilder() + .addAllDatanodeStorageReports(reports) + .build(); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override public GetPreferredBlockSizeResponseProto getPreferredBlockSize( RpcController controller, GetPreferredBlockSizeRequestProto req) throws ServiceException { @@ -819,7 +856,8 @@ public class ClientNamenodeProtocolServe public FsyncResponseProto fsync(RpcController controller, FsyncRequestProto req) throws ServiceException { try { - server.fsync(req.getSrc(), req.getClient(), req.getLastBlockLength()); + server.fsync(req.getSrc(), req.getFileId(), + req.getClient(), req.getLastBlockLength()); return VOID_FSYNC_RESPONSE; } catch (IOException e) { throw new ServiceException(e); @@ -1261,4 +1299,59 @@ public class ClientNamenodeProtocolServe throw new ServiceException(e); } } + + @Override + public SetXAttrResponseProto setXAttr(RpcController controller, + SetXAttrRequestProto req) throws ServiceException { + try { + server.setXAttr(req.getSrc(), PBHelper.convertXAttr(req.getXAttr()), + PBHelper.convert(req.getFlag())); + } catch (IOException e) { + throw new ServiceException(e); + } + return VOID_SETXATTR_RESPONSE; + } + + @Override + public GetXAttrsResponseProto getXAttrs(RpcController controller, + GetXAttrsRequestProto req) throws ServiceException { + try { + return PBHelper.convertXAttrsResponse(server.getXAttrs(req.getSrc(), + PBHelper.convertXAttrs(req.getXAttrsList()))); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public ListXAttrsResponseProto listXAttrs(RpcController controller, + ListXAttrsRequestProto req) throws ServiceException { + try { + return PBHelper.convertListXAttrsResponse(server.listXAttrs(req.getSrc())); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public RemoveXAttrResponseProto removeXAttr(RpcController controller, + RemoveXAttrRequestProto req) throws ServiceException { + try { + server.removeXAttr(req.getSrc(), PBHelper.convertXAttr(req.getXAttr())); + } catch (IOException e) { + throw new ServiceException(e); + } + return VOID_REMOVEXATTR_RESPONSE; + } + + @Override + public CheckAccessResponseProto checkAccess(RpcController controller, + CheckAccessRequestProto req) throws ServiceException { + try { + server.checkAccess(req.getPath(), PBHelper.convert(req.getMode())); + } catch (IOException e) { + throw new ServiceException(e); + } + return VOID_CHECKACCESS_RESPONSE; + } }