HDFS-12620. Backporting HDFS-10467 to branch-2. Contributed by Inigo Goiri.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b60c658b Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b60c658b Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b60c658b Branch: refs/heads/branch-2 Commit: b60c658b048226fba95c6b62e1a1d541170e20f4 Parents: c954e6b Author: Inigo Goiri <inigo...@apache.org> Authored: Thu Oct 19 17:40:42 2017 -0700 Committer: vrushali <vrush...@apache.org> Committed: Fri Oct 20 11:22:34 2017 -0700 ---------------------------------------------------------------------- .../hadoop-hdfs/src/main/bin/hdfs | 7 + .../federation/metrics/FederationMetrics.java | 139 +++++++++++++------ .../federation/metrics/NamenodeBeanMetrics.java | 61 ++++---- .../federation/resolver/MountTableResolver.java | 16 ++- .../federation/router/ConnectionManager.java | 2 +- .../federation/router/ConnectionPool.java | 2 +- .../federation/router/RouterRpcServer.java | 115 +-------------- .../hdfs/server/federation/MockResolver.java | 5 +- .../server/federation/router/TestRouterRpc.java | 2 +- 9 files changed, 161 insertions(+), 188 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/b60c658b/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs b/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs index fbfbaf2..0b96ec2 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs @@ -47,6 +47,8 @@ function print_usage(){ echo " datanode run a DFS datanode" echo " debug run a Debug Admin to execute HDFS debug commands" echo " dfsadmin run a DFS admin client" + echo " dfsrouter run the DFS router" + echo " dfsrouteradmin manage Router-based federation" echo " haadmin run a DFS HA admin client" echo " fsck run a DFS filesystem checking utility" echo " balancer run a cluster balancing utility" @@ -157,6 +159,11 @@ elif [ "$COMMAND" = "dfs" ] ; then elif [ "$COMMAND" = "dfsadmin" ] ; then CLASS=org.apache.hadoop.hdfs.tools.DFSAdmin HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS" +elif [ "$COMMAND" = "dfsrouter" ] ; then + CLASS='org.apache.hadoop.hdfs.server.federation.router.DFSRouter' + HADOOP_OPTS="$HADOOP_OPTS $HADOOP_ROUTER_OPTS" +elif [ "$COMMAND" = "dfsrouteradmin" ] ; then + CLASS='org.apache.hadoop.hdfs.tools.federation.RouterAdmin' elif [ "$COMMAND" = "haadmin" ] ; then CLASS=org.apache.hadoop.hdfs.tools.DFSHAAdmin CLASSPATH=${CLASSPATH}:${TOOL_PATH} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b60c658b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationMetrics.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationMetrics.java index 7844a2e..685c585 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationMetrics.java @@ -31,6 +31,7 @@ import java.util.Collection; import java.util.Collections; import java.util.Date; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.LinkedList; @@ -38,10 +39,6 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; -import java.util.function.Function; -import java.util.function.ToIntFunction; -import java.util.function.ToLongFunction; -import java.util.stream.Collectors; import javax.management.NotCompliantMBeanException; import javax.management.ObjectName; @@ -72,7 +69,7 @@ import org.apache.hadoop.metrics2.util.MBeans; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.VersionInfo; import org.codehaus.jettison.json.JSONObject; -import org.eclipse.jetty.util.ajax.JSON; +import org.mortbay.util.ajax.JSON; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -263,12 +260,12 @@ public class FederationMetrics implements FederationMBean { @Override public long getTotalCapacity() { - return getNameserviceAggregatedLong(MembershipStats::getTotalSpace); + return getNameserviceAggregatedLong("getTotalSpace"); } @Override public long getRemainingCapacity() { - return getNameserviceAggregatedLong(MembershipStats::getAvailableSpace); + return getNameserviceAggregatedLong("getAvailableSpace"); } @Override @@ -323,31 +320,27 @@ public class FederationMetrics implements FederationMBean { @Override public int getNumLiveNodes() { - return getNameserviceAggregatedInt( - MembershipStats::getNumOfActiveDatanodes); + return getNameserviceAggregatedInt("getNumOfActiveDatanodes"); } @Override public int getNumDeadNodes() { - return getNameserviceAggregatedInt(MembershipStats::getNumOfDeadDatanodes); + return getNameserviceAggregatedInt("getNumOfDeadDatanodes"); } @Override public int getNumDecommissioningNodes() { - return getNameserviceAggregatedInt( - MembershipStats::getNumOfDecommissioningDatanodes); + return getNameserviceAggregatedInt("getNumOfDecommissioningDatanodes"); } @Override public int getNumDecomLiveNodes() { - return getNameserviceAggregatedInt( - MembershipStats::getNumOfDecomActiveDatanodes); + return getNameserviceAggregatedInt("getNumOfDecomActiveDatanodes"); } @Override public int getNumDecomDeadNodes() { - return getNameserviceAggregatedInt( - MembershipStats::getNumOfDecomDeadDatanodes); + return getNameserviceAggregatedInt("getNumOfDecomDeadDatanodes"); } @Override // NameNodeMXBean @@ -398,35 +391,32 @@ public class FederationMetrics implements FederationMBean { @Override public long getNumBlocks() { - return getNameserviceAggregatedLong(MembershipStats::getNumOfBlocks); + return getNameserviceAggregatedLong("getNumOfBlocks"); } @Override public long getNumOfMissingBlocks() { - return getNameserviceAggregatedLong(MembershipStats::getNumOfBlocksMissing); + return getNameserviceAggregatedLong("getNumOfBlocksMissing"); } @Override public long getNumOfBlocksPendingReplication() { - return getNameserviceAggregatedLong( - MembershipStats::getNumOfBlocksPendingReplication); + return getNameserviceAggregatedLong("getNumOfBlocksPendingReplication"); } @Override public long getNumOfBlocksUnderReplicated() { - return getNameserviceAggregatedLong( - MembershipStats::getNumOfBlocksUnderReplicated); + return getNameserviceAggregatedLong("getNumOfBlocksUnderReplicated"); } @Override public long getNumOfBlocksPendingDeletion() { - return getNameserviceAggregatedLong( - MembershipStats::getNumOfBlocksPendingDeletion); + return getNameserviceAggregatedLong("getNumOfBlocksPendingDeletion"); } @Override public long getNumFiles() { - return getNameserviceAggregatedLong(MembershipStats::getNumOfFiles); + return getNameserviceAggregatedLong("getNumOfFiles"); } @Override @@ -472,8 +462,7 @@ public class FederationMetrics implements FederationMBean { @Override public String getClusterId() { try { - Collection<String> clusterIds = - getNamespaceInfo(FederationNamespaceInfo::getClusterId); + Collection<String> clusterIds = getNamespaceInfo("getClusterId"); return clusterIds.toString(); } catch (IOException e) { LOG.error("Cannot fetch cluster ID metrics: {}", e.getMessage()); @@ -484,8 +473,7 @@ public class FederationMetrics implements FederationMBean { @Override public String getBlockPoolId() { try { - Collection<String> blockpoolIds = - getNamespaceInfo(FederationNamespaceInfo::getBlockPoolId); + Collection<String> blockpoolIds = getNamespaceInfo("getBlockPoolId"); return blockpoolIds.toString(); } catch (IOException e) { LOG.error("Cannot fetch block pool ID metrics: {}", e.getMessage()); @@ -501,19 +489,31 @@ public class FederationMetrics implements FederationMBean { /** * Build a set of unique values found in all namespaces. * - * @param f Method reference of the appropriate FederationNamespaceInfo + * @param getterName String name of the appropriate FederationNamespaceInfo * getter function * @return Set of unique string values found in all discovered namespaces. * @throws IOException if the query could not be executed. */ - private Collection<String> getNamespaceInfo( - Function<FederationNamespaceInfo, String> f) throws IOException { + public Collection<String> getNamespaceInfo(String getterName) + throws IOException { + GetNamespaceInfoRequest request = GetNamespaceInfoRequest.newInstance(); GetNamespaceInfoResponse response = membershipStore.getNamespaceInfo(request); - return response.getNamespaceInfo().stream() - .map(f) - .collect(Collectors.toSet()); + Set<FederationNamespaceInfo> namespacesInfo = response.getNamespaceInfo(); + + Set<String> ret = new HashSet<>(); + for (FederationNamespaceInfo namespace : namespacesInfo) { + try { + Method m = FederationNamespaceInfo.class.getDeclaredMethod(getterName); + String data = (String) m.invoke(namespace); + ret.add(data); + } catch (SecurityException | ReflectiveOperationException e) { + throw new IOException( + "Cannot invoke " + getterName + " from " + namespace); + } + } + return ret; } /** @@ -521,15 +521,19 @@ public class FederationMetrics implements FederationMBean { * @param f Method reference * @return Aggregated integer. */ - private int getNameserviceAggregatedInt(ToIntFunction<MembershipStats> f) { + private int getNameserviceAggregatedInt(String methodName) { + int total = 0; try { - return getActiveNamenodeRegistrations().stream() - .map(MembershipState::getStats) - .collect(Collectors.summingInt(f)); + Collection<Object> data = getNameservicesStats(methodName); + for (Object o : data) { + Integer l = (Integer) o; + total += l; + } } catch (IOException e) { - LOG.error("Unable to extract metrics: {}", e.getMessage()); + LOG.error("Cannot invoke {} for JMX: {}", methodName, e.getMessage()); return 0; } + return total; } /** @@ -537,15 +541,60 @@ public class FederationMetrics implements FederationMBean { * @param f Method reference * @return Aggregated long. */ - private long getNameserviceAggregatedLong(ToLongFunction<MembershipStats> f) { + private long getNameserviceAggregatedLong(String methodName) { + long total = 0; try { - return getActiveNamenodeRegistrations().stream() - .map(MembershipState::getStats) - .collect(Collectors.summingLong(f)); + Collection<Object> data = getNameservicesStats(methodName); + for (Object o : data) { + Long l = (Long) o; + total += l; + } } catch (IOException e) { - LOG.error("Unable to extract metrics: {}", e.getMessage()); + LOG.error("Cannot invoke {} for JMX: {}", methodName, e.getMessage()); return 0; } + return total; + } + + /** + * Aggregate a namenode data element from the most active namenode in each + * registered nameservice. + * + * @param getter String name of the getter function to invoke on the + * discovered NamenodeMembershipRecord object. + * @return Aggregated getter return values from all registered nameservices, + * one per nameservice. + * @throws IOException if the query could not be performed. + */ + private Collection<Object> getNameservicesStats(String getter) + throws IOException { + + List<Object> ret = new ArrayList<>(); + try { + Method metricsGetter = MembershipStats.class.getDeclaredMethod(getter); + List<MembershipState> namenodes = getActiveNamenodeRegistrations(); + for (MembershipState namenode : namenodes) { + try { + MembershipStats stats = namenode.getStats(); + if (stats != null) { + Object data = metricsGetter.invoke(stats); + ret.add(data); + } + } catch (ReflectiveOperationException e) { + throw new IOException( + "Cannot invoke " + getter + " from " + namenode); + } catch (IllegalArgumentException e) { + throw new IOException("Bad arguments invoking " + getter); + } + } + } catch (NoSuchMethodException e) { + throw new IOException( + "Cannot invoke " + getter + " from membership stats record"); + } catch (SecurityException e) { + throw new IOException( + "Cannot invoke " + getter + " from membership stats record"); + } + return ret; } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/b60c658b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/NamenodeBeanMetrics.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/NamenodeBeanMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/NamenodeBeanMetrics.java index 23cd675..93e9ea0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/NamenodeBeanMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/NamenodeBeanMetrics.java @@ -21,12 +21,14 @@ import static org.apache.hadoop.util.Time.now; import java.io.IOException; import java.lang.management.ManagementFactory; +import java.lang.reflect.Method; import java.util.Collection; import java.util.Collections; +import java.util.Date; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; -import java.util.function.Function; -import java.util.stream.Collectors; +import java.util.Set; import javax.management.NotCompliantMBeanException; import javax.management.ObjectName; @@ -52,7 +54,7 @@ import org.apache.hadoop.ipc.StandbyException; import org.apache.hadoop.metrics2.util.MBeans; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.util.VersionInfo; -import org.eclipse.jetty.util.ajax.JSON; +import org.mortbay.util.ajax.JSON; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -232,28 +234,16 @@ public class NamenodeBeanMetrics } @Override - @Deprecated public long getPendingReplicationBlocks() { return getFederationMetrics().getNumOfBlocksPendingReplication(); } @Override - public long getPendingReconstructionBlocks() { - return getFederationMetrics().getNumOfBlocksPendingReplication(); - } - - @Override - @Deprecated public long getUnderReplicatedBlocks() { return getFederationMetrics().getNumOfBlocksUnderReplicated(); } @Override - public long getLowRedundancyBlocks() { - return getFederationMetrics().getNumOfBlocksUnderReplicated(); - } - - @Override public long getPendingDeletionBlocks() { return getFederationMetrics().getNumOfBlocksPendingDeletion(); } @@ -338,7 +328,7 @@ public class NamenodeBeanMetrics @Override public String getClusterId() { try { - return getNamespaceInfo(FederationNamespaceInfo::getClusterId).toString(); + return getNamespaceInfo("getClusterId").toString(); } catch (IOException e) { LOG.error("Cannot fetch cluster ID metrics {}", e.getMessage()); return ""; @@ -348,8 +338,7 @@ public class NamenodeBeanMetrics @Override public String getBlockPoolId() { try { - return - getNamespaceInfo(FederationNamespaceInfo::getBlockPoolId).toString(); + return getNamespaceInfo("getBlockPoolId").toString(); } catch (IOException e) { LOG.error("Cannot fetch block pool ID metrics {}", e.getMessage()); return ""; @@ -359,13 +348,14 @@ public class NamenodeBeanMetrics /** * Build a set of unique values found in all namespaces. * - * @param f Method reference of the appropriate FederationNamespaceInfo + * @param getterName String name of the appropriate FederationNamespaceInfo * getter function * @return Set of unique string values found in all discovered namespaces. * @throws IOException if the query could not be executed. */ - private Collection<String> getNamespaceInfo( - Function<FederationNamespaceInfo, String> f) throws IOException { + public Collection<String> getNamespaceInfo(String getterName) + throws IOException { + StateStoreService stateStore = router.getStateStore(); MembershipStore membershipStore = stateStore.getRegisteredRecordStore(MembershipStore.class); @@ -373,9 +363,20 @@ public class NamenodeBeanMetrics GetNamespaceInfoRequest request = GetNamespaceInfoRequest.newInstance(); GetNamespaceInfoResponse response = membershipStore.getNamespaceInfo(request); - return response.getNamespaceInfo().stream() - .map(f) - .collect(Collectors.toSet()); + Set<FederationNamespaceInfo> namespacesInfo = response.getNamespaceInfo(); + + Set<String> ret = new HashSet<String>(); + for (FederationNamespaceInfo namespace : namespacesInfo) { + try { + Method m = FederationNamespaceInfo.class.getDeclaredMethod(getterName); + String data = (String) m.invoke(namespace); + ret.add(data); + } catch (SecurityException | ReflectiveOperationException ex) { + throw new IOException( + "Cannot invoke " + getterName + " from " + namespace); + } + } + return ret; } @Override @@ -403,6 +404,12 @@ public class NamenodeBeanMetrics return this.router.getStartTime(); } + @Deprecated + @Override + public String getNNStarted() { + return new Date(this.router.getStartTime()).toString(); + } + @Override public String getCompileInfo() { return VersionInfo.getDate() + " by " + VersionInfo.getUser() + @@ -454,6 +461,12 @@ public class NamenodeBeanMetrics return getFederationMetrics().getNumFiles(); } + @Deprecated + @Override + public long getTotalFiles() { + return getFederationMetrics().getNumFiles(); + } + @Override public int getTotalLoad() { return -1; http://git-wip-us.apache.org/repos/asf/hadoop/blob/b60c658b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MountTableResolver.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MountTableResolver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MountTableResolver.java index 13e3db3..3d34f7e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MountTableResolver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MountTableResolver.java @@ -323,20 +323,22 @@ public class MountTableResolver verifyMountTable(); readLock.lock(); try { - return this.locationCache.computeIfAbsent( - path, this::lookupLocation); + PathLocation ret = this.locationCache.get(path); + if (ret == null) { + ret = buildPathLocation(path); + this.locationCache.put(path, ret); + } + return ret; } finally { readLock.unlock(); } } /** - * Build the path location to insert into the cache atomically. It must hold - * the read lock. - * @param path Path to check/insert. - * @return New remote location. + * Builder to insert the path location into the cache atomically. It must + * hold the read lock. */ - public PathLocation lookupLocation(final String path) { + private PathLocation buildPathLocation(String path) { PathLocation ret = null; MountTable entry = findDeepest(path); if (entry != null) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/b60c658b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java index 543d964..cc40834 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java @@ -36,7 +36,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Time; -import org.eclipse.jetty.util.ajax.JSON; +import org.mortbay.util.ajax.JSON; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/hadoop/blob/b60c658b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java index ca113ef..af46c05 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java @@ -48,7 +48,7 @@ import org.apache.hadoop.security.SaslRpcServer; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Time; -import org.eclipse.jetty.util.ajax.JSON; +import org.mortbay.util.ajax.JSON; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/hadoop/blob/b60c658b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java index 4d3c237..13f70ea 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java @@ -64,7 +64,6 @@ import org.apache.hadoop.hdfs.AddBlockFlag; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.inotify.EventBatchList; -import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse; import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry; import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo; @@ -75,12 +74,9 @@ import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DirectoryListing; -import org.apache.hadoop.hdfs.protocol.ECBlockGroupStats; import org.apache.hadoop.hdfs.protocol.EncryptionZone; -import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; -import org.apache.hadoop.hdfs.protocol.HdfsConstants.ReencryptAction; import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction; import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; @@ -88,11 +84,9 @@ import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.OpenFileEntry; -import org.apache.hadoop.hdfs.protocol.ReplicatedBlockStats; import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo; import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport; import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus; -import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol; import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB; import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB; @@ -457,18 +451,16 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol { public HdfsFileStatus create(String src, FsPermission masked, String clientName, EnumSetWritable<CreateFlag> flag, boolean createParent, short replication, long blockSize, - CryptoProtocolVersion[] supportedVersions, String ecPolicyName) - throws IOException { + CryptoProtocolVersion[] supportedVersions) throws IOException { checkOperation(OperationCategory.WRITE); RemoteLocation createLocation = getCreateLocation(src); RemoteMethod method = new RemoteMethod("create", new Class<?>[] {String.class, FsPermission.class, String.class, EnumSetWritable.class, boolean.class, short.class, - long.class, CryptoProtocolVersion[].class, - String.class}, + long.class, CryptoProtocolVersion[].class}, createLocation.getDest(), masked, clientName, flag, createParent, - replication, blockSize, supportedVersions, ecPolicyName); + replication, blockSize, supportedVersions); return (HdfsFileStatus) rpcClient.invokeSingle(createLocation, method); } @@ -1216,27 +1208,12 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol { } @Override // ClientProtocol - public boolean saveNamespace(long timeWindow, long txGap) throws IOException { + public void saveNamespace() throws IOException { checkOperation(OperationCategory.UNCHECKED); - RemoteMethod method = new RemoteMethod("saveNamespace", - new Class<?>[] {Long.class, Long.class}, timeWindow, txGap); + RemoteMethod method = new RemoteMethod("saveNamespace", new Class<?>[] {}); final Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces(); - Map<FederationNamespaceInfo, Object> ret = - rpcClient.invokeConcurrent(nss, method, true, false); - - boolean success = true; - Object obj = ret; - @SuppressWarnings("unchecked") - Map<FederationNamespaceInfo, Boolean> results = - (Map<FederationNamespaceInfo, Boolean>)obj; - Collection<Boolean> sucesses = results.values(); - for (boolean s : sucesses) { - if (!s) { - success = false; - } - } - return success; + rpcClient.invokeConcurrent(nss, method, true, false); } @Override // ClientProtocol @@ -1659,19 +1636,6 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol { } @Override // ClientProtocol - public void reencryptEncryptionZone(String zone, ReencryptAction action) - throws IOException { - checkOperation(OperationCategory.WRITE, false); - } - - @Override // ClientProtocol - public BatchedEntries<ZoneReencryptionStatus> listReencryptionStatus( - long prevId) throws IOException { - checkOperation(OperationCategory.READ, false); - return null; - } - - @Override // ClientProtocol public void setXAttr(String src, XAttr xAttr, EnumSet<XAttrSetFlag> flag) throws IOException { checkOperation(OperationCategory.WRITE); @@ -1784,30 +1748,6 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol { checkOperation(OperationCategory.WRITE, false); } - @Override - public ErasureCodingPolicy[] getErasureCodingPolicies() throws IOException { - checkOperation(OperationCategory.READ, false); - return null; - } - - @Override // ClientProtocol - public ErasureCodingPolicy getErasureCodingPolicy(String src) - throws IOException { - checkOperation(OperationCategory.READ, false); - return null; - } - - @Override // ClientProtocol - public void setErasureCodingPolicy(String src, String ecPolicyName) - throws IOException { - checkOperation(OperationCategory.WRITE, false); - } - - @Override // ClientProtocol - public void unsetErasureCodingPolicy(String src) throws IOException { - checkOperation(OperationCategory.WRITE, false); - } - @Override // ClientProtocol public void setQuota(String path, long namespaceQuota, long storagespaceQuota, StorageType type) throws IOException { @@ -1870,46 +1810,6 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol { } @Override - public AddErasureCodingPolicyResponse[] addErasureCodingPolicies( - ErasureCodingPolicy[] policies) throws IOException { - checkOperation(OperationCategory.WRITE, false); - return null; - } - - @Override - public void removeErasureCodingPolicy(String arg0) throws IOException { - checkOperation(OperationCategory.WRITE, false); - } - - @Override - public void disableErasureCodingPolicy(String arg0) throws IOException { - checkOperation(OperationCategory.WRITE, false); - } - - @Override - public void enableErasureCodingPolicy(String arg0) throws IOException { - checkOperation(OperationCategory.WRITE, false); - } - - @Override - public ECBlockGroupStats getECBlockGroupStats() throws IOException { - checkOperation(OperationCategory.READ, false); - return null; - } - - @Override - public Map<String, String> getErasureCodingCodecs() throws IOException { - checkOperation(OperationCategory.READ, false); - return null; - } - - @Override - public ReplicatedBlockStats getReplicatedBlockStats() throws IOException { - checkOperation(OperationCategory.READ, false); - return null; - } - - @Override public BatchedEntries<OpenFileEntry> listOpenFiles(long arg0) throws IOException { checkOperation(OperationCategory.READ, false); @@ -2017,9 +1917,8 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol { } long inodeId = 0; return new HdfsFileStatus(0, true, 0, 0, modTime, accessTime, permission, - EnumSet.noneOf(HdfsFileStatus.Flags.class), owner, group, new byte[0], DFSUtil.string2Bytes(name), inodeId, - childrenNum, null, (byte) 0, null); + childrenNum, null, (byte) 0); } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/b60c658b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java index a481553..151d731 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java @@ -264,7 +264,10 @@ public class MockResolver @Override public PathLocation getDestinationForPath(String path) throws IOException { List<RemoteLocation> remoteLocations = new LinkedList<>(); - for (String key : this.locations.keySet()) { + // We go from the leaves to the root + List<String> keys = new ArrayList<>(this.locations.keySet()); + Collections.sort(keys, Collections.reverseOrder()); + for (String key : keys) { if (path.startsWith(key)) { for (RemoteLocation location : this.locations.get(key)) { String finalPath = location.getDest() + path.substring(key.length()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/b60c658b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java index af506c9..d1d6601 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java @@ -800,7 +800,7 @@ public class TestRouterRpc { HdfsFileStatus status = routerProtocol.create( newRouterFile, new FsPermission("777"), clientName, new EnumSetWritable<CreateFlag>(createFlag), true, (short) 1, - (long) 1024, CryptoProtocolVersion.supported(), null); + (long) 1024, CryptoProtocolVersion.supported()); // Add a block via router (requires client to have same lease) LocatedBlock block = routerProtocol.addBlock( --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org