http://git-wip-us.apache.org/repos/asf/hadoop/blob/673f6856/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterMetrics.java
new file mode 100644
index 0000000..851538a
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterMetrics.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import static org.apache.hadoop.metrics2.impl.MsInfo.ProcessName;
+import static org.apache.hadoop.metrics2.impl.MsInfo.SessionId;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
+import org.apache.hadoop.metrics2.source.JvmMetrics;
+
+/**
+ * This class is for maintaining the various Router activity statistics
+ * and publishing them through the metrics interfaces.
+ */
+@Metrics(name="RouterActivity", about="Router metrics", context="dfs")
+public class RouterMetrics {
+
+  private final MetricsRegistry registry = new MetricsRegistry("router");
+
+  @Metric("Duration in SafeMode at startup in msec")
+  private MutableGaugeInt safeModeTime;
+
+  private JvmMetrics jvmMetrics = null;
+
+  RouterMetrics(
+      String processName, String sessionId, final JvmMetrics jvmMetrics) {
+    this.jvmMetrics = jvmMetrics;
+    registry.tag(ProcessName, processName).tag(SessionId, sessionId);
+  }
+
+  public static RouterMetrics create(Configuration conf) {
+    String sessionId = conf.get(DFSConfigKeys.DFS_METRICS_SESSION_ID_KEY);
+    String processName = "Router";
+    MetricsSystem ms = DefaultMetricsSystem.instance();
+    JvmMetrics jm = JvmMetrics.create(processName, sessionId, ms);
+
+    return ms.register(new RouterMetrics(processName, sessionId, jm));
+  }
+
+  public JvmMetrics getJvmMetrics() {
+    return jvmMetrics;
+  }
+
+  public void shutdown() {
+    DefaultMetricsSystem.shutdown();
+  }
+
+  public void setSafeModeTime(long elapsed) {
+    safeModeTime.set((int) elapsed);
+  }
+}
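(Editorial aside, not part of the patch.) A minimal usage sketch of the new metrics source: in the patch itself, RouterMetricsService (next file) makes the create() call during serviceInit(); the elapsed time below is an invented value for illustration.

    Configuration conf = new Configuration();
    // Registers the "RouterActivity" source with the default metrics system.
    RouterMetrics metrics = RouterMetrics.create(conf);
    // Publish how long startup safe mode lasted (stored as an int gauge, msec).
    metrics.setSafeModeTime(1500L);
    // Stops the default metrics system when the Router shuts down.
    metrics.shutdown();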
http://git-wip-us.apache.org/repos/asf/hadoop/blob/673f6856/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterMetricsService.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterMetricsService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterMetricsService.java new file mode 100644 index 0000000..f4debce --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterMetricsService.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.federation.metrics.FederationMetrics; +import org.apache.hadoop.hdfs.server.federation.metrics.NamenodeBeanMetrics; +import org.apache.hadoop.metrics2.source.JvmMetrics; +import org.apache.hadoop.service.AbstractService; + +/** + * Service to manage the metrics of the Router. + */ +public class RouterMetricsService extends AbstractService { + + /** Router for this metrics. */ + private final Router router; + + /** Router metrics. */ + private RouterMetrics routerMetrics; + /** Federation metrics. */ + private FederationMetrics federationMetrics; + /** Namenode mock metrics. */ + private NamenodeBeanMetrics nnMetrics; + + + public RouterMetricsService(final Router router) { + super(RouterMetricsService.class.getName()); + this.router = router; + } + + @Override + protected void serviceInit(Configuration configuration) throws Exception { + this.routerMetrics = RouterMetrics.create(configuration); + } + + @Override + protected void serviceStart() throws Exception { + // Wrapper for all the FSNamesystem JMX interfaces + this.nnMetrics = new NamenodeBeanMetrics(this.router); + + // Federation MBean JMX interface + this.federationMetrics = new FederationMetrics(this.router); + } + + @Override + protected void serviceStop() throws Exception { + // Remove JMX interfaces + if (this.federationMetrics != null) { + this.federationMetrics.close(); + } + + // Remove Namenode JMX interfaces + if (this.nnMetrics != null) { + this.nnMetrics.close(); + } + + // Shutdown metrics + if (this.routerMetrics != null) { + this.routerMetrics.shutdown(); + } + } + + /** + * Get the metrics system for the Router. + * + * @return Router metrics. + */ + public RouterMetrics getRouterMetrics() { + return this.routerMetrics; + } + + /** + * Get the federation metrics. + * + * @return Federation metrics. 
+ */ + public FederationMetrics getFederationMetrics() { + return this.federationMetrics; + } + + /** + * Get the JVM metrics for the Router. + * + * @return JVM metrics. + */ + public JvmMetrics getJvmMetrics() { + if (this.routerMetrics == null) { + return null; + } + return this.routerMetrics.getJvmMetrics(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/673f6856/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java index 3a32be1..5c33c2e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java @@ -97,6 +97,8 @@ public class RouterRpcClient { private final ExecutorService executorService; /** Retry policy for router -> NN communication. */ private final RetryPolicy retryPolicy; + /** Optional perf monitor. */ + private final RouterRpcMonitor rpcMonitor; /** Pattern to parse a stack trace line. */ private static final Pattern STACK_TRACE_PATTERN = @@ -111,8 +113,7 @@ public class RouterRpcClient { * @param monitor Optional performance monitor. */ public RouterRpcClient(Configuration conf, String identifier, - ActiveNamenodeResolver resolver) { - + ActiveNamenodeResolver resolver, RouterRpcMonitor monitor) { this.routerId = identifier; this.namenodeResolver = resolver; @@ -125,6 +126,8 @@ public class RouterRpcClient { .build(); this.executorService = Executors.newCachedThreadPool(threadFactory); + this.rpcMonitor = monitor; + int maxFailoverAttempts = conf.getInt( HdfsClientConfigKeys.Failover.MAX_ATTEMPTS_KEY, HdfsClientConfigKeys.Failover.MAX_ATTEMPTS_DEFAULT); @@ -192,6 +195,15 @@ public class RouterRpcClient { } /** + * JSON representation of the connection pool. + * + * @return String representation of the JSON. + */ + public String getJSON() { + return this.connectionManager.getJSON(); + } + + /** * Get ClientProtocol proxy client for a NameNode. Each combination of user + * NN must use a unique proxy client. Previously created clients are cached * and stored in a connection pool by the ConnectionManager. 
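(Editorial aside, not part of the patch.) The constructor change above makes the monitor an optional collaborator: every hook in this class is null-guarded, so passing null simply disables the instrumentation. A sketch of the wiring, mirroring what RouterRpcServer does later in this change; conf, routerId and namenodeResolver are assumed to already be in scope:

    Class<? extends RouterRpcMonitor> monitorClass = conf.getClass(
        DFSConfigKeys.DFS_ROUTER_METRICS_CLASS,
        DFSConfigKeys.DFS_ROUTER_METRICS_CLASS_DEFAULT,
        RouterRpcMonitor.class);
    // Instantiate the configured implementation (hdfs-default.xml further down
    // points at FederationRPCPerformanceMonitor by default).
    RouterRpcMonitor monitor = ReflectionUtils.newInstance(monitorClass, conf);
    // The client reports proxy attempts, completions and failures to it.
    RouterRpcClient client = new RouterRpcClient(
        conf, routerId, namenodeResolver, monitor);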
@@ -294,6 +306,9 @@ public class RouterRpcClient { } Object ret = null; + if (rpcMonitor != null) { + rpcMonitor.proxyOp(); + } boolean failover = false; Map<FederationNamenodeContext, IOException> ioes = new LinkedHashMap<>(); for (FederationNamenodeContext namenode : namenodes) { @@ -310,18 +325,31 @@ public class RouterRpcClient { InetSocketAddress address = client.getAddress(); namenodeResolver.updateActiveNamenode(nsId, address); } + if (this.rpcMonitor != null) { + this.rpcMonitor.proxyOpComplete(true); + } return ret; } catch (IOException ioe) { ioes.put(namenode, ioe); if (ioe instanceof StandbyException) { // Fail over indicated by retry policy and/or NN + if (this.rpcMonitor != null) { + this.rpcMonitor.proxyOpFailureStandby(); + } failover = true; } else if (ioe instanceof RemoteException) { + if (this.rpcMonitor != null) { + this.rpcMonitor.proxyOpComplete(true); + } // RemoteException returned by NN throw (RemoteException) ioe; } else { // Other communication error, this is a failure // Communication retries are handled by the retry policy + if (this.rpcMonitor != null) { + this.rpcMonitor.proxyOpFailureCommunicate(); + this.rpcMonitor.proxyOpComplete(false); + } throw ioe; } } finally { @@ -330,6 +358,9 @@ public class RouterRpcClient { } } } + if (this.rpcMonitor != null) { + this.rpcMonitor.proxyOpComplete(false); + } // All namenodes were unavailable or in standby String msg = "No namenode available to invoke " + method.getName() + " " + @@ -746,6 +777,10 @@ public class RouterRpcClient { } } + if (rpcMonitor != null) { + rpcMonitor.proxyOp(); + } + try { List<Future<Object>> futures = executorService.invokeAll(callables); Map<T, Object> results = new TreeMap<>(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/673f6856/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcMonitor.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcMonitor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcMonitor.java new file mode 100644 index 0000000..d889a56 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcMonitor.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; + +/** + * Metrics and monitoring interface for the router RPC server. Allows pluggable + * diagnostics and monitoring services to be attached. 
+ */ +public interface RouterRpcMonitor { + + /** + * Initialize the monitor. + * @param conf Configuration for the monitor. + * @param server RPC server. + * @param store State Store. + */ + void init( + Configuration conf, RouterRpcServer server, StateStoreService store); + + /** + * Close the monitor. + */ + void close(); + + /** + * Start processing an operation on the Router. + */ + void startOp(); + + /** + * Start proxying an operation to the Namenode. + * @return Id of the thread doing the proxying. + */ + long proxyOp(); + + /** + * Mark a proxy operation as completed. + * @param success If the operation was successful. + */ + void proxyOpComplete(boolean success); + + /** + * Failed to proxy an operation to a Namenode because it was in standby. + */ + void proxyOpFailureStandby(); + + /** + * Failed to proxy an operation to a Namenode because of an unexpected + * exception. + */ + void proxyOpFailureCommunicate(); + + /** + * Failed to proxy an operation because it is not implemented. + */ + void proxyOpNotImplemented(); + + /** + * If the Router cannot contact the State Store in an operation. + */ + void routerFailureStateStore(); + + /** + * If the Router is in safe mode. + */ + void routerFailureSafemode(); + + /** + * If a path is locked. + */ + void routerFailureLocked(); + + /** + * If a path is in a read only mount point. + */ + void routerFailureReadOnly(); +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/673f6856/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java index f9b4a5d..6aee1ee 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java @@ -120,6 +120,7 @@ import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.util.ReflectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -159,6 +160,9 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol { /** RPC clients to connect to the Namenodes. */ private final RouterRpcClient rpcClient; + /** Monitor metrics for the RPC calls. */ + private final RouterRpcMonitor rpcMonitor; + /** Interface to identify the active NN for a nameservice or blockpool ID. */ private final ActiveNamenodeResolver namenodeResolver; @@ -256,14 +260,28 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol { this.rpcAddress = new InetSocketAddress( confRpcAddress.getHostName(), listenAddress.getPort()); + // Create metrics monitor + Class<? 
extends RouterRpcMonitor> rpcMonitorClass = this.conf.getClass( + DFSConfigKeys.DFS_ROUTER_METRICS_CLASS, + DFSConfigKeys.DFS_ROUTER_METRICS_CLASS_DEFAULT, + RouterRpcMonitor.class); + this.rpcMonitor = ReflectionUtils.newInstance(rpcMonitorClass, conf); + // Create the client this.rpcClient = new RouterRpcClient(this.conf, this.router.getRouterId(), - this.namenodeResolver); + this.namenodeResolver, this.rpcMonitor); } @Override protected void serviceInit(Configuration configuration) throws Exception { this.conf = configuration; + + if (this.rpcMonitor == null) { + LOG.error("Cannot instantiate Router RPC metrics class"); + } else { + this.rpcMonitor.init(this.conf, this, this.router.getStateStore()); + } + super.serviceInit(configuration); } @@ -281,6 +299,9 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol { if (this.rpcServer != null) { this.rpcServer.stop(); } + if (rpcMonitor != null) { + this.rpcMonitor.close(); + } super.serviceStop(); } @@ -294,6 +315,15 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol { } /** + * Get the RPC monitor and metrics. + * + * @return RPC monitor and metrics. + */ + public RouterRpcMonitor getRPCMonitor() { + return rpcMonitor; + } + + /** * Allow access to the client RPC server for testing. * * @return The RPC server. @@ -330,6 +360,9 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol { checkOperation(op); if (!supported) { + if (rpcMonitor != null) { + rpcMonitor.proxyOpNotImplemented(); + } String methodName = getMethodName(); throw new UnsupportedOperationException( "Operation \"" + methodName + "\" is not supported"); @@ -346,6 +379,10 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol { */ private void checkOperation(OperationCategory op) throws StandbyException { // Log the function we are currently calling. + if (rpcMonitor != null) { + rpcMonitor.startOp(); + } + // Log the function we are currently calling. 
if (LOG.isDebugEnabled()) { String methodName = getMethodName(); LOG.debug("Proxying operation: {}", methodName); @@ -1912,16 +1949,22 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol { */ private List<RemoteLocation> getLocationsForPath( String path, boolean failIfLocked) throws IOException { - // Check the location for this path - final PathLocation location = - this.subclusterResolver.getDestinationForPath(path); - if (location == null) { - throw new IOException("Cannot find locations for " + path + " in " + - this.subclusterResolver); - } + try { + // Check the location for this path + final PathLocation location = + this.subclusterResolver.getDestinationForPath(path); + if (location == null) { + throw new IOException("Cannot find locations for " + path + " in " + + this.subclusterResolver); + } - // Log the access to a path - return location.getDestinations(); + return location.getDestinations(); + } catch (IOException ioe) { + if (this.rpcMonitor != null) { + this.rpcMonitor.routerFailureStateStore(); + } + throw ioe; + } } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/673f6856/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/CachedRecordStore.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/CachedRecordStore.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/CachedRecordStore.java index 90a6699..fbece88 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/CachedRecordStore.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/CachedRecordStore.java @@ -26,6 +26,7 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.hadoop.hdfs.server.federation.metrics.StateStoreMetrics; import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver; import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult; @@ -140,6 +141,13 @@ public abstract class CachedRecordStore<R extends BaseRecord> writeLock.unlock(); } + // Update the metrics for the cache State Store size + StateStoreMetrics metrics = getDriver().getMetrics(); + if (metrics != null) { + String recordName = getRecordClass().getSimpleName(); + metrics.setCacheSize(recordName, this.records.size()); + } + lastUpdate = Time.monotonicNow(); } return true; http://git-wip-us.apache.org/repos/asf/hadoop/blob/673f6856/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java index 3aa3ffd..0289ba6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreService.java @@ -25,15 +25,23 @@ import java.util.LinkedList; import 
java.util.List; import java.util.Map; +import javax.management.NotCompliantMBeanException; +import javax.management.ObjectName; +import javax.management.StandardMBean; + import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.server.federation.metrics.StateStoreMBean; +import org.apache.hadoop.hdfs.server.federation.metrics.StateStoreMetrics; import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver; import org.apache.hadoop.hdfs.server.federation.store.impl.MembershipStoreImpl; import org.apache.hadoop.hdfs.server.federation.store.impl.MountTableStoreImpl; import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState; +import org.apache.hadoop.metrics2.MetricsException; +import org.apache.hadoop.metrics2.util.MBeans; import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.Time; @@ -87,6 +95,8 @@ public class StateStoreService extends CompositeService { /** Service to maintain data store connection. */ private StateStoreConnectionMonitorService monitorService; + /** StateStore metrics. */ + private StateStoreMetrics metrics; /** Supported record stores. */ private final Map< @@ -152,6 +162,21 @@ public class StateStoreService extends CompositeService { this.cacheUpdater = new StateStoreCacheUpdateService(this); addService(this.cacheUpdater); + // Create metrics for the State Store + this.metrics = StateStoreMetrics.create(conf); + + // Adding JMX interface + try { + StandardMBean bean = new StandardMBean(metrics, StateStoreMBean.class); + ObjectName registeredObject = + MBeans.register("Router", "StateStore", bean); + LOG.info("Registered StateStoreMBean: {}", registeredObject); + } catch (NotCompliantMBeanException e) { + throw new RuntimeException("Bad StateStoreMBean setup", e); + } catch (MetricsException e) { + LOG.info("Failed to register State Store bean {}", e.getMessage()); + } + super.serviceInit(this.conf); } @@ -165,6 +190,11 @@ public class StateStoreService extends CompositeService { protected void serviceStop() throws Exception { closeDriver(); + if (metrics != null) { + metrics.shutdown(); + metrics = null; + } + super.serviceStop(); } @@ -228,7 +258,8 @@ public class StateStoreService extends CompositeService { synchronized (this.driver) { if (!isDriverReady()) { String driverName = this.driver.getClass().getSimpleName(); - if (this.driver.init(conf, getIdentifier(), getSupportedRecords())) { + if (this.driver.init( + conf, getIdentifier(), getSupportedRecords(), metrics)) { LOG.info("Connection to the State Store driver {} is open and ready", driverName); this.refreshCaches(); @@ -398,4 +429,13 @@ public class StateStoreService extends CompositeService { throw new IOException("Registered cache was not found for " + clazz); } + /** + * Get the metrics for the State Store. + * + * @return State Store metrics. 
+ */ + public StateStoreMetrics getMetrics() { + return metrics; + } + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/673f6856/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreDriver.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreDriver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreDriver.java index 90111bf..3ebab0b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreDriver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/StateStoreDriver.java @@ -21,6 +21,7 @@ import java.net.InetAddress; import java.util.Collection; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.federation.metrics.StateStoreMetrics; import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; import org.apache.hadoop.hdfs.server.federation.store.StateStoreUnavailableException; import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils; @@ -46,6 +47,9 @@ public abstract class StateStoreDriver implements StateStoreRecordOperations { /** Identifier for the driver. */ private String identifier; + /** State Store metrics. */ + private StateStoreMetrics metrics; + /** * Initialize the state store connection. @@ -56,10 +60,12 @@ public abstract class StateStoreDriver implements StateStoreRecordOperations { * @return If initialized and ready, false if failed to initialize driver. */ public boolean init(final Configuration config, final String id, - final Collection<Class<? extends BaseRecord>> records) { + final Collection<Class<? extends BaseRecord>> records, + final StateStoreMetrics stateStoreMetrics) { this.conf = config; this.identifier = id; + this.metrics = stateStoreMetrics; if (this.identifier == null) { LOG.warn("The identifier for the State Store connection is not set"); @@ -101,6 +107,15 @@ public abstract class StateStoreDriver implements StateStoreRecordOperations { } /** + * Get the metrics for the State Store. + * + * @return State Store metrics. + */ + public StateStoreMetrics getMetrics() { + return this.metrics; + } + + /** * Prepare the driver to access data storage. * * @return True if the driver was successfully initialized. 
If false is http://git-wip-us.apache.org/repos/asf/hadoop/blob/673f6856/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializableImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializableImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializableImpl.java index e2038fa..7bc93de 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializableImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreSerializableImpl.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.Collection; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.federation.metrics.StateStoreMetrics; import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer; import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord; @@ -41,8 +42,9 @@ public abstract class StateStoreSerializableImpl extends StateStoreBaseImpl { @Override public boolean init(final Configuration config, final String id, - final Collection<Class<? extends BaseRecord>> records) { - boolean ret = super.init(config, id, records); + final Collection<Class<? extends BaseRecord>> records, + final StateStoreMetrics metrics) { + boolean ret = super.init(config, id, records, metrics); this.serializer = StateStoreSerializer.getSerializer(config); http://git-wip-us.apache.org/repos/asf/hadoop/blob/673f6856/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java index ddcd537..97c821e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.federation.store.driver.impl; import static org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils.filterMultiple; import static org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils.getRecordName; import static org.apache.hadoop.util.curator.ZKCuratorManager.getNodePath; +import static org.apache.hadoop.util.Time.monotonicNow; import java.io.IOException; import java.util.ArrayList; @@ -123,6 +124,7 @@ public class StateStoreZooKeeperImpl extends StateStoreSerializableImpl { public <T extends BaseRecord> QueryResult<T> get(Class<T> clazz, String sub) throws IOException { verifyDriverReady(); + long start = monotonicNow(); List<T> ret = new ArrayList<>(); String znode = getZNodeForClass(clazz); try { @@ -157,11 +159,14 @@ public class StateStoreZooKeeperImpl extends StateStoreSerializableImpl { } } } catch (Exception e) { + getMetrics().addFailure(monotonicNow() - start); String 
msg = "Cannot get children for \"" + znode + "\": " + e.getMessage(); LOG.error(msg); throw new IOException(msg); } + long end = monotonicNow(); + getMetrics().addRead(end - start); return new QueryResult<T>(ret, getTime()); } @@ -178,6 +183,7 @@ public class StateStoreZooKeeperImpl extends StateStoreSerializableImpl { Class<? extends BaseRecord> recordClass = record0.getClass(); String znode = getZNodeForClass(recordClass); + long start = monotonicNow(); boolean status = true; for (T record : records) { String primaryKey = getPrimaryKey(record); @@ -187,6 +193,12 @@ public class StateStoreZooKeeperImpl extends StateStoreSerializableImpl { status = false; } } + long end = monotonicNow(); + if (status) { + getMetrics().addWrite(end - start); + } else { + getMetrics().addFailure(end - start); + } return status; } @@ -199,12 +211,14 @@ public class StateStoreZooKeeperImpl extends StateStoreSerializableImpl { } // Read the current data + long start = monotonicNow(); List<T> records = null; try { QueryResult<T> result = get(clazz); records = result.getRecords(); } catch (IOException ex) { LOG.error("Cannot get existing records", ex); + getMetrics().addFailure(monotonicNow() - start); return 0; } @@ -226,14 +240,20 @@ public class StateStoreZooKeeperImpl extends StateStoreSerializableImpl { } } catch (Exception e) { LOG.error("Cannot remove \"{}\"", existingRecord, e); + getMetrics().addFailure(monotonicNow() - start); } } + long end = monotonicNow(); + if (removed > 0) { + getMetrics().addRemove(end - start); + } return removed; } @Override public <T extends BaseRecord> boolean removeAll(Class<T> clazz) throws IOException { + long start = monotonicNow(); boolean status = true; String znode = getZNodeForClass(clazz); LOG.info("Deleting all children under {}", znode); @@ -248,6 +268,12 @@ public class StateStoreZooKeeperImpl extends StateStoreSerializableImpl { LOG.error("Cannot remove {}: {}", znode, e.getMessage()); status = false; } + long time = monotonicNow() - start; + if (status) { + getMetrics().addRemove(time); + } else { + getMetrics().addFailure(time); + } return status; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/673f6856/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MembershipState.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MembershipState.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MembershipState.java index ab0ff0a..ac0b22e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MembershipState.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MembershipState.java @@ -154,7 +154,7 @@ public abstract class MembershipState extends BaseRecord public abstract void setStats(MembershipStats stats); - public abstract MembershipStats getStats() throws IOException; + public abstract MembershipStats getStats(); public abstract void setLastContact(long contact); http://git-wip-us.apache.org/repos/asf/hadoop/blob/673f6856/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MountTable.java ---------------------------------------------------------------------- diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MountTable.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MountTable.java index 16f2b8b..0a3f19d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MountTable.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MountTable.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs.server.federation.store.records; import java.io.IOException; +import java.util.Comparator; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -46,6 +47,28 @@ public abstract class MountTable extends BaseRecord { private static final Logger LOG = LoggerFactory.getLogger(MountTable.class); + /** Comparator for paths which considers the /. */ + public static final Comparator<String> PATH_COMPARATOR = + new Comparator<String>() { + @Override + public int compare(String o1, String o2) { + String s1 = o1.replace('/', ' '); + String s2 = o2.replace('/', ' '); + return s1.compareTo(s2); + } + }; + + /** Comparator based on the mount table source. */ + public static final Comparator<MountTable> SOURCE_COMPARATOR = + new Comparator<MountTable>() { + public int compare(MountTable m1, MountTable m2) { + String src1 = m1.getSourcePath(); + String src2 = m2.getSourcePath(); + return PATH_COMPARATOR.compare(src1, src2); + } + }; + + /** * Default constructor for a mount table entry. */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/673f6856/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/MembershipStatePBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/MembershipStatePBImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/MembershipStatePBImpl.java index 805c2af..614957b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/MembershipStatePBImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/MembershipStatePBImpl.java @@ -288,7 +288,7 @@ public class MembershipStatePBImpl extends MembershipState implements PBRecord { } @Override - public MembershipStats getStats() throws IOException { + public MembershipStats getStats() { NamenodeMembershipStatsRecordProto statsProto = this.translator.getProtoOrBuilder().getStats(); MembershipStats stats = @@ -298,7 +298,8 @@ public class MembershipStatePBImpl extends MembershipState implements PBRecord { statsPB.setProto(statsProto); return statsPB; } else { - throw new IOException("Cannot get stats for the membership"); + throw new IllegalArgumentException( + "Cannot get stats for the membership"); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/673f6856/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 151f00c..550e5df 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -4449,7 +4449,24 @@ </description> </property> - <property> + <property> + <name>dfs.federation.router.metrics.enable</name> + <value>true</value> + <description> + If the metrics in the router are enabled. + </description> + </property> + + <property> + <name>dfs.federation.router.metrics.class</name> + <value>org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCPerformanceMonitor</value> + <description> + Class to monitor the RPC system in the router. It must implement the + RouterRpcMonitor interface. + </description> + </property> + + <property> <name>dfs.federation.router.admin.enable</name> <value>true</value> <description> http://git-wip-us.apache.org/repos/asf/hadoop/blob/673f6856/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/FederationTestUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/FederationTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/FederationTestUtils.java index bbb548ca..0c01763 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/FederationTestUtils.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/FederationTestUtils.java @@ -26,12 +26,18 @@ import java.io.BufferedReader; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStreamReader; +import java.lang.management.ManagementFactory; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.Date; import java.util.List; import java.util.Random; +import javax.management.JMX; +import javax.management.MBeanServer; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; + import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -135,6 +141,13 @@ public final class FederationTestUtils { return Math.abs(d1.getTime() - d2.getTime()) < precision; } + public static <T> T getBean(String name, Class<T> obj) + throws MalformedObjectNameException { + MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer(); + ObjectName poolName = new ObjectName(name); + return JMX.newMXBeanProxy(mBeanServer, poolName, obj); + } + public static boolean addDirectory(FileSystem context, String path) throws IOException { context.mkdirs(new Path(path), new FsPermission("777")); http://git-wip-us.apache.org/repos/asf/hadoop/blob/673f6856/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java index cac5e6b..58ca1d1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java @@ -32,6 +32,7 @@ public class RouterConfigBuilder { private boolean enableHeartbeat = false; private boolean enableLocalHeartbeat = false; 
private boolean enableStateStore = false; + private boolean enableMetrics = false; public RouterConfigBuilder(Configuration configuration) { this.conf = configuration; @@ -47,6 +48,7 @@ public class RouterConfigBuilder { this.enableHeartbeat = true; this.enableLocalHeartbeat = true; this.enableStateStore = true; + this.enableMetrics = true; return this; } @@ -75,6 +77,11 @@ public class RouterConfigBuilder { return this; } + public RouterConfigBuilder metrics(boolean enable) { + this.enableMetrics = enable; + return this; + } + public RouterConfigBuilder rpc() { return this.rpc(true); } @@ -91,6 +98,10 @@ public class RouterConfigBuilder { return this.stateStore(true); } + public RouterConfigBuilder metrics() { + return this.metrics(true); + } + public Configuration build() { conf.setBoolean(DFSConfigKeys.DFS_ROUTER_STORE_ENABLE, this.enableStateStore); @@ -101,6 +112,8 @@ public class RouterConfigBuilder { this.enableHeartbeat); conf.setBoolean(DFSConfigKeys.DFS_ROUTER_MONITOR_LOCAL_NAMENODE, this.enableLocalHeartbeat); + conf.setBoolean(DFSConfigKeys.DFS_ROUTER_METRICS_ENABLE, + this.enableMetrics); return conf; } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/673f6856/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/metrics/TestFederationMetrics.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/metrics/TestFederationMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/metrics/TestFederationMetrics.java new file mode 100644 index 0000000..d6a194f --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/metrics/TestFederationMetrics.java @@ -0,0 +1,237 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.federation.metrics; + +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.getBean; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.Iterator; +import java.util.List; + +import javax.management.MalformedObjectNameException; + +import org.apache.commons.collections.ListUtils; +import org.apache.hadoop.hdfs.server.federation.router.Router; +import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState; +import org.apache.hadoop.hdfs.server.federation.store.records.MembershipStats; +import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; +import org.codehaus.jettison.json.JSONArray; +import org.codehaus.jettison.json.JSONException; +import org.codehaus.jettison.json.JSONObject; +import org.junit.Test; + +/** + * Test the JMX interface for the {@link Router}. + */ +public class TestFederationMetrics extends TestMetricsBase { + + public static final String FEDERATION_BEAN = + "Hadoop:service=Router,name=FederationState"; + public static final String STATE_STORE_BEAN = + "Hadoop:service=Router,name=StateStore"; + public static final String RPC_BEAN = + "Hadoop:service=Router,name=FederationRPC"; + + @Test + public void testClusterStatsJMX() + throws MalformedObjectNameException, IOException { + + FederationMBean bean = getBean(FEDERATION_BEAN, FederationMBean.class); + validateClusterStatsBean(bean); + } + + @Test + public void testClusterStatsDataSource() throws IOException { + FederationMetrics metrics = getRouter().getMetrics(); + validateClusterStatsBean(metrics); + } + + @Test + public void testMountTableStatsDataSource() + throws IOException, JSONException { + + FederationMetrics metrics = getRouter().getMetrics(); + String jsonString = metrics.getMountTable(); + JSONArray jsonArray = new JSONArray(jsonString); + assertEquals(jsonArray.length(), getMockMountTable().size()); + + int match = 0; + for (int i = 0; i < jsonArray.length(); i++) { + JSONObject json = jsonArray.getJSONObject(i); + String src = json.getString("sourcePath"); + + for (MountTable entry : getMockMountTable()) { + if (entry.getSourcePath().equals(src)) { + assertEquals(entry.getDefaultLocation().getNameserviceId(), + json.getString("nameserviceId")); + assertEquals(entry.getDefaultLocation().getDest(), + json.getString("path")); + assertNotNullAndNotEmpty(json.getString("dateCreated")); + assertNotNullAndNotEmpty(json.getString("dateModified")); + match++; + } + } + } + assertEquals(match, getMockMountTable().size()); + } + + private MembershipState findMockNamenode(String nsId, String nnId) { + + @SuppressWarnings("unchecked") + List<MembershipState> namenodes = + ListUtils.union(getActiveMemberships(), getStandbyMemberships()); + for (MembershipState nn : namenodes) { + if (nn.getNamenodeId().equals(nnId) + && nn.getNameserviceId().equals(nsId)) { + return nn; + } + } + return null; + } + + @Test + public void testNamenodeStatsDataSource() throws IOException, JSONException { + + FederationMetrics metrics = getRouter().getMetrics(); + String jsonString = metrics.getNamenodes(); + JSONObject jsonObject = new JSONObject(jsonString); + Iterator<?> keys = jsonObject.keys(); + int nnsFound = 0; + while (keys.hasNext()) { + // Validate each entry against our mocks + JSONObject json = jsonObject.getJSONObject((String) keys.next()); + String nameserviceId = json.getString("nameserviceId"); + 
String namenodeId = json.getString("namenodeId"); + + MembershipState mockEntry = + this.findMockNamenode(nameserviceId, namenodeId); + assertNotNull(mockEntry); + + assertEquals(json.getString("state"), mockEntry.getState().toString()); + MembershipStats stats = mockEntry.getStats(); + assertEquals(json.getLong("numOfActiveDatanodes"), + stats.getNumOfActiveDatanodes()); + assertEquals(json.getLong("numOfDeadDatanodes"), + stats.getNumOfDeadDatanodes()); + assertEquals(json.getLong("numOfDecommissioningDatanodes"), + stats.getNumOfDecommissioningDatanodes()); + assertEquals(json.getLong("numOfDecomActiveDatanodes"), + stats.getNumOfDecomActiveDatanodes()); + assertEquals(json.getLong("numOfDecomDeadDatanodes"), + stats.getNumOfDecomDeadDatanodes()); + assertEquals(json.getLong("numOfBlocks"), stats.getNumOfBlocks()); + assertEquals(json.getString("rpcAddress"), mockEntry.getRpcAddress()); + assertEquals(json.getString("webAddress"), mockEntry.getWebAddress()); + nnsFound++; + } + // Validate all memberships are present + assertEquals(getActiveMemberships().size() + getStandbyMemberships().size(), + nnsFound); + } + + @Test + public void testNameserviceStatsDataSource() + throws IOException, JSONException { + + FederationMetrics metrics = getRouter().getMetrics(); + String jsonString = metrics.getNameservices(); + JSONObject jsonObject = new JSONObject(jsonString); + Iterator<?> keys = jsonObject.keys(); + int nameservicesFound = 0; + while (keys.hasNext()) { + JSONObject json = jsonObject.getJSONObject((String) keys.next()); + String nameserviceId = json.getString("nameserviceId"); + String namenodeId = json.getString("namenodeId"); + + MembershipState mockEntry = + this.findMockNamenode(nameserviceId, namenodeId); + assertNotNull(mockEntry); + + // NS should report the active NN + assertEquals(mockEntry.getState().toString(), json.getString("state")); + assertEquals("ACTIVE", json.getString("state")); + + // Stats in the NS should reflect the stats for the most active NN + MembershipStats stats = mockEntry.getStats(); + assertEquals(stats.getNumOfFiles(), json.getLong("numOfFiles")); + assertEquals(stats.getTotalSpace(), json.getLong("totalSpace")); + assertEquals(stats.getAvailableSpace(), + json.getLong("availableSpace")); + assertEquals(stats.getNumOfBlocksMissing(), + json.getLong("numOfBlocksMissing")); + assertEquals(stats.getNumOfActiveDatanodes(), + json.getLong("numOfActiveDatanodes")); + assertEquals(stats.getNumOfDeadDatanodes(), + json.getLong("numOfDeadDatanodes")); + assertEquals(stats.getNumOfDecommissioningDatanodes(), + json.getLong("numOfDecommissioningDatanodes")); + assertEquals(stats.getNumOfDecomActiveDatanodes(), + json.getLong("numOfDecomActiveDatanodes")); + assertEquals(stats.getNumOfDecomDeadDatanodes(), + json.getLong("numOfDecomDeadDatanodes")); + nameservicesFound++; + } + assertEquals(getNameservices().size(), nameservicesFound); + } + + private void assertNotNullAndNotEmpty(String field) { + assertNotNull(field); + assertTrue(field.length() > 0); + } + + private void validateClusterStatsBean(FederationMBean bean) + throws IOException { + + // Determine aggregates + long numBlocks = 0; + long numLive = 0; + long numDead = 0; + long numDecom = 0; + long numDecomLive = 0; + long numDecomDead = 0; + long numFiles = 0; + for (MembershipState mock : getActiveMemberships()) { + MembershipStats stats = mock.getStats(); + numBlocks += stats.getNumOfBlocks(); + numLive += stats.getNumOfActiveDatanodes(); + numDead += stats.getNumOfDeadDatanodes(); + numDecom += 
stats.getNumOfDecommissioningDatanodes(); + numDecomLive += stats.getNumOfDecomActiveDatanodes(); + numDecomDead += stats.getNumOfDecomDeadDatanodes(); + } + + assertEquals(numBlocks, bean.getNumBlocks()); + assertEquals(numLive, bean.getNumLiveNodes()); + assertEquals(numDead, bean.getNumDeadNodes()); + assertEquals(numDecom, bean.getNumDecommissioningNodes()); + assertEquals(numDecomLive, bean.getNumDecomLiveNodes()); + assertEquals(numDecomDead, bean.getNumDecomDeadNodes()); + assertEquals(numFiles, bean.getNumFiles()); + assertEquals(getActiveMemberships().size() + getStandbyMemberships().size(), + bean.getNumNamenodes()); + assertEquals(getNameservices().size(), bean.getNumNameservices()); + assertTrue(bean.getVersion().length() > 0); + assertTrue(bean.getCompiledDate().length() > 0); + assertTrue(bean.getCompileInfo().length() > 0); + assertTrue(bean.getRouterStarted().length() > 0); + assertTrue(bean.getHostAndPort().length() > 0); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/673f6856/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/metrics/TestMetricsBase.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/metrics/TestMetricsBase.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/metrics/TestMetricsBase.java new file mode 100644 index 0000000..bbcfbe8 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/metrics/TestMetricsBase.java @@ -0,0 +1,150 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.federation.metrics; + +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES; +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMESERVICES; +import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.clearAllRecords; +import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.createMockMountTable; +import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.createMockRegistrationForNamenode; +import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.synchronizeRecords; +import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.waitStateStore; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; +import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState; +import org.apache.hadoop.hdfs.server.federation.router.Router; +import org.apache.hadoop.hdfs.server.federation.store.MembershipStore; +import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; +import org.apache.hadoop.hdfs.server.federation.store.protocol.NamenodeHeartbeatRequest; +import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState; +import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; +import org.junit.After; +import org.junit.Before; + +/** + * Test the basic metrics functionality. + */ +public class TestMetricsBase { + + private StateStoreService stateStore; + private MembershipStore membershipStore; + private Router router; + private Configuration routerConfig; + + private List<MembershipState> activeMemberships; + private List<MembershipState> standbyMemberships; + private List<MountTable> mockMountTable; + private List<String> nameservices; + + @Before + public void setupBase() throws Exception { + + if (router == null) { + routerConfig = new RouterConfigBuilder() + .stateStore() + .metrics() + .build(); + router = new Router(); + router.init(routerConfig); + router.setRouterId("routerId"); + router.start(); + stateStore = router.getStateStore(); + + membershipStore = + stateStore.getRegisteredRecordStore(MembershipStore.class); + + // Read all data and load all caches + waitStateStore(stateStore, 10000); + createFixtures(); + stateStore.refreshCaches(true); + Thread.sleep(1000); + } + } + + @After + public void tearDownBase() throws IOException { + if (router != null) { + router.stop(); + router.close(); + router = null; + } + } + + private void createFixtures() throws IOException { + // Clear all records + clearAllRecords(stateStore); + + nameservices = new ArrayList<>(); + nameservices.add(NAMESERVICES[0]); + nameservices.add(NAMESERVICES[1]); + + // 2 NNs per NS + activeMemberships = new ArrayList<>(); + standbyMemberships = new ArrayList<>(); + + for (String nameservice : nameservices) { + MembershipState namenode1 = createMockRegistrationForNamenode( + nameservice, NAMENODES[0], FederationNamenodeServiceState.ACTIVE); + NamenodeHeartbeatRequest request1 = + NamenodeHeartbeatRequest.newInstance(namenode1); + assertTrue(membershipStore.namenodeHeartbeat(request1).getResult()); + activeMemberships.add(namenode1); + + MembershipState namenode2 = createMockRegistrationForNamenode( + 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/673f6856/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouter.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouter.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouter.java
index 2074d3d..e22be84 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouter.java
@@ -27,6 +27,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.server.federation.MockResolver;
 import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
+import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
 import org.apache.hadoop.service.Service.STATE;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -46,15 +48,19 @@ public class TestRouter {
   public static void create() throws IOException {
     // Basic configuration without the state store
     conf = new Configuration();
+    // 1 sec cache refresh
+    conf.setInt(DFSConfigKeys.DFS_ROUTER_CACHE_TIME_TO_LIVE_MS, 1);
     // Mock resolver classes
-    conf.set(DFSConfigKeys.FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS,
-        MockResolver.class.getCanonicalName());
-    conf.set(DFSConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS,
-        MockResolver.class.getCanonicalName());
+    conf.setClass(DFSConfigKeys.FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS,
+        MockResolver.class, ActiveNamenodeResolver.class);
+    conf.setClass(DFSConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS,
+        MockResolver.class, FileSubclusterResolver.class);

     // Bind to any available port
     conf.set(DFSConfigKeys.DFS_ROUTER_RPC_BIND_HOST_KEY, "0.0.0.0");
     conf.set(DFSConfigKeys.DFS_ROUTER_RPC_ADDRESS_KEY, "127.0.0.1:0");
+    conf.set(DFSConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_KEY, "127.0.0.1:0");
+    conf.set(DFSConfigKeys.DFS_ROUTER_ADMIN_BIND_HOST_KEY, "0.0.0.0");

     // Simulate a co-located NN
     conf.set(DFSConfigKeys.DFS_NAMESERVICES, "ns0");
@@ -95,9 +101,18 @@ public class TestRouter {
   @Test
   public void testRouterService() throws InterruptedException, IOException {
+    // Admin only
+    testRouterStartup(new RouterConfigBuilder(conf).admin().build());
+
     // Rpc only
     testRouterStartup(new RouterConfigBuilder(conf).rpc().build());

+    // Metrics only
+    testRouterStartup(new RouterConfigBuilder(conf).metrics().build());
+
+    // Statestore only
+    testRouterStartup(new RouterConfigBuilder(conf).stateStore().build());
+
     // Heartbeat only
     testRouterStartup(new RouterConfigBuilder(conf).heartbeat().build());
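Each call above builds a configuration with a single optional Router service enabled and runs it through the standard service lifecycle via the testRouterStartup helper, which is defined earlier in TestRouter and is not part of this hunk. A rough sketch of what such a lifecycle check typically looks like is shown below; it assumes the JUnit and Service.STATE imports already present in the class and is not the exact helper from the file.

    private void testRouterStartup(Configuration routerConfig)
        throws IOException {
      Router router = new Router();
      assertEquals(STATE.NOTINITED, router.getServiceState());
      router.init(routerConfig);
      assertEquals(STATE.INITED, router.getServiceState());
      router.start();
      assertEquals(STATE.STARTED, router.getServiceState());
      router.stop();
      router.close();
      assertEquals(STATE.STOPPED, router.getServiceState());
    }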
http://git-wip-us.apache.org/repos/asf/hadoop/blob/673f6856/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java
index 65e763b..01fe149 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/store/driver/TestStateStoreDriverBase.java
@@ -35,6 +35,7 @@ import java.util.Map.Entry;
 import java.util.Random;

 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.metrics.StateStoreMetrics;
 import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
 import org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils;
 import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
@@ -377,6 +378,74 @@ public class TestStateStoreDriverBase {
     testFetchErrors(driver, MountTable.class);
   }

+  public void testMetrics(StateStoreDriver driver)
+      throws IOException, IllegalArgumentException, IllegalAccessException {
+
+    MountTable insertRecord =
+        this.generateFakeRecord(MountTable.class);
+
+    // Put single
+    StateStoreMetrics metrics = stateStore.getMetrics();
+    assertEquals(0, metrics.getWriteOps());
+    driver.put(insertRecord, true, false);
+    assertEquals(1, metrics.getWriteOps());
+
+    // Put multiple
+    metrics.reset();
+    assertEquals(0, metrics.getWriteOps());
+    driver.put(insertRecord, true, false);
+    assertEquals(1, metrics.getWriteOps());
+
+    // Get Single
+    metrics.reset();
+    assertEquals(0, metrics.getReadOps());
+
+    final String querySourcePath = insertRecord.getSourcePath();
+    MountTable partial = MountTable.newInstance();
+    partial.setSourcePath(querySourcePath);
+    final Query<MountTable> query = new Query<>(partial);
+    driver.get(MountTable.class, query);
+    assertEquals(1, metrics.getReadOps());
+
+    // GetAll
+    metrics.reset();
+    assertEquals(0, metrics.getReadOps());
+    driver.get(MountTable.class);
+    assertEquals(1, metrics.getReadOps());
+
+    // GetMultiple
+    metrics.reset();
+    assertEquals(0, metrics.getReadOps());
+    driver.getMultiple(MountTable.class, query);
+    assertEquals(1, metrics.getReadOps());
+
+    // Insert fails
+    metrics.reset();
+    assertEquals(0, metrics.getFailureOps());
+    driver.put(insertRecord, false, true);
+    assertEquals(1, metrics.getFailureOps());
+
+    // Remove single
+    metrics.reset();
+    assertEquals(0, metrics.getRemoveOps());
+    driver.remove(insertRecord);
+    assertEquals(1, metrics.getRemoveOps());
+
+    // Remove multiple
+    metrics.reset();
+    driver.put(insertRecord, true, false);
+    assertEquals(0, metrics.getRemoveOps());
+    driver.remove(MountTable.class, query);
+    assertEquals(1, metrics.getRemoveOps());
+
+    // Remove all
+    metrics.reset();
+    driver.put(insertRecord, true, false);
+    assertEquals(0, metrics.getRemoveOps());
+    driver.removeAll(MountTable.class);
+    assertEquals(1, metrics.getRemoveOps());
+  }
+
   /**
    * Sets the value of a field on the object.
    *

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org