HDFS-12335. Federation Metrics. Contributed by Inigo Goiri. (cherry picked from commit 3b19e77752afce87936f5c0d1e6d272fba798d7b) (cherry picked from commit bc9e588a19c0aaf518de8dab719362be4a8d6a54)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/673f6856 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/673f6856 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/673f6856 Branch: refs/heads/branch-2 Commit: 673f6856897c452a63bae941e238de6ffb8c1b40 Parents: c778f9d Author: Inigo Goiri <inigo...@apache.org> Authored: Fri Sep 8 09:37:10 2017 -0700 Committer: vrushali <vrush...@apache.org> Committed: Fri Oct 20 11:22:32 2017 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 11 + .../federation/metrics/FederationMBean.java | 204 ++++++ .../federation/metrics/FederationMetrics.java | 673 +++++++++++++++++++ .../federation/metrics/FederationRPCMBean.java | 90 +++ .../metrics/FederationRPCMetrics.java | 239 +++++++ .../FederationRPCPerformanceMonitor.java | 211 ++++++ .../federation/metrics/NamenodeBeanMetrics.java | 624 +++++++++++++++++ .../federation/metrics/StateStoreMBean.java | 45 ++ .../federation/metrics/StateStoreMetrics.java | 144 ++++ .../server/federation/metrics/package-info.java | 27 + .../federation/router/ConnectionManager.java | 23 + .../federation/router/ConnectionPool.java | 23 + .../hdfs/server/federation/router/Router.java | 62 ++ .../server/federation/router/RouterMetrics.java | 73 ++ .../federation/router/RouterMetricsService.java | 108 +++ .../federation/router/RouterRpcClient.java | 39 +- .../federation/router/RouterRpcMonitor.java | 95 +++ .../federation/router/RouterRpcServer.java | 63 +- .../federation/store/CachedRecordStore.java | 8 + .../federation/store/StateStoreService.java | 42 +- .../store/driver/StateStoreDriver.java | 17 +- .../driver/impl/StateStoreSerializableImpl.java | 6 +- .../driver/impl/StateStoreZooKeeperImpl.java | 26 + .../store/records/MembershipState.java | 2 +- .../federation/store/records/MountTable.java | 23 + .../records/impl/pb/MembershipStatePBImpl.java | 
5 +- .../src/main/resources/hdfs-default.xml | 19 +- .../server/federation/FederationTestUtils.java | 13 + .../server/federation/RouterConfigBuilder.java | 13 + .../metrics/TestFederationMetrics.java | 237 +++++++ .../federation/metrics/TestMetricsBase.java | 150 +++++ .../server/federation/router/TestRouter.java | 23 +- .../store/driver/TestStateStoreDriverBase.java | 69 ++ 33 files changed, 3383 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/673f6856/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index b161bc0..3606e7a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -28,6 +28,8 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaLruTracker; import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver; import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver; +import org.apache.hadoop.hdfs.server.federation.router.RouterRpcMonitor; +import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCPerformanceMonitor; import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver; import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver; @@ -1041,6 +1043,15 @@ public class DFSConfigKeys extends CommonConfigurationKeys { FEDERATION_ROUTER_PREFIX + 
"rpc.enable"; public static final boolean DFS_ROUTER_RPC_ENABLE_DEFAULT = true; + public static final String DFS_ROUTER_METRICS_ENABLE = + FEDERATION_ROUTER_PREFIX + "metrics.enable"; + public static final boolean DFS_ROUTER_METRICS_ENABLE_DEFAULT = true; + public static final String DFS_ROUTER_METRICS_CLASS = + FEDERATION_ROUTER_PREFIX + "metrics.class"; + public static final Class<? extends RouterRpcMonitor> + DFS_ROUTER_METRICS_CLASS_DEFAULT = + FederationRPCPerformanceMonitor.class; + // HDFS Router heartbeat public static final String DFS_ROUTER_HEARTBEAT_ENABLE = FEDERATION_ROUTER_PREFIX + "heartbeat.enable"; http://git-wip-us.apache.org/repos/asf/hadoop/blob/673f6856/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationMBean.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationMBean.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationMBean.java new file mode 100644 index 0000000..43efb3c --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationMBean.java @@ -0,0 +1,204 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.metrics; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * JMX interface for the federation statistics. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public interface FederationMBean { + + /** + * Get information about all the namenodes in the federation or null if + * failure. + * @return JSON with all the Namenodes. + */ + String getNamenodes(); + + /** + * Get the latest info for each registered nameservice. + * @return JSON with all the nameservices. + */ + String getNameservices(); + + /** + * Get the mount table for the federated filesystem or null if failure. + * @return JSON with the mount table. + */ + String getMountTable(); + + /** + * Get the total capacity of the federated cluster. + * @return Total capacity of the federated cluster. + */ + long getTotalCapacity(); + + /** + * Get the used capacity of the federated cluster. + * @return Used capacity of the federated cluster. + */ + long getUsedCapacity(); + + /** + * Get the remaining capacity of the federated cluster. + * @return Remaining capacity of the federated cluster. + */ + long getRemainingCapacity(); + + /** + * Get the number of nameservices in the federation. + * @return Number of nameservices in the federation. + */ + int getNumNameservices(); + + /** + * Get the number of namenodes. + * @return Number of namenodes. + */ + int getNumNamenodes(); + + /** + * Get the number of expired namenodes. 
+ * @return Number of expired namenodes. + */ + int getNumExpiredNamenodes(); + + /** + * Get the number of live datanodes. + * @return Number of live datanodes. + */ + int getNumLiveNodes(); + + /** + * Get the number of dead datanodes. + * @return Number of dead datanodes. + */ + int getNumDeadNodes(); + + /** + * Get the number of decommissioning datanodes. + * @return Number of decommissioning datanodes. + */ + int getNumDecommissioningNodes(); + + /** + * Get the number of live decommissioned datanodes. + * @return Number of live decommissioned datanodes. + */ + int getNumDecomLiveNodes(); + + /** + * Get the number of dead decommissioned datanodes. + * @return Number of dead decommissioned datanodes. + */ + int getNumDecomDeadNodes(); + + /** + * Get Max, Median, Min and Standard Deviation of DataNodes usage. + * @return the DataNode usage information, as a JSON string. + */ + String getNodeUsage(); + + /** + * Get the number of blocks in the federation. + * @return Number of blocks in the federation. + */ + long getNumBlocks(); + + /** + * Get the number of missing blocks in the federation. + * @return Number of missing blocks in the federation. + */ + long getNumOfMissingBlocks(); + + /** + * Get the number of pending replication blocks in the federation. + * @return Number of pending replication blocks in the federation. + */ + long getNumOfBlocksPendingReplication(); + + /** + * Get the number of under replicated blocks in the federation. + * @return Number of under replicated blocks in the federation. + */ + long getNumOfBlocksUnderReplicated(); + + /** + * Get the number of pending deletion blocks in the federation. + * @return Number of pending deletion blocks in the federation. + */ + long getNumOfBlocksPendingDeletion(); + + /** + * Get the number of files in the federation. + * @return Number of files in the federation. + */ + long getNumFiles(); + + /** + * When the router started. + * @return Date as a string the router started. 
+ */ + String getRouterStarted(); + + /** + * Get the version of the router. + * @return Version of the router. + */ + String getVersion(); + + /** + * Get the compilation date of the router. + * @return Compilation date of the router. + */ + String getCompiledDate(); + + /** + * Get the compilation info of the router. + * @return Compilation info of the router. + */ + String getCompileInfo(); + + /** + * Get the host and port of the router. + * @return Host and port of the router. + */ + String getHostAndPort(); + + /** + * Get the identifier of the router. + * @return Identifier of the router. + */ + String getRouterId(); + + /** + * Get the cluster identifiers of the namespaces in the federation. + * @return Cluster identifiers of the namespaces in the federation. + */ + String getClusterId(); + + /** + * Get the block pool identifiers of the namespaces in the federation. + * @return Block pool identifiers of the namespaces in the federation. + */ + String getBlockPoolId(); +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/673f6856/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationMetrics.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationMetrics.java new file mode 100644 index 0000000..1e80256 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationMetrics.java @@ -0,0 +1,673 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.metrics; + +import static org.apache.hadoop.util.Time.now; + +import java.io.IOException; +import java.lang.reflect.Method; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Function; +import java.util.function.ToIntFunction; +import java.util.function.ToLongFunction; +import java.util.stream.Collectors; + +import javax.management.NotCompliantMBeanException; +import javax.management.ObjectName; +import javax.management.StandardMBean; + +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; +import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; +import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext; +import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo; +import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation; +import org.apache.hadoop.hdfs.server.federation.router.Router; +import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer; +import org.apache.hadoop.hdfs.server.federation.store.MembershipStore; +import org.apache.hadoop.hdfs.server.federation.store.MountTableStore; 
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamenodeRegistrationsResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamespaceInfoRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamespaceInfoResponse; +import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; +import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState; +import org.apache.hadoop.hdfs.server.federation.store.records.MembershipStats; +import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; +import org.apache.hadoop.metrics2.util.MBeans; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.VersionInfo; +import org.codehaus.jettison.json.JSONObject; +import org.eclipse.jetty.util.ajax.JSON; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Implementation of the Router metrics collector. + */ +public class FederationMetrics implements FederationMBean { + + private static final Logger LOG = + LoggerFactory.getLogger(FederationMetrics.class); + + /** Format for a date. */ + private static final String DATE_FORMAT = "yyyy/MM/dd HH:mm:ss"; + + + /** Router interface. */ + private final Router router; + + /** FederationState JMX bean. */ + private ObjectName beanName; + + /** Resolve the namenode for each namespace. */ + private final ActiveNamenodeResolver namenodeResolver; + + /** State store. */ + private final StateStoreService stateStore; + /** Membership state store. */ + private MembershipStore membershipStore; + /** Mount table store. 
*/ + private MountTableStore mountTableStore; + + + public FederationMetrics(Router router) throws IOException { + this.router = router; + + try { + StandardMBean bean = new StandardMBean(this, FederationMBean.class); + this.beanName = MBeans.register("Router", "FederationState", bean); + LOG.info("Registered Router MBean: {}", this.beanName); + } catch (NotCompliantMBeanException e) { + throw new RuntimeException("Bad Router MBean setup", e); + } + + // Resolve namenode for each nameservice + this.namenodeResolver = this.router.getNamenodeResolver(); + + // State store interfaces + this.stateStore = this.router.getStateStore(); + if (this.stateStore == null) { + LOG.error("State store not available"); + } else { + this.membershipStore = stateStore.getRegisteredRecordStore( + MembershipStore.class); + this.mountTableStore = stateStore.getRegisteredRecordStore( + MountTableStore.class); + } + } + + /** + * Unregister the JMX beans. + */ + public void close() { + if (this.beanName != null) { + MBeans.unregister(beanName); + } + } + + @Override + public String getNamenodes() { + final Map<String, Map<String, Object>> info = new LinkedHashMap<>(); + try { + // Get the values from the store + GetNamenodeRegistrationsRequest request = + GetNamenodeRegistrationsRequest.newInstance(); + GetNamenodeRegistrationsResponse response = + membershipStore.getNamenodeRegistrations(request); + + // Order the namenodes + final List<MembershipState> namenodes = response.getNamenodeMemberships(); + if (namenodes == null || namenodes.size() == 0) { + return JSON.toString(info); + } + List<MembershipState> namenodesOrder = new ArrayList<>(namenodes); + Collections.sort(namenodesOrder, MembershipState.NAME_COMPARATOR); + + // Dump namenodes information into JSON + for (MembershipState namenode : namenodesOrder) { + Map<String, Object> innerInfo = new HashMap<>(); + Map<String, Object> map = getJson(namenode); + innerInfo.putAll(map); + long dateModified = namenode.getDateModified(); + 
long lastHeartbeat = getSecondsSince(dateModified); + innerInfo.put("lastHeartbeat", lastHeartbeat); + MembershipStats stats = namenode.getStats(); + long used = stats.getTotalSpace() - stats.getAvailableSpace(); + innerInfo.put("used", used); + info.put(namenode.getNamenodeKey(), + Collections.unmodifiableMap(innerInfo)); + } + } catch (IOException e) { + LOG.error("Enable to fetch json representation of namenodes {}", + e.getMessage()); + return "{}"; + } + return JSON.toString(info); + } + + @Override + public String getNameservices() { + final Map<String, Map<String, Object>> info = new LinkedHashMap<>(); + try { + final List<MembershipState> namenodes = getActiveNamenodeRegistrations(); + List<MembershipState> namenodesOrder = new ArrayList<>(namenodes); + Collections.sort(namenodesOrder, MembershipState.NAME_COMPARATOR); + + // Dump namenodes information into JSON + for (MembershipState namenode : namenodesOrder) { + Map<String, Object> innerInfo = new HashMap<>(); + Map<String, Object> map = getJson(namenode); + innerInfo.putAll(map); + long dateModified = namenode.getDateModified(); + long lastHeartbeat = getSecondsSince(dateModified); + innerInfo.put("lastHeartbeat", lastHeartbeat); + MembershipStats stats = namenode.getStats(); + long used = stats.getTotalSpace() - stats.getAvailableSpace(); + innerInfo.put("used", used); + info.put(namenode.getNamenodeKey(), + Collections.unmodifiableMap(innerInfo)); + } + } catch (IOException e) { + LOG.error("Cannot retrieve nameservices for JMX: {}", e.getMessage()); + return "{}"; + } + return JSON.toString(info); + } + + @Override + public String getMountTable() { + final List<Map<String, Object>> info = new LinkedList<>(); + + try { + // Get all the mount points in order + GetMountTableEntriesRequest request = + GetMountTableEntriesRequest.newInstance("/"); + GetMountTableEntriesResponse response = + mountTableStore.getMountTableEntries(request); + final List<MountTable> mounts = response.getEntries(); + 
List<MountTable> orderedMounts = new ArrayList<>(mounts); + Collections.sort(orderedMounts, MountTable.SOURCE_COMPARATOR); + + // Dump mount table entries information into JSON + for (MountTable entry : orderedMounts) { + // Summarize destinations + Set<String> nameservices = new LinkedHashSet<>(); + Set<String> paths = new LinkedHashSet<>(); + for (RemoteLocation location : entry.getDestinations()) { + nameservices.add(location.getNameserviceId()); + paths.add(location.getDest()); + } + + Map<String, Object> map = getJson(entry); + // We add some values with a cleaner format + map.put("dateCreated", getDateString(entry.getDateCreated())); + map.put("dateModified", getDateString(entry.getDateModified())); + + Map<String, Object> innerInfo = new HashMap<>(); + innerInfo.putAll(map); + innerInfo.put("nameserviceId", StringUtils.join(",", nameservices)); + innerInfo.put("path", StringUtils.join(",", paths)); + if (nameservices.size() > 1) { + innerInfo.put("order", entry.getDestOrder().toString()); + } else { + innerInfo.put("order", ""); + } + innerInfo.put("readonly", entry.isReadOnly()); + info.add(Collections.unmodifiableMap(innerInfo)); + } + } catch (IOException e) { + LOG.error( + "Cannot generate JSON of mount table from store: {}", e.getMessage()); + return "[]"; + } + return JSON.toString(info); + } + + @Override + public long getTotalCapacity() { + return getNameserviceAggregatedLong(MembershipStats::getTotalSpace); + } + + @Override + public long getRemainingCapacity() { + return getNameserviceAggregatedLong(MembershipStats::getAvailableSpace); + } + + @Override + public long getUsedCapacity() { + return getTotalCapacity() - getRemainingCapacity(); + } + + @Override + public int getNumNameservices() { + try { + Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces(); + return nss.size(); + } catch (IOException e) { + LOG.error( + "Cannot fetch number of expired registrations from the store: {}", + e.getMessage()); + return 0; + } + } + + 
@Override + public int getNumNamenodes() { + try { + GetNamenodeRegistrationsRequest request = + GetNamenodeRegistrationsRequest.newInstance(); + GetNamenodeRegistrationsResponse response = + membershipStore.getNamenodeRegistrations(request); + List<MembershipState> memberships = response.getNamenodeMemberships(); + return memberships.size(); + } catch (IOException e) { + LOG.error("Cannot retrieve numNamenodes for JMX: {}", e.getMessage()); + return 0; + } + } + + @Override + public int getNumExpiredNamenodes() { + try { + GetNamenodeRegistrationsRequest request = + GetNamenodeRegistrationsRequest.newInstance(); + GetNamenodeRegistrationsResponse response = + membershipStore.getExpiredNamenodeRegistrations(request); + List<MembershipState> expiredMemberships = + response.getNamenodeMemberships(); + return expiredMemberships.size(); + } catch (IOException e) { + LOG.error( + "Cannot retrieve numExpiredNamenodes for JMX: {}", e.getMessage()); + return 0; + } + } + + @Override + public int getNumLiveNodes() { + return getNameserviceAggregatedInt( + MembershipStats::getNumOfActiveDatanodes); + } + + @Override + public int getNumDeadNodes() { + return getNameserviceAggregatedInt(MembershipStats::getNumOfDeadDatanodes); + } + + @Override + public int getNumDecommissioningNodes() { + return getNameserviceAggregatedInt( + MembershipStats::getNumOfDecommissioningDatanodes); + } + + @Override + public int getNumDecomLiveNodes() { + return getNameserviceAggregatedInt( + MembershipStats::getNumOfDecomActiveDatanodes); + } + + @Override + public int getNumDecomDeadNodes() { + return getNameserviceAggregatedInt( + MembershipStats::getNumOfDecomDeadDatanodes); + } + + @Override // NameNodeMXBean + public String getNodeUsage() { + float median = 0; + float max = 0; + float min = 0; + float dev = 0; + + final Map<String, Map<String, Object>> info = new HashMap<>(); + try { + RouterRpcServer rpcServer = this.router.getRpcServer(); + DatanodeInfo[] live = + 
rpcServer.getDatanodeReport(DatanodeReportType.LIVE); + + if (live.length > 0) { + float totalDfsUsed = 0; + float[] usages = new float[live.length]; + int i = 0; + for (DatanodeInfo dn : live) { + usages[i++] = dn.getDfsUsedPercent(); + totalDfsUsed += dn.getDfsUsedPercent(); + } + totalDfsUsed /= live.length; + Arrays.sort(usages); + median = usages[usages.length / 2]; + max = usages[usages.length - 1]; + min = usages[0]; + + for (i = 0; i < usages.length; i++) { + dev += (usages[i] - totalDfsUsed) * (usages[i] - totalDfsUsed); + } + dev = (float) Math.sqrt(dev / usages.length); + } + } catch (IOException e) { + LOG.info("Cannot get the live nodes: {}", e.getMessage()); + } + + final Map<String, Object> innerInfo = new HashMap<>(); + innerInfo.put("min", StringUtils.format("%.2f%%", min)); + innerInfo.put("median", StringUtils.format("%.2f%%", median)); + innerInfo.put("max", StringUtils.format("%.2f%%", max)); + innerInfo.put("stdDev", StringUtils.format("%.2f%%", dev)); + info.put("nodeUsage", innerInfo); + + return JSON.toString(info); + } + + @Override + public long getNumBlocks() { + return getNameserviceAggregatedLong(MembershipStats::getNumOfBlocks); + } + + @Override + public long getNumOfMissingBlocks() { + return getNameserviceAggregatedLong(MembershipStats::getNumOfBlocksMissing); + } + + @Override + public long getNumOfBlocksPendingReplication() { + return getNameserviceAggregatedLong( + MembershipStats::getNumOfBlocksPendingReplication); + } + + @Override + public long getNumOfBlocksUnderReplicated() { + return getNameserviceAggregatedLong( + MembershipStats::getNumOfBlocksUnderReplicated); + } + + @Override + public long getNumOfBlocksPendingDeletion() { + return getNameserviceAggregatedLong( + MembershipStats::getNumOfBlocksPendingDeletion); + } + + @Override + public long getNumFiles() { + return getNameserviceAggregatedLong(MembershipStats::getNumOfFiles); + } + + @Override + public String getRouterStarted() { + long startTime = 
this.router.getStartTime(); + return new Date(startTime).toString(); + } + + @Override + public String getVersion() { + return VersionInfo.getVersion() + ", r" + VersionInfo.getRevision(); + } + + @Override + public String getCompiledDate() { + return VersionInfo.getDate(); + } + + @Override + public String getCompileInfo() { + return VersionInfo.getDate() + " by " + VersionInfo.getUser() + " from " + + VersionInfo.getBranch(); + } + + @Override + public String getHostAndPort() { + // TODO this should be the HTTP address + return "Unknown"; + } + + @Override + public String getRouterId() { + return this.router.getRouterId(); + } + + @Override + public String getClusterId() { + try { + Collection<String> clusterIds = + getNamespaceInfo(FederationNamespaceInfo::getClusterId); + return clusterIds.toString(); + } catch (IOException e) { + LOG.error("Cannot fetch cluster ID metrics: {}", e.getMessage()); + return ""; + } + } + + @Override + public String getBlockPoolId() { + try { + Collection<String> blockpoolIds = + getNamespaceInfo(FederationNamespaceInfo::getBlockPoolId); + return blockpoolIds.toString(); + } catch (IOException e) { + LOG.error("Cannot fetch block pool ID metrics: {}", e.getMessage()); + return ""; + } + } + + /** + * Build a set of unique values found in all namespaces. + * + * @param f Method reference of the appropriate FederationNamespaceInfo + * getter function + * @return Set of unique string values found in all discovered namespaces. + * @throws IOException if the query could not be executed. + */ + private Collection<String> getNamespaceInfo( + Function<FederationNamespaceInfo, String> f) throws IOException { + GetNamespaceInfoRequest request = GetNamespaceInfoRequest.newInstance(); + GetNamespaceInfoResponse response = + membershipStore.getNamespaceInfo(request); + return response.getNamespaceInfo().stream() + .map(f) + .collect(Collectors.toSet()); + } + + /** + * Get the aggregated value for a method for all nameservices. 
+ * @param f Method reference + * @return Aggregated integer. + */ + private int getNameserviceAggregatedInt(ToIntFunction<MembershipStats> f) { + try { + return getActiveNamenodeRegistrations().stream() + .map(MembershipState::getStats) + .collect(Collectors.summingInt(f)); + } catch (IOException e) { + LOG.error("Unable to extract metrics: {}", e.getMessage()); + return 0; + } + } + + /** + * Get the aggregated value for a method for all nameservices. + * @param f Method reference + * @return Aggregated long. + */ + private long getNameserviceAggregatedLong(ToLongFunction<MembershipStats> f) { + try { + return getActiveNamenodeRegistrations().stream() + .map(MembershipState::getStats) + .collect(Collectors.summingLong(f)); + } catch (IOException e) { + LOG.error("Unable to extract metrics: {}", e.getMessage()); + return 0; + } + } + + /** + * Fetches the most active namenode memberships for all known nameservices. + * The fetched membership may or may not be active. Excludes expired + * memberships. + * @throws IOException if the query could not be performed. + * @return List of the most active NNs from each known nameservice. + */ + private List<MembershipState> getActiveNamenodeRegistrations() + throws IOException { + + List<MembershipState> resultList = new ArrayList<>(); + GetNamespaceInfoRequest request = GetNamespaceInfoRequest.newInstance(); + GetNamespaceInfoResponse response = + membershipStore.getNamespaceInfo(request); + for (FederationNamespaceInfo nsInfo : response.getNamespaceInfo()) { + // Fetch the most recent namenode registration + String nsId = nsInfo.getNameserviceId(); + List<? extends FederationNamenodeContext> nns = + namenodeResolver.getNamenodesForNameserviceId(nsId); + if (nns != null) { + FederationNamenodeContext nn = nns.get(0); + if (nn != null && nn instanceof MembershipState) { + resultList.add((MembershipState) nn); + } + } + } + return resultList; + } + + /** + * Get time as a date string. + * @param time Milliseconds since 1970. 
+ * @return String representing the date. + */ + private static String getDateString(long time) { + if (time <= 0) { + return "-"; + } + Date date = new Date(time); + + SimpleDateFormat sdf = new SimpleDateFormat(DATE_FORMAT); + return sdf.format(date); + } + + /** + * Get the number of seconds passed since a date. + * + * @param timeMs to use as a reference. + * @return Seconds since the date. + */ + private static long getSecondsSince(long timeMs) { + if (timeMs < 0) { + return -1; + } + return (now() - timeMs) / 1000; + } + + /** + * Get JSON for this record. + * + * @return Map representing the data for the JSON representation. + */ + private static Map<String, Object> getJson(BaseRecord record) { + Map<String, Object> json = new HashMap<>(); + Map<String, Class<?>> fields = getFields(record); + + for (String fieldName : fields.keySet()) { + if (!fieldName.equalsIgnoreCase("proto")) { + try { + Object value = getField(record, fieldName); + if (value instanceof BaseRecord) { + BaseRecord recordField = (BaseRecord) value; + json.putAll(getJson(recordField)); + } else { + json.put(fieldName, value == null ? JSONObject.NULL : value); + } + } catch (Exception e) { + throw new IllegalArgumentException( + "Cannot serialize field " + fieldName + " into JSON"); + } + } + } + return json; + } + + /** + * Returns all serializable fields in the object. + * + * @return Map with the fields. + */ + private static Map<String, Class<?>> getFields(BaseRecord record) { + Map<String, Class<?>> getters = new HashMap<>(); + for (Method m : record.getClass().getDeclaredMethods()) { + if (m.getName().startsWith("get")) { + try { + Class<?> type = m.getReturnType(); + char[] c = m.getName().substring(3).toCharArray(); + c[0] = Character.toLowerCase(c[0]); + String key = new String(c); + getters.put(key, type); + } catch (Exception e) { + LOG.error("Cannot execute getter {} on {}", m.getName(), record); + } + } + } + return getters; + } + + /** + * Fetches the value for a field name. 
+ * + * @param fieldName the legacy name of the field. + * @return The field data or null if not found. + */ + private static Object getField(BaseRecord record, String fieldName) { + Object result = null; + Method m = locateGetter(record, fieldName); + if (m != null) { + try { + result = m.invoke(record); + } catch (Exception e) { + LOG.error("Cannot get field {} on {}", fieldName, record); + } + } + return result; + } + + /** + * Finds the appropriate getter for a field name. + * + * @param fieldName The legacy name of the field. + * @return The matching getter or null if not found. + */ + private static Method locateGetter(BaseRecord record, String fieldName) { + for (Method m : record.getClass().getMethods()) { + if (m.getName().equalsIgnoreCase("get" + fieldName)) { + return m; + } + } + return null; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/673f6856/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java new file mode 100644 index 0000000..00209e9 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.metrics; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * JMX interface for the RPC server. + * TODO use the default RPC MBean. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public interface FederationRPCMBean { + + long getProxyOps(); + + double getProxyAvg(); + + long getProcessingOps(); + + double getProcessingAvg(); + + long getProxyOpFailureCommunicate(); + + long getProxyOpFailureStandby(); + + long getProxyOpNotImplemented(); + + long getRouterFailureStateStoreOps(); + + long getRouterFailureReadOnlyOps(); + + long getRouterFailureLockedOps(); + + long getRouterFailureSafemodeOps(); + + int getRpcServerCallQueue(); + + /** + * Get the number of RPC connections between the clients and the Router. + * @return Number of RPC connections between the clients and the Router. + */ + int getRpcServerNumOpenConnections(); + + /** + * Get the number of RPC connections between the Router and the NNs. + * @return Number of RPC connections between the Router and the NNs. + */ + int getRpcClientNumConnections(); + + /** + * Get the number of active RPC connections between the Router and the NNs. + * @return Number of active RPC connections between the Router and the NNs. + */ + int getRpcClientNumActiveConnections(); + + /** + * Get the number of RPC connections to be created. + * @return Number of RPC connections to be created. 
+ */ + int getRpcClientNumCreatingConnections(); + + /** + * Get the number of connection pools between the Router and a NNs. + * @return Number of connection pools between the Router and a NNs. + */ + int getRpcClientNumConnectionPools(); + + /** + * JSON representation of the RPC connections from the Router to the NNs. + * @return JSON string representation. + */ + String getRpcClientConnections(); +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/673f6856/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java new file mode 100644 index 0000000..427bca2 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java @@ -0,0 +1,239 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.federation.metrics; + +import static org.apache.hadoop.metrics2.impl.MsInfo.ProcessName; +import static org.apache.hadoop.metrics2.impl.MsInfo.SessionId; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.lib.MetricsRegistry; +import org.apache.hadoop.metrics2.lib.MutableCounterLong; +import org.apache.hadoop.metrics2.lib.MutableRate; + +/** + * Implementation of the RPC metrics collector. + */ +@Metrics(name = "RouterRPCActivity", about = "Router RPC Activity", + context = "router") +public class FederationRPCMetrics implements FederationRPCMBean { + + private final MetricsRegistry registry = new MetricsRegistry("router"); + + private RouterRpcServer rpcServer; + + @Metric("Time for the router to process an operation internally") + private MutableRate processing; + @Metric("Number of operations the Router processed internally") + private MutableCounterLong processingOp; + @Metric("Time for the Router to proxy an operation to the Namenodes") + private MutableRate proxy; + @Metric("Number of operations the Router proxied to a Namenode") + private MutableCounterLong proxyOp; + + @Metric("Number of operations to fail to reach NN") + private MutableCounterLong proxyOpFailureStandby; + @Metric("Number of operations to hit a standby NN") + private MutableCounterLong proxyOpFailureCommunicate; + @Metric("Number of operations not implemented") + private MutableCounterLong proxyOpNotImplemented; + + @Metric("Failed requests due to State Store unavailable") + private MutableCounterLong routerFailureStateStore; + @Metric("Failed requests due to read only mount point") + private MutableCounterLong 
routerFailureReadOnly; + @Metric("Failed requests due to locked path") + private MutableCounterLong routerFailureLocked; + @Metric("Failed requests due to safe mode") + private MutableCounterLong routerFailureSafemode; + + public FederationRPCMetrics(Configuration conf, RouterRpcServer rpcServer) { + this.rpcServer = rpcServer; + + registry.tag(SessionId, "RouterRPCSession"); + registry.tag(ProcessName, "Router"); + } + + public static FederationRPCMetrics create(Configuration conf, + RouterRpcServer rpcServer) { + MetricsSystem ms = DefaultMetricsSystem.instance(); + return ms.register(FederationRPCMetrics.class.getName(), + "HDFS Federation RPC Metrics", + new FederationRPCMetrics(conf, rpcServer)); + } + + /** + * Convert nanoseconds to milliseconds. + * @param ns Time in nanoseconds. + * @return Time in milliseconds. + */ + private static double toMs(double ns) { + return ns / 1000000; + } + + /** + * Reset the metrics system. + */ + public static void reset() { + MetricsSystem ms = DefaultMetricsSystem.instance(); + ms.unregisterSource(FederationRPCMetrics.class.getName()); + } + + public void incrProxyOpFailureStandby() { + proxyOpFailureStandby.incr(); + } + + @Override + public long getProxyOpFailureStandby() { + return proxyOpFailureStandby.value(); + } + + public void incrProxyOpFailureCommunicate() { + proxyOpFailureCommunicate.incr(); + } + + @Override + public long getProxyOpFailureCommunicate() { + return proxyOpFailureCommunicate.value(); + } + + + public void incrProxyOpNotImplemented() { + proxyOpNotImplemented.incr(); + } + + @Override + public long getProxyOpNotImplemented() { + return proxyOpNotImplemented.value(); + } + + public void incrRouterFailureStateStore() { + routerFailureStateStore.incr(); + } + + @Override + public long getRouterFailureStateStoreOps() { + return routerFailureStateStore.value(); + } + + public void incrRouterFailureSafemode() { + routerFailureSafemode.incr(); + } + + @Override + public long 
getRouterFailureSafemodeOps() { + return routerFailureSafemode.value(); + } + + public void incrRouterFailureReadOnly() { + routerFailureReadOnly.incr(); + } + + @Override + public long getRouterFailureReadOnlyOps() { + return routerFailureReadOnly.value(); + } + + public void incrRouterFailureLocked() { + routerFailureLocked.incr(); + } + + @Override + public long getRouterFailureLockedOps() { + return routerFailureLocked.value(); + } + + @Override + public int getRpcServerCallQueue() { + return rpcServer.getServer().getCallQueueLen(); + } + + @Override + public int getRpcServerNumOpenConnections() { + return rpcServer.getServer().getNumOpenConnections(); + } + + @Override + public int getRpcClientNumConnections() { + return rpcServer.getRPCClient().getNumConnections(); + } + + @Override + public int getRpcClientNumActiveConnections() { + return rpcServer.getRPCClient().getNumActiveConnections(); + } + + @Override + public int getRpcClientNumCreatingConnections() { + return rpcServer.getRPCClient().getNumCreatingConnections(); + } + + @Override + public int getRpcClientNumConnectionPools() { + return rpcServer.getRPCClient().getNumConnectionPools(); + } + + @Override + public String getRpcClientConnections() { + return rpcServer.getRPCClient().getJSON(); + } + + /** + * Add the time to proxy an operation from the moment the Router sends it to + * the Namenode until it replied. + * @param time Proxy time of an operation in nanoseconds. + */ + public void addProxyTime(long time) { + proxy.add(time); + proxyOp.incr(); + } + + @Override + public double getProxyAvg() { + return toMs(proxy.lastStat().mean()); + } + + @Override + public long getProxyOps() { + return proxyOp.value(); + } + + /** + * Add the time to process a request in the Router from the time we receive + * the call until we send it to the Namenode. + * @param time Process time of an operation in nanoseconds. 
+ */ + public void addProcessingTime(long time) { + processing.add(time); + processingOp.incr(); + } + + @Override + public double getProcessingAvg() { + return toMs(processing.lastStat().mean()); + } + + @Override + public long getProcessingOps() { + return processingOp.value(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/673f6856/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCPerformanceMonitor.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCPerformanceMonitor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCPerformanceMonitor.java new file mode 100644 index 0000000..e3a16b5 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCPerformanceMonitor.java @@ -0,0 +1,211 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.federation.metrics; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; + +import javax.management.NotCompliantMBeanException; +import javax.management.ObjectName; +import javax.management.StandardMBean; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.federation.router.RouterRpcMonitor; +import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer; +import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; +import org.apache.hadoop.metrics2.util.MBeans; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +/** + * Customizable RPC performance monitor. Receives events from the RPC server + * and aggregates them via JMX. + */ +public class FederationRPCPerformanceMonitor implements RouterRpcMonitor { + + private static final Logger LOG = + LoggerFactory.getLogger(FederationRPCPerformanceMonitor.class); + + + /** Time for an operation to be received in the Router. */ + private static final ThreadLocal<Long> START_TIME = new ThreadLocal<>(); + /** Time for an operation to be send to the Namenode. */ + private static final ThreadLocal<Long> PROXY_TIME = new ThreadLocal<>(); + + /** Configuration for the performance monitor. */ + private Configuration conf; + /** RPC server for the Router. */ + private RouterRpcServer server; + /** State Store. */ + private StateStoreService store; + + /** JMX interface to monitor the RPC metrics. */ + private FederationRPCMetrics metrics; + private ObjectName registeredBean; + + /** Thread pool for logging stats. 
*/ + private ExecutorService executor; + + + @Override + public void init(Configuration configuration, RouterRpcServer rpcServer, + StateStoreService stateStore) { + + this.conf = configuration; + this.server = rpcServer; + this.store = stateStore; + + // Create metrics + this.metrics = FederationRPCMetrics.create(conf, server); + + // Create thread pool + ThreadFactory threadFactory = new ThreadFactoryBuilder() + .setNameFormat("Federation RPC Performance Monitor-%d").build(); + this.executor = Executors.newFixedThreadPool(1, threadFactory); + + // Adding JMX interface + try { + StandardMBean bean = + new StandardMBean(this.metrics, FederationRPCMBean.class); + registeredBean = MBeans.register("Router", "FederationRPC", bean); + LOG.info("Registered FederationRPCMBean: {}", registeredBean); + } catch (NotCompliantMBeanException e) { + throw new RuntimeException("Bad FederationRPCMBean setup", e); + } + } + + @Override + public void close() { + if (registeredBean != null) { + MBeans.unregister(registeredBean); + registeredBean = null; + } + if (this.executor != null) { + this.executor.shutdown(); + } + } + + /** + * Resets all RPC service performance counters to their defaults. 
+ */ + public void resetPerfCounters() { + if (registeredBean != null) { + MBeans.unregister(registeredBean); + registeredBean = null; + } + if (metrics != null) { + FederationRPCMetrics.reset(); + metrics = null; + } + init(conf, server, store); + } + + @Override + public void startOp() { + START_TIME.set(this.getNow()); + } + + @Override + public long proxyOp() { + PROXY_TIME.set(this.getNow()); + long processingTime = getProcessingTime(); + if (processingTime >= 0) { + metrics.addProcessingTime(processingTime); + } + return Thread.currentThread().getId(); + } + + @Override + public void proxyOpComplete(boolean success) { + if (success) { + long proxyTime = getProxyTime(); + if (proxyTime >= 0) { + metrics.addProxyTime(proxyTime); + } + } + } + + @Override + public void proxyOpFailureStandby() { + metrics.incrProxyOpFailureStandby(); + } + + @Override + public void proxyOpFailureCommunicate() { + metrics.incrProxyOpFailureCommunicate(); + } + + @Override + public void proxyOpNotImplemented() { + metrics.incrProxyOpNotImplemented(); + } + + @Override + public void routerFailureStateStore() { + metrics.incrRouterFailureStateStore(); + } + + @Override + public void routerFailureSafemode() { + metrics.incrRouterFailureSafemode(); + } + + @Override + public void routerFailureReadOnly() { + metrics.incrRouterFailureReadOnly(); + } + + @Override + public void routerFailureLocked() { + metrics.incrRouterFailureLocked(); + } + + /** + * Get current time. + * @return Current time in nanoseconds. + */ + private long getNow() { + return System.nanoTime(); + } + + /** + * Get time between we receiving the operation and sending it to the Namenode. + * @return Processing time in nanoseconds. 
+ */ + private long getProcessingTime() { + if (START_TIME.get() != null && START_TIME.get() > 0 && + PROXY_TIME.get() != null && PROXY_TIME.get() > 0) { + return PROXY_TIME.get() - START_TIME.get(); + } + return -1; + } + + /** + * Get time between now and when the operation was forwarded to the Namenode. + * @return Current proxy time in nanoseconds. + */ + private long getProxyTime() { + if (PROXY_TIME.get() != null && PROXY_TIME.get() > 0) { + return getNow() - PROXY_TIME.get(); + } + return -1; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/673f6856/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/NamenodeBeanMetrics.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/NamenodeBeanMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/NamenodeBeanMetrics.java new file mode 100644 index 0000000..23cd675 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/NamenodeBeanMetrics.java @@ -0,0 +1,624 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.metrics; + +import static org.apache.hadoop.util.Time.now; + +import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +import javax.management.NotCompliantMBeanException; +import javax.management.ObjectName; +import javax.management.StandardMBean; + +import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; +import org.apache.hadoop.hdfs.DFSUtilClient; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; +import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole; +import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo; +import org.apache.hadoop.hdfs.server.federation.router.Router; +import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer; +import org.apache.hadoop.hdfs.server.federation.store.MembershipStore; +import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamespaceInfoRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetNamespaceInfoResponse; +import org.apache.hadoop.hdfs.server.namenode.NameNodeMXBean; +import org.apache.hadoop.hdfs.server.namenode.NameNodeStatusMXBean; +import org.apache.hadoop.hdfs.server.namenode.metrics.FSNamesystemMBean; +import org.apache.hadoop.ipc.StandbyException; +import org.apache.hadoop.metrics2.util.MBeans; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.util.VersionInfo; +import org.eclipse.jetty.util.ajax.JSON; +import org.slf4j.Logger; 
+import org.slf4j.LoggerFactory; + +/** + * Expose the Namenode metrics as the Router was one. + */ +public class NamenodeBeanMetrics + implements FSNamesystemMBean, NameNodeMXBean, NameNodeStatusMXBean { + + private static final Logger LOG = + LoggerFactory.getLogger(NamenodeBeanMetrics.class); + + private final Router router; + + /** FSNamesystem bean. */ + private ObjectName fsBeanName; + /** FSNamesystemState bean. */ + private ObjectName fsStateBeanName; + /** NameNodeInfo bean. */ + private ObjectName nnInfoBeanName; + /** NameNodeStatus bean. */ + private ObjectName nnStatusBeanName; + + + public NamenodeBeanMetrics(Router router) { + this.router = router; + + try { + // TODO this needs to be done with the Metrics from FSNamesystem + StandardMBean bean = new StandardMBean(this, FSNamesystemMBean.class); + this.fsBeanName = MBeans.register("NameNode", "FSNamesystem", bean); + LOG.info("Registered FSNamesystem MBean: {}", this.fsBeanName); + } catch (NotCompliantMBeanException e) { + throw new RuntimeException("Bad FSNamesystem MBean setup", e); + } + + try { + StandardMBean bean = new StandardMBean(this, FSNamesystemMBean.class); + this.fsStateBeanName = + MBeans.register("NameNode", "FSNamesystemState", bean); + LOG.info("Registered FSNamesystemState MBean: {}", this.fsStateBeanName); + } catch (NotCompliantMBeanException e) { + throw new RuntimeException("Bad FSNamesystemState MBean setup", e); + } + + try { + StandardMBean bean = new StandardMBean(this, NameNodeMXBean.class); + this.nnInfoBeanName = MBeans.register("NameNode", "NameNodeInfo", bean); + LOG.info("Registered NameNodeInfo MBean: {}", this.nnInfoBeanName); + } catch (NotCompliantMBeanException e) { + throw new RuntimeException("Bad NameNodeInfo MBean setup", e); + } + + try { + StandardMBean bean = new StandardMBean(this, NameNodeStatusMXBean.class); + this.nnStatusBeanName = + MBeans.register("NameNode", "NameNodeStatus", bean); + LOG.info("Registered NameNodeStatus MBean: {}", 
this.nnStatusBeanName); + } catch (NotCompliantMBeanException e) { + throw new RuntimeException("Bad NameNodeStatus MBean setup", e); + } + } + + /** + * De-register the JMX interfaces. + */ + public void close() { + if (fsStateBeanName != null) { + MBeans.unregister(fsStateBeanName); + fsStateBeanName = null; + } + if (nnInfoBeanName != null) { + MBeans.unregister(nnInfoBeanName); + nnInfoBeanName = null; + } + // Remove the NameNode status bean + if (nnStatusBeanName != null) { + MBeans.unregister(nnStatusBeanName); + nnStatusBeanName = null; + } + } + + private FederationMetrics getFederationMetrics() { + return this.router.getMetrics(); + } + + ///////////////////////////////////////////////////////// + // NameNodeMXBean + ///////////////////////////////////////////////////////// + + @Override + public String getVersion() { + return VersionInfo.getVersion() + ", r" + VersionInfo.getRevision(); + } + + @Override + public String getSoftwareVersion() { + return VersionInfo.getVersion(); + } + + @Override + public long getUsed() { + return getFederationMetrics().getUsedCapacity(); + } + + @Override + public long getFree() { + return getFederationMetrics().getRemainingCapacity(); + } + + @Override + public long getTotal() { + return getFederationMetrics().getTotalCapacity(); + } + + @Override + public String getSafemode() { + // We assume that the global federated view is never in safe mode + return ""; + } + + @Override + public boolean isUpgradeFinalized() { + // We assume the upgrade is always finalized in a federated biew + return true; + } + + @Override + public RollingUpgradeInfo.Bean getRollingUpgradeStatus() { + return null; + } + + @Override + public long getNonDfsUsedSpace() { + return 0; + } + + @Override + public float getPercentUsed() { + return DFSUtilClient.getPercentUsed(getCapacityUsed(), getCapacityTotal()); + } + + @Override + public float getPercentRemaining() { + return DFSUtilClient.getPercentUsed( + getCapacityRemaining(), getCapacityTotal()); 
+ } + + @Override + public long getCacheUsed() { + return 0; + } + + @Override + public long getCacheCapacity() { + return 0; + } + + @Override + public long getBlockPoolUsedSpace() { + return 0; + } + + @Override + public float getPercentBlockPoolUsed() { + return 0; + } + + @Override + public long getTotalBlocks() { + return getFederationMetrics().getNumBlocks(); + } + + @Override + public long getNumberOfMissingBlocks() { + return getFederationMetrics().getNumOfMissingBlocks(); + } + + @Override + @Deprecated + public long getPendingReplicationBlocks() { + return getFederationMetrics().getNumOfBlocksPendingReplication(); + } + + @Override + public long getPendingReconstructionBlocks() { + return getFederationMetrics().getNumOfBlocksPendingReplication(); + } + + @Override + @Deprecated + public long getUnderReplicatedBlocks() { + return getFederationMetrics().getNumOfBlocksUnderReplicated(); + } + + @Override + public long getLowRedundancyBlocks() { + return getFederationMetrics().getNumOfBlocksUnderReplicated(); + } + + @Override + public long getPendingDeletionBlocks() { + return getFederationMetrics().getNumOfBlocksPendingDeletion(); + } + + @Override + public long getScheduledReplicationBlocks() { + return -1; + } + + @Override + public long getNumberOfMissingBlocksWithReplicationFactorOne() { + return 0; + } + + @Override + public String getCorruptFiles() { + return "N/A"; + } + + @Override + public int getThreads() { + return ManagementFactory.getThreadMXBean().getThreadCount(); + } + + @Override + public String getLiveNodes() { + return this.getNodes(DatanodeReportType.LIVE); + } + + @Override + public String getDeadNodes() { + return this.getNodes(DatanodeReportType.DEAD); + } + + @Override + public String getDecomNodes() { + return this.getNodes(DatanodeReportType.DECOMMISSIONING); + } + + /** + * Get all the nodes in the federation from a particular type. + * TODO this is expensive, we may want to cache it. 
+ * @param type Type of the datanodes to check. + * @return JSON with the nodes. + */ + private String getNodes(DatanodeReportType type) { + final Map<String, Map<String, Object>> info = new HashMap<>(); + try { + RouterRpcServer rpcServer = this.router.getRpcServer(); + DatanodeInfo[] datanodes = rpcServer.getDatanodeReport(type); + for (DatanodeInfo node : datanodes) { + Map<String, Object> innerinfo = new HashMap<>(); + innerinfo.put("infoAddr", node.getInfoAddr()); + innerinfo.put("infoSecureAddr", node.getInfoSecureAddr()); + innerinfo.put("xferaddr", node.getXferAddr()); + innerinfo.put("location", node.getNetworkLocation()); + innerinfo.put("lastContact", getLastContact(node)); + innerinfo.put("usedSpace", node.getDfsUsed()); + innerinfo.put("adminState", node.getAdminState().toString()); + innerinfo.put("nonDfsUsedSpace", node.getNonDfsUsed()); + innerinfo.put("capacity", node.getCapacity()); + innerinfo.put("numBlocks", -1); // node.numBlocks() + innerinfo.put("version", (node.getSoftwareVersion() == null ? 
+ "UNKNOWN" : node.getSoftwareVersion())); + innerinfo.put("used", node.getDfsUsed()); + innerinfo.put("remaining", node.getRemaining()); + innerinfo.put("blockScheduled", -1); // node.getBlocksScheduled() + innerinfo.put("blockPoolUsed", node.getBlockPoolUsed()); + innerinfo.put("blockPoolUsedPercent", node.getBlockPoolUsedPercent()); + innerinfo.put("volfails", -1); // node.getVolumeFailures() + info.put(node.getHostName() + ":" + node.getXferPort(), + Collections.unmodifiableMap(innerinfo)); + } + } catch (StandbyException e) { + LOG.error("Cannot get {} nodes, Router in safe mode", type); + } catch (IOException e) { + LOG.error("Cannot get " + type + " nodes", e); + } + return JSON.toString(info); + } + + @Override + public String getClusterId() { + try { + return getNamespaceInfo(FederationNamespaceInfo::getClusterId).toString(); + } catch (IOException e) { + LOG.error("Cannot fetch cluster ID metrics {}", e.getMessage()); + return ""; + } + } + + @Override + public String getBlockPoolId() { + try { + return + getNamespaceInfo(FederationNamespaceInfo::getBlockPoolId).toString(); + } catch (IOException e) { + LOG.error("Cannot fetch block pool ID metrics {}", e.getMessage()); + return ""; + } + } + + /** + * Build a set of unique values found in all namespaces. + * + * @param f Method reference of the appropriate FederationNamespaceInfo + * getter function + * @return Set of unique string values found in all discovered namespaces. + * @throws IOException if the query could not be executed. 
+ */ + private Collection<String> getNamespaceInfo( + Function<FederationNamespaceInfo, String> f) throws IOException { + StateStoreService stateStore = router.getStateStore(); + MembershipStore membershipStore = + stateStore.getRegisteredRecordStore(MembershipStore.class); + + GetNamespaceInfoRequest request = GetNamespaceInfoRequest.newInstance(); + GetNamespaceInfoResponse response = + membershipStore.getNamespaceInfo(request); + return response.getNamespaceInfo().stream() + .map(f) + .collect(Collectors.toSet()); + } + + @Override + public String getNameDirStatuses() { + return "N/A"; + } + + @Override + public String getNodeUsage() { + return "N/A"; + } + + @Override + public String getNameJournalStatus() { + return "N/A"; + } + + @Override + public String getJournalTransactionInfo() { + return "N/A"; + } + + @Override + public long getNNStartedTimeInMillis() { + return this.router.getStartTime(); + } + + @Override + public String getCompileInfo() { + return VersionInfo.getDate() + " by " + VersionInfo.getUser() + + " from " + VersionInfo.getBranch(); + } + + @Override + public int getDistinctVersionCount() { + return 0; + } + + @Override + public Map<String, Integer> getDistinctVersions() { + return null; + } + + ///////////////////////////////////////////////////////// + // FSNamesystemMBean + ///////////////////////////////////////////////////////// + + @Override + public String getFSState() { + // We assume is not in safe mode + return "Operational"; + } + + @Override + public long getBlocksTotal() { + return this.getTotalBlocks(); + } + + @Override + public long getCapacityTotal() { + return this.getTotal(); + } + + @Override + public long getCapacityRemaining() { + return this.getFree(); + } + + @Override + public long getCapacityUsed() { + return this.getUsed(); + } + + @Override + public long getFilesTotal() { + return getFederationMetrics().getNumFiles(); + } + + @Override + public int getTotalLoad() { + return -1; + } + + @Override + public int 
getNumLiveDataNodes() { + return this.router.getMetrics().getNumLiveNodes(); + } + + @Override + public int getNumDeadDataNodes() { + return this.router.getMetrics().getNumDeadNodes(); + } + + @Override + public int getNumStaleDataNodes() { + return -1; + } + + @Override + public int getNumDecomLiveDataNodes() { + return this.router.getMetrics().getNumDecomLiveNodes(); + } + + @Override + public int getNumDecomDeadDataNodes() { + return this.router.getMetrics().getNumDecomDeadNodes(); + } + + @Override + public int getNumDecommissioningDataNodes() { + return this.router.getMetrics().getNumDecommissioningNodes(); + } + + @Override + public int getNumInMaintenanceLiveDataNodes() { + return 0; + } + + @Override + public int getNumInMaintenanceDeadDataNodes() { + return 0; + } + + @Override + public int getNumEnteringMaintenanceDataNodes() { + return 0; + } + + @Override + public int getVolumeFailuresTotal() { + return 0; + } + + @Override + public long getEstimatedCapacityLostTotal() { + return 0; + } + + @Override + public String getSnapshotStats() { + return null; + } + + @Override + public long getMaxObjects() { + return 0; + } + + @Override + public long getBlockDeletionStartTime() { + return -1; + } + + @Override + public int getNumStaleStorages() { + return -1; + } + + @Override + public String getTopUserOpCounts() { + return "N/A"; + } + + @Override + public int getFsLockQueueLength() { + return 0; + } + + @Override + public long getTotalSyncCount() { + return 0; + } + + @Override + public String getTotalSyncTimes() { + return ""; + } + + private long getLastContact(DatanodeInfo node) { + return (now() - node.getLastUpdate()) / 1000; + } + + ///////////////////////////////////////////////////////// + // NameNodeStatusMXBean + ///////////////////////////////////////////////////////// + + @Override + public String getNNRole() { + return NamenodeRole.NAMENODE.toString(); + } + + @Override + public String getState() { + return HAServiceState.ACTIVE.toString(); + } 
+ + @Override + public String getHostAndPort() { + return NetUtils.getHostPortString(router.getRpcServerAddress()); + } + + @Override + public boolean isSecurityEnabled() { + return false; + } + + @Override + public long getLastHATransitionTime() { + return 0; + } + + @Override + public long getBytesWithFutureGenerationStamps() { + return 0; + } + + @Override + public String getSlowPeersReport() { + return "N/A"; + } + + @Override + public String getSlowDisksReport() { + return "N/A"; + } + + @Override + public long getNumberOfSnapshottableDirs() { + return 0; + } + + @Override + public String getEnteringMaintenanceNodes() { + return "N/A"; + } + + @Override + public String getNameDirSize() { + return "N/A"; + } + + @Override + public int getNumEncryptionZones() { + return 0; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/673f6856/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/StateStoreMBean.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/StateStoreMBean.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/StateStoreMBean.java new file mode 100644 index 0000000..5e4ccab --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/StateStoreMBean.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.metrics; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * JMX interface for the State Store metrics. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public interface StateStoreMBean { + + long getReadOps(); + + double getReadAvg(); + + long getWriteOps(); + + double getWriteAvg(); + + long getFailureOps(); + + double getFailureAvg(); + + long getRemoveOps(); + + double getRemoveAvg(); +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/673f6856/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/StateStoreMetrics.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/StateStoreMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/StateStoreMetrics.java new file mode 100644 index 0000000..c17eabc --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/StateStoreMetrics.java @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.metrics; + +import static org.apache.hadoop.metrics2.impl.MsInfo.ProcessName; +import static org.apache.hadoop.metrics2.impl.MsInfo.SessionId; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.lib.MetricsRegistry; +import org.apache.hadoop.metrics2.lib.MutableGaugeInt; +import org.apache.hadoop.metrics2.lib.MutableRate; + +import com.google.common.annotations.VisibleForTesting; + +/** + * Implementations of the JMX interface for the State Store metrics. 
+ */ +@Metrics(name = "StateStoreActivity", about = "Router metrics", + context = "router") +public final class StateStoreMetrics implements StateStoreMBean { + + private final MetricsRegistry registry = new MetricsRegistry("router"); + + @Metric("GET transactions") + private MutableRate reads; + @Metric("PUT transactions") + private MutableRate writes; + @Metric("REMOVE transactions") + private MutableRate removes; + @Metric("Failed transactions") + private MutableRate failures; + + private Map<String, MutableGaugeInt> cacheSizes; + + private StateStoreMetrics(Configuration conf) { + registry.tag(SessionId, "RouterSession"); + registry.tag(ProcessName, "Router"); + cacheSizes = new HashMap<>(); + } + + public static StateStoreMetrics create(Configuration conf) { + MetricsSystem ms = DefaultMetricsSystem.instance(); + return ms.register(new StateStoreMetrics(conf)); + } + + public void shutdown() { + DefaultMetricsSystem.shutdown(); + reset(); + } + + public void addRead(long latency) { + reads.add(latency); + } + + public long getReadOps() { + return reads.lastStat().numSamples(); + } + + public double getReadAvg() { + return reads.lastStat().mean(); + } + + public void addWrite(long latency) { + writes.add(latency); + } + + public long getWriteOps() { + return writes.lastStat().numSamples(); + } + + public double getWriteAvg() { + return writes.lastStat().mean(); + } + + public void addFailure(long latency) { + failures.add(latency); + } + + public long getFailureOps() { + return failures.lastStat().numSamples(); + } + + public double getFailureAvg() { + return failures.lastStat().mean(); + } + + public void addRemove(long latency) { + removes.add(latency); + } + + public long getRemoveOps() { + return removes.lastStat().numSamples(); + } + + public double getRemoveAvg() { + return removes.lastStat().mean(); + } + + /** + * Set the size of the cache for a State Store interface. + * + * @param name Name of the record to cache. + * @param size Number of records. 
+ */ + public void setCacheSize(String name, int size) { + String counterName = "Cache" + name + "Size"; + MutableGaugeInt counter = cacheSizes.get(counterName); + if (counter == null) { + counter = registry.newGauge(counterName, name, size); + cacheSizes.put(counterName, counter); + } + counter.set(size); + } + + @VisibleForTesting + public void reset() { + reads.resetMinMax(); + writes.resetMinMax(); + removes.resetMinMax(); + failures.resetMinMax(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/673f6856/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/package-info.java new file mode 100644 index 0000000..c56c823 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/package-info.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +/** + * Report metrics for Router-based Federation. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +package org.apache.hadoop.hdfs.server.federation.metrics; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/673f6856/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java index d93d498..543d964 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java @@ -23,6 +23,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.TreeMap; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ScheduledThreadPoolExecutor; @@ -35,6 +36,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Time; +import org.eclipse.jetty.util.ajax.JSON; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -283,6 +285,27 @@ public class ConnectionManager { } /** + * Get a JSON representation of the connection pool. + * + * @return JSON representation of all the connection pools. 
+ */ + public String getJSON() { + final Map<String, String> info = new TreeMap<>(); + readLock.lock(); + try { + for (Entry<ConnectionPoolId, ConnectionPool> entry : + this.pools.entrySet()) { + ConnectionPoolId connectionPoolId = entry.getKey(); + ConnectionPool pool = entry.getValue(); + info.put(connectionPoolId.toString(), pool.getJSON()); + } + } finally { + readLock.unlock(); + } + return JSON.toString(info); + } + + /** * Removes stale connections not accessed recently from the pool. This is * invoked periodically. */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/673f6856/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java index f76f621..ca113ef 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java @@ -20,8 +20,10 @@ package org.apache.hadoop.hdfs.server.federation.router; import java.io.IOException; import java.net.InetSocketAddress; import java.util.ArrayList; +import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -46,6 +48,7 @@ import org.apache.hadoop.security.SaslRpcServer; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Time; +import org.eclipse.jetty.util.ajax.JSON; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -258,6 +261,26 @@ public class 
ConnectionPool { } /** + * JSON representation of the connection pool. + * + * @return String representation of the JSON. + */ + public String getJSON() { + final Map<String, String> info = new LinkedHashMap<>(); + info.put("active", Integer.toString(getNumActiveConnections())); + info.put("total", Integer.toString(getNumConnections())); + if (LOG.isDebugEnabled()) { + List<ConnectionContext> tmpConnections = this.connections; + for (int i=0; i<tmpConnections.size(); i++) { + ConnectionContext connection = tmpConnections.get(i); + info.put(i + " active", Boolean.toString(connection.isActive())); + info.put(i + " closed", Boolean.toString(connection.isClosed())); + } + } + return JSON.toString(info); + } + + /** * Create a new proxy wrapper for a client NN connection. * @return Proxy for the target ClientProtocol that contains the user's * security context. http://git-wip-us.apache.org/repos/asf/hadoop/blob/673f6856/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java index fcbd2eb..3ab5e2d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java @@ -36,10 +36,14 @@ import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.HAUtil; import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.server.federation.metrics.FederationMetrics; import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; import 
org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver; import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.source.JvmMetrics; import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.util.JvmPauseMonitor; import org.apache.hadoop.util.ShutdownHookManager; import org.apache.hadoop.util.StringUtils; import org.slf4j.Logger; @@ -96,6 +100,12 @@ public class Router extends CompositeService { /** Updates the namenode status in the namenode resolver. */ private Collection<NamenodeHeartbeatService> namenodeHearbeatServices; + /** Router metrics. */ + private RouterMetricsService metrics; + + /** JVM pauses (GC and others). */ + private JvmPauseMonitor pauseMonitor; + /** Usage string for help message. */ private static final String USAGE = "Usage: java Router"; @@ -174,18 +184,46 @@ public class Router extends CompositeService { } } + // Router metrics system + if (conf.getBoolean( + DFSConfigKeys.DFS_ROUTER_METRICS_ENABLE, + DFSConfigKeys.DFS_ROUTER_METRICS_ENABLE_DEFAULT)) { + + DefaultMetricsSystem.initialize("Router"); + + this.metrics = new RouterMetricsService(this); + addService(this.metrics); + + // JVM pause monitor + this.pauseMonitor = new JvmPauseMonitor(); + this.pauseMonitor.init(conf); + } + super.serviceInit(conf); } @Override protected void serviceStart() throws Exception { + if (this.pauseMonitor != null) { + this.pauseMonitor.start(); + JvmMetrics jvmMetrics = this.metrics.getJvmMetrics(); + if (jvmMetrics != null) { + jvmMetrics.setPauseMonitor(pauseMonitor); + } + } + super.serviceStart(); } @Override protected void serviceStop() throws Exception { + // JVM pause monitor + if (this.pauseMonitor != null) { + this.pauseMonitor.stop(); + } + super.serviceStop(); } @@ -419,6 +457,30 @@ public class Router extends CompositeService { } /** + * Get the metrics system for the Router. 
+ * + * @return Router metrics. + */ + public RouterMetrics getRouterMetrics() { + if (this.metrics != null) { + return this.metrics.getRouterMetrics(); + } + return null; + } + + /** + * Get the federation metrics. + * + * @return Federation metrics. + */ + public FederationMetrics getMetrics() { + if (this.metrics != null) { + return this.metrics.getFederationMetrics(); + } + return null; + } + + /** * Get the subcluster resolver for files. * * @return Subcluster resolver for files. --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org