http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/NamenodeHeartbeatService.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/NamenodeHeartbeatService.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/NamenodeHeartbeatService.java new file mode 100644 index 0000000..a1adf77 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/NamenodeHeartbeatService.java @@ -0,0 +1,362 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.federation.router; + +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HEARTBEAT_INTERVAL_MS; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HEARTBEAT_INTERVAL_MS_DEFAULT; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.URI; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ha.HAServiceProtocol; +import org.apache.hadoop.ha.HAServiceStatus; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdfs.NameNodeProxies; +import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction; +import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; +import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport; +import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; +import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; +import org.apache.hadoop.hdfs.tools.NNHAServiceTarget; +import org.codehaus.jettison.json.JSONArray; +import org.codehaus.jettison.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The {@link Router} periodically checks the state of a Namenode (usually on + * the same server) and reports their high availability (HA) state and + * load/space status to the + * {@link org.apache.hadoop.hdfs.server.federation.store.StateStoreService} + * . Note that this is an optional role as a Router can be independent of any + * subcluster. + * <p> + * For performance with Namenode HA, the Router uses the high availability state + * information in the State Store to forward the request to the Namenode that is + * most likely to be active. 
+ * <p> + * Note that this service can be embedded into the Namenode itself to simplify + * the operation. + */ +public class NamenodeHeartbeatService extends PeriodicService { + + private static final Logger LOG = + LoggerFactory.getLogger(NamenodeHeartbeatService.class); + + + /** Configuration for the heartbeat. */ + private Configuration conf; + + /** Router performing the heartbeating. */ + private final ActiveNamenodeResolver resolver; + + /** Interface to the tracked NN. */ + private final String nameserviceId; + private final String namenodeId; + + /** Namenode HA target. */ + private NNHAServiceTarget localTarget; + /** RPC address for the namenode. */ + private String rpcAddress; + /** Service RPC address for the namenode. */ + private String serviceAddress; + /** Service RPC address for the namenode. */ + private String lifelineAddress; + /** HTTP address for the namenode. */ + private String webAddress; + + /** + * Create a new Namenode status updater. + * @param resolver Namenode resolver service to handle NN registration. + * @param nsId Identifier of the nameservice. + * @param nnId Identifier of the namenode in HA. + */ + public NamenodeHeartbeatService( + ActiveNamenodeResolver resolver, String nsId, String nnId) { + super(NamenodeHeartbeatService.class.getSimpleName() + + (nsId == null ? "" : " " + nsId) + + (nnId == null ? 
"" : " " + nnId)); + + this.resolver = resolver; + + this.nameserviceId = nsId; + this.namenodeId = nnId; + + } + + @Override + protected void serviceInit(Configuration configuration) throws Exception { + + this.conf = configuration; + + String nnDesc = nameserviceId; + if (this.namenodeId != null && !this.namenodeId.isEmpty()) { + this.localTarget = new NNHAServiceTarget( + conf, nameserviceId, namenodeId); + nnDesc += "-" + namenodeId; + } else { + this.localTarget = null; + } + + // Get the RPC address for the clients to connect + this.rpcAddress = getRpcAddress(conf, nameserviceId, namenodeId); + LOG.info("{} RPC address: {}", nnDesc, rpcAddress); + + // Get the Service RPC address for monitoring + this.serviceAddress = + DFSUtil.getNamenodeServiceAddr(conf, nameserviceId, namenodeId); + if (this.serviceAddress == null) { + LOG.error("Cannot locate RPC service address for NN {}, " + + "using RPC address {}", nnDesc, this.rpcAddress); + this.serviceAddress = this.rpcAddress; + } + LOG.info("{} Service RPC address: {}", nnDesc, serviceAddress); + + // Get the Lifeline RPC address for faster monitoring + this.lifelineAddress = + DFSUtil.getNamenodeLifelineAddr(conf, nameserviceId, namenodeId); + if (this.lifelineAddress == null) { + this.lifelineAddress = this.serviceAddress; + } + LOG.info("{} Lifeline RPC address: {}", nnDesc, lifelineAddress); + + // Get the Web address for UI + this.webAddress = + DFSUtil.getNamenodeWebAddr(conf, nameserviceId, namenodeId); + LOG.info("{} Web address: {}", nnDesc, webAddress); + + this.setIntervalMs(conf.getLong( + DFS_ROUTER_HEARTBEAT_INTERVAL_MS, + DFS_ROUTER_HEARTBEAT_INTERVAL_MS_DEFAULT)); + + + super.serviceInit(configuration); + } + + @Override + public void periodicInvoke() { + updateState(); + } + + /** + * Get the RPC address for a Namenode. + * @param conf Configuration. + * @param nsId Name service identifier. + * @param nnId Name node identifier. + * @return RPC address in format hostname:1234. 
+ */ + private static String getRpcAddress( + Configuration conf, String nsId, String nnId) { + + // Get it from the regular RPC setting + String confKey = DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY; + String ret = conf.get(confKey); + + if (nsId != null || nnId != null) { + // Get if for the proper nameservice and namenode + confKey = DFSUtil.addKeySuffixes(confKey, nsId, nnId); + ret = conf.get(confKey); + + // If not available, get it from the map + if (ret == null) { + Map<String, InetSocketAddress> rpcAddresses = + DFSUtil.getRpcAddressesForNameserviceId(conf, nsId, null); + InetSocketAddress sockAddr = null; + if (nnId != null) { + sockAddr = rpcAddresses.get(nnId); + } else if (rpcAddresses.size() == 1) { + // Get the only namenode in the namespace + sockAddr = rpcAddresses.values().iterator().next(); + } + if (sockAddr != null) { + InetAddress addr = sockAddr.getAddress(); + ret = addr.getHostName() + ":" + sockAddr.getPort(); + } + } + } + return ret; + } + + /** + * Update the state of the Namenode. 
+ */ + private void updateState() { + NamenodeStatusReport report = getNamenodeStatusReport(); + if (!report.registrationValid()) { + // Not operational + LOG.error("Namenode is not operational: {}", getNamenodeDesc()); + } else if (report.haStateValid()) { + // block and HA status available + LOG.debug("Received service state: {} from HA namenode: {}", + report.getState(), getNamenodeDesc()); + } else if (localTarget == null) { + // block info available, HA status not expected + LOG.debug( + "Reporting non-HA namenode as operational: " + getNamenodeDesc()); + } else { + // block info available, HA status should be available, but was not + // fetched do nothing and let the current state stand + return; + } + try { + if (!resolver.registerNamenode(report)) { + LOG.warn("Cannot register namenode {}", report); + } + } catch (IOException e) { + LOG.info("Cannot register namenode in the State Store"); + } catch (Exception ex) { + LOG.error("Unhandled exception updating NN registration for {}", + getNamenodeDesc(), ex); + } + } + + /** + * Get the status report for the Namenode monitored by this heartbeater. + * @return Namenode status report. + */ + protected NamenodeStatusReport getNamenodeStatusReport() { + NamenodeStatusReport report = new NamenodeStatusReport(nameserviceId, + namenodeId, rpcAddress, serviceAddress, lifelineAddress, webAddress); + + try { + LOG.debug("Probing NN at service address: {}", serviceAddress); + + URI serviceURI = new URI("hdfs://" + serviceAddress); + // Read the filesystem info from RPC (required) + NamenodeProtocol nn = NameNodeProxies + .createProxy(this.conf, serviceURI, NamenodeProtocol.class) + .getProxy(); + + if (nn != null) { + NamespaceInfo info = nn.versionRequest(); + if (info != null) { + report.setNamespaceInfo(info); + } + } + if (!report.registrationValid()) { + return report; + } + + // Check for safemode from the client protocol. 
Currently optional, but + // should be required at some point for QoS + try { + ClientProtocol client = NameNodeProxies + .createProxy(this.conf, serviceURI, ClientProtocol.class) + .getProxy(); + if (client != null) { + boolean isSafeMode = client.setSafeMode( + SafeModeAction.SAFEMODE_GET, false); + report.setSafeMode(isSafeMode); + } + } catch (Exception e) { + LOG.error("Cannot fetch safemode state for {}", getNamenodeDesc(), e); + } + + // Read the stats from JMX (optional) + updateJMXParameters(webAddress, report); + + if (localTarget != null) { + // Try to get the HA status + try { + // Determine if NN is active + // TODO: dynamic timeout + HAServiceProtocol haProtocol = localTarget.getProxy(conf, 30*1000); + HAServiceStatus status = haProtocol.getServiceStatus(); + report.setHAServiceState(status.getState()); + } catch (Throwable e) { + if (e.getMessage().startsWith("HA for namenode is not enabled")) { + LOG.error("HA for {} is not enabled", getNamenodeDesc()); + localTarget = null; + } else { + // Failed to fetch HA status, ignoring failure + LOG.error("Cannot fetch HA status for {}: {}", + getNamenodeDesc(), e.getMessage(), e); + } + } + } + } catch(IOException e) { + LOG.error("Cannot communicate with {}: {}", + getNamenodeDesc(), e.getMessage()); + } catch(Throwable e) { + // Generic error that we don't know about + LOG.error("Unexpected exception while communicating with {}: {}", + getNamenodeDesc(), e.getMessage(), e); + } + return report; + } + + /** + * Get the description of the Namenode to monitor. + * @return Description of the Namenode to monitor. + */ + public String getNamenodeDesc() { + if (namenodeId != null && !namenodeId.isEmpty()) { + return nameserviceId + "-" + namenodeId + ":" + serviceAddress; + } else { + return nameserviceId + ":" + serviceAddress; + } + } + + /** + * Get the parameters for a Namenode from JMX and add them to the report. + * @param address Web interface of the Namenode to monitor. 
+ * @param report Namenode status report to update with JMX data. + */ + private void updateJMXParameters( + String address, NamenodeStatusReport report) { + try { + // TODO part of this should be moved to its own utility + String query = "Hadoop:service=NameNode,name=FSNamesystem*"; + JSONArray aux = FederationUtil.getJmx(query, address); + if (aux != null) { + for (int i = 0; i < aux.length(); i++) { + JSONObject jsonObject = aux.getJSONObject(i); + String name = jsonObject.getString("name"); + if (name.equals("Hadoop:service=NameNode,name=FSNamesystemState")) { + report.setDatanodeInfo( + jsonObject.getInt("NumLiveDataNodes"), + jsonObject.getInt("NumDeadDataNodes"), + jsonObject.getInt("NumDecommissioningDataNodes"), + jsonObject.getInt("NumDecomLiveDataNodes"), + jsonObject.getInt("NumDecomDeadDataNodes")); + } else if (name.equals( + "Hadoop:service=NameNode,name=FSNamesystem")) { + report.setNamesystemInfo( + jsonObject.getLong("CapacityRemaining"), + jsonObject.getLong("CapacityTotal"), + jsonObject.getLong("FilesTotal"), + jsonObject.getLong("BlocksTotal"), + jsonObject.getLong("MissingBlocks"), + jsonObject.getLong("PendingReplicationBlocks"), + jsonObject.getLong("UnderReplicatedBlocks"), + jsonObject.getLong("PendingDeletionBlocks"), + jsonObject.getLong("ProvidedCapacityTotal")); + } + } + } + } catch (Exception e) { + LOG.error("Cannot get stat from {} using JMX", getNamenodeDesc(), e); + } + } +}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/PeriodicService.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/PeriodicService.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/PeriodicService.java new file mode 100644 index 0000000..5e12222 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/PeriodicService.java @@ -0,0 +1,198 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.federation.router; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.service.ServiceStateException; +import org.apache.hadoop.util.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +/** + * Service to periodically execute a runnable. + */ +public abstract class PeriodicService extends AbstractService { + + private static final Logger LOG = + LoggerFactory.getLogger(PeriodicService.class); + + /** Default interval in milliseconds for the periodic service. */ + private static final long DEFAULT_INTERVAL_MS = TimeUnit.MINUTES.toMillis(1); + + + /** Interval for running the periodic service in milliseconds. */ + private long intervalMs; + /** Name of the service. */ + private final String serviceName; + + /** Scheduler for the periodic service. */ + private final ScheduledExecutorService scheduler; + + /** If the service is running. */ + private volatile boolean isRunning = false; + + /** How many times we run. */ + private long runCount; + /** How many errors we got. */ + private long errorCount; + /** When was the last time we executed this service successfully. */ + private long lastRun; + + /** + * Create a new periodic update service. + * + * @param name Name of the service. + */ + public PeriodicService(String name) { + this(name, DEFAULT_INTERVAL_MS); + } + + /** + * Create a new periodic update service. + * + * @param name Name of the service. + * @param interval Interval for the periodic service in milliseconds. 
+ */ + public PeriodicService(String name, long interval) { + super(name); + this.serviceName = name; + this.intervalMs = interval; + + ThreadFactory threadFactory = new ThreadFactoryBuilder() + .setNameFormat(this.getName() + "-%d") + .build(); + this.scheduler = Executors.newScheduledThreadPool(1, threadFactory); + } + + /** + * Set the interval for the periodic service. + * + * @param interval Interval in milliseconds. + */ + protected void setIntervalMs(long interval) { + if (getServiceState() == STATE.STARTED) { + throw new ServiceStateException("Periodic service already started"); + } else { + this.intervalMs = interval; + } + } + + /** + * Get the interval for the periodic service. + * + * @return Interval in milliseconds. + */ + protected long getIntervalMs() { + return this.intervalMs; + } + + /** + * Get how many times we failed to run the periodic service. + * + * @return Times we failed to run the periodic service. + */ + protected long getErrorCount() { + return this.errorCount; + } + + /** + * Get how many times we run the periodic service. + * + * @return Times we run the periodic service. + */ + protected long getRunCount() { + return this.runCount; + } + + /** + * Get the last time the periodic service was executed. + * + * @return Last time the periodic service was executed. + */ + protected long getLastUpdate() { + return this.lastRun; + } + + @Override + protected void serviceStart() throws Exception { + super.serviceStart(); + LOG.info("Starting periodic service {}", this.serviceName); + startPeriodic(); + } + + @Override + protected void serviceStop() throws Exception { + stopPeriodic(); + LOG.info("Stopping periodic service {}", this.serviceName); + super.serviceStop(); + } + + /** + * Stop the periodic task. + */ + protected synchronized void stopPeriodic() { + if (this.isRunning) { + LOG.info("{} is shutting down", this.serviceName); + this.isRunning = false; + this.scheduler.shutdownNow(); + } + } + + /** + * Start the periodic execution. 
+ */ + protected synchronized void startPeriodic() { + stopPeriodic(); + + // Create the runnable service + Runnable updateRunnable = new Runnable() { + @Override + public void run() { + LOG.debug("Running {} update task", serviceName); + try { + if (!isRunning) { + return; + } + periodicInvoke(); + runCount++; + lastRun = Time.now(); + } catch (Exception ex) { + errorCount++; + LOG.warn(serviceName + " service threw an exception", ex); + } + } + }; + + // Start the execution of the periodic service + this.isRunning = true; + this.scheduler.scheduleWithFixedDelay( + updateRunnable, 0, this.intervalMs, TimeUnit.MILLISECONDS); + } + + /** + * Method that the service will run periodically. + */ + protected abstract void periodicInvoke(); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Quota.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Quota.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Quota.java new file mode 100644 index 0000000..dbb6ffa --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Quota.java @@ -0,0 +1,208 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import java.io.IOException; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.fs.QuotaUsage; +import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation; +import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Module that implements the quota relevant RPC calls + * {@link ClientProtocol#setQuota(String, long, long, StorageType)} + * and + * {@link ClientProtocol#getQuotaUsage(String)} + * in the {@link RouterRpcServer}. + */ +public class Quota { + private static final Logger LOG = LoggerFactory.getLogger(Quota.class); + + /** RPC server to receive client calls. */ + private final RouterRpcServer rpcServer; + /** RPC clients to connect to the Namenodes. */ + private final RouterRpcClient rpcClient; + /** Router used in RouterRpcServer. */ + private final Router router; + + public Quota(Router router, RouterRpcServer server) { + this.router = router; + this.rpcServer = server; + this.rpcClient = server.getRPCClient(); + } + + /** + * Set quota for the federation path. + * @param path Federation path. + * @param namespaceQuota Name space quota. + * @param storagespaceQuota Storage space quota. 
+ * @param type StorageType that the space quota is intended to be set on. + * @throws IOException + */ + public void setQuota(String path, long namespaceQuota, + long storagespaceQuota, StorageType type) throws IOException { + rpcServer.checkOperation(OperationCategory.WRITE); + + // Set quota for current path and its children mount table path. + final List<RemoteLocation> locations = getQuotaRemoteLocations(path); + if (LOG.isDebugEnabled()) { + for (RemoteLocation loc : locations) { + LOG.debug("Set quota for path: nsId: {}, dest: {}.", + loc.getNameserviceId(), loc.getDest()); + } + } + + RemoteMethod method = new RemoteMethod("setQuota", + new Class<?>[] {String.class, long.class, long.class, + StorageType.class}, + new RemoteParam(), namespaceQuota, storagespaceQuota, type); + rpcClient.invokeConcurrent(locations, method, false, false); + } + + /** + * Get quota usage for the federation path. + * @param path Federation path. + * @return Aggregated quota. + * @throws IOException + */ + public QuotaUsage getQuotaUsage(String path) throws IOException { + final List<RemoteLocation> quotaLocs = getValidQuotaLocations(path); + RemoteMethod method = new RemoteMethod("getQuotaUsage", + new Class<?>[] {String.class}, new RemoteParam()); + Map<RemoteLocation, QuotaUsage> results = rpcClient.invokeConcurrent( + quotaLocs, method, true, false, QuotaUsage.class); + + return aggregateQuota(results); + } + + /** + * Get valid quota remote locations used in {@link #getQuotaUsage(String)}. + * Differentiate the method {@link #getQuotaRemoteLocations(String)}, this + * method will do some additional filtering. + * @param path Federation path. + * @return List of valid quota remote locations. 
+ * @throws IOException + */ + private List<RemoteLocation> getValidQuotaLocations(String path) + throws IOException { + final List<RemoteLocation> locations = getQuotaRemoteLocations(path); + + // NameService -> Locations + Map<String, List<RemoteLocation>> validLocations = new HashMap<>(); + for (RemoteLocation loc : locations) { + String nsId = loc.getNameserviceId(); + List<RemoteLocation> dests = validLocations.get(nsId); + if (dests == null) { + dests = new LinkedList<>(); + dests.add(loc); + validLocations.put(nsId, dests); + } else { + // Ensure the paths in the same nameservice is different. + // Don't include parent-child paths. + boolean isChildPath = false; + for (RemoteLocation d : dests) { + if (loc.getDest().startsWith(d.getDest())) { + isChildPath = true; + break; + } + } + + if (!isChildPath) { + dests.add(loc); + } + } + } + + List<RemoteLocation> quotaLocs = new LinkedList<>(); + for (List<RemoteLocation> locs : validLocations.values()) { + quotaLocs.addAll(locs); + } + + return quotaLocs; + } + + /** + * Aggregate quota that queried from sub-clusters. + * @param results Quota query result. + * @return Aggregated Quota. + */ + private QuotaUsage aggregateQuota(Map<RemoteLocation, QuotaUsage> results) { + long nsCount = 0; + long ssCount = 0; + boolean hasQuotaUnSet = false; + + for (Map.Entry<RemoteLocation, QuotaUsage> entry : results.entrySet()) { + RemoteLocation loc = entry.getKey(); + QuotaUsage usage = entry.getValue(); + if (usage != null) { + // If quota is not set in real FileSystem, the usage + // value will return -1. 
+ if (usage.getQuota() == -1 && usage.getSpaceQuota() == -1) { + hasQuotaUnSet = true; + } + + nsCount += usage.getFileAndDirectoryCount(); + ssCount += usage.getSpaceConsumed(); + LOG.debug( + "Get quota usage for path: nsId: {}, dest: {}," + + " nsCount: {}, ssCount: {}.", + loc.getNameserviceId(), loc.getDest(), + usage.getFileAndDirectoryCount(), usage.getSpaceConsumed()); + } + } + + QuotaUsage.Builder builder = new QuotaUsage.Builder() + .fileAndDirectoryCount(nsCount).spaceConsumed(ssCount); + if (hasQuotaUnSet) { + builder.quota(HdfsConstants.QUOTA_DONT_SET); + } + + return builder.build(); + } + + /** + * Get all quota remote locations across subclusters under given + * federation path. + * @param path Federation path. + * @return List of quota remote locations. + * @throws IOException + */ + private List<RemoteLocation> getQuotaRemoteLocations(String path) + throws IOException { + List<RemoteLocation> locations = new LinkedList<>(); + RouterQuotaManager manager = this.router.getQuotaManager(); + if (manager != null) { + Set<String> childrenPaths = manager.getPaths(path); + for (String childPath : childrenPaths) { + locations.addAll(rpcServer.getLocationsForPath(childPath, true)); + } + } + + return locations; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java new file mode 100644 index 0000000..170b876 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java @@ -0,0 +1,229 @@ +/** + * Licensed to the Apache Software Foundation (ASF) 
under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.federation.router; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCPerformanceMonitor; +import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; +import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver; +import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver; +import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver; +import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver; +import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreSerializerPBImpl; +import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreZooKeeperImpl; + +import java.util.concurrent.TimeUnit; + +/** + * Config fields for router-based hdfs federation. 
+ */ +@InterfaceAudience.Private +public class RBFConfigKeys extends CommonConfigurationKeysPublic { + + // HDFS Router-based federation + public static final String FEDERATION_ROUTER_PREFIX = + "dfs.federation.router."; + public static final String DFS_ROUTER_DEFAULT_NAMESERVICE = + FEDERATION_ROUTER_PREFIX + "default.nameserviceId"; + public static final String DFS_ROUTER_HANDLER_COUNT_KEY = + FEDERATION_ROUTER_PREFIX + "handler.count"; + public static final int DFS_ROUTER_HANDLER_COUNT_DEFAULT = 10; + public static final String DFS_ROUTER_READER_QUEUE_SIZE_KEY = + FEDERATION_ROUTER_PREFIX + "reader.queue.size"; + public static final int DFS_ROUTER_READER_QUEUE_SIZE_DEFAULT = 100; + public static final String DFS_ROUTER_READER_COUNT_KEY = + FEDERATION_ROUTER_PREFIX + "reader.count"; + public static final int DFS_ROUTER_READER_COUNT_DEFAULT = 1; + public static final String DFS_ROUTER_HANDLER_QUEUE_SIZE_KEY = + FEDERATION_ROUTER_PREFIX + "handler.queue.size"; + public static final int DFS_ROUTER_HANDLER_QUEUE_SIZE_DEFAULT = 100; + public static final String DFS_ROUTER_RPC_BIND_HOST_KEY = + FEDERATION_ROUTER_PREFIX + "rpc-bind-host"; + public static final int DFS_ROUTER_RPC_PORT_DEFAULT = 8888; + public static final String DFS_ROUTER_RPC_ADDRESS_KEY = + FEDERATION_ROUTER_PREFIX + "rpc-address"; + public static final String DFS_ROUTER_RPC_ADDRESS_DEFAULT = + "0.0.0.0:" + DFS_ROUTER_RPC_PORT_DEFAULT; + public static final String DFS_ROUTER_RPC_ENABLE = + FEDERATION_ROUTER_PREFIX + "rpc.enable"; + public static final boolean DFS_ROUTER_RPC_ENABLE_DEFAULT = true; + + public static final String DFS_ROUTER_METRICS_ENABLE = + FEDERATION_ROUTER_PREFIX + "metrics.enable"; + public static final boolean DFS_ROUTER_METRICS_ENABLE_DEFAULT = true; + public static final String DFS_ROUTER_METRICS_CLASS = + FEDERATION_ROUTER_PREFIX + "metrics.class"; + public static final Class<? 
extends RouterRpcMonitor> + DFS_ROUTER_METRICS_CLASS_DEFAULT = + FederationRPCPerformanceMonitor.class; + + // HDFS Router heartbeat + public static final String DFS_ROUTER_HEARTBEAT_ENABLE = + FEDERATION_ROUTER_PREFIX + "heartbeat.enable"; + public static final boolean DFS_ROUTER_HEARTBEAT_ENABLE_DEFAULT = true; + public static final String DFS_ROUTER_HEARTBEAT_INTERVAL_MS = + FEDERATION_ROUTER_PREFIX + "heartbeat.interval"; + public static final long DFS_ROUTER_HEARTBEAT_INTERVAL_MS_DEFAULT = + TimeUnit.SECONDS.toMillis(5); + public static final String DFS_ROUTER_MONITOR_NAMENODE = + FEDERATION_ROUTER_PREFIX + "monitor.namenode"; + public static final String DFS_ROUTER_MONITOR_LOCAL_NAMENODE = + FEDERATION_ROUTER_PREFIX + "monitor.localnamenode.enable"; + public static final boolean DFS_ROUTER_MONITOR_LOCAL_NAMENODE_DEFAULT = true; + public static final String DFS_ROUTER_HEARTBEAT_STATE_INTERVAL_MS = + FEDERATION_ROUTER_PREFIX + "heartbeat-state.interval"; + public static final long DFS_ROUTER_HEARTBEAT_STATE_INTERVAL_MS_DEFAULT = + TimeUnit.SECONDS.toMillis(5); + + // HDFS Router NN client + public static final String DFS_ROUTER_NAMENODE_CONNECTION_POOL_SIZE = + FEDERATION_ROUTER_PREFIX + "connection.pool-size"; + public static final int DFS_ROUTER_NAMENODE_CONNECTION_POOL_SIZE_DEFAULT = + 64; + public static final String DFS_ROUTER_NAMENODE_CONNECTION_POOL_CLEAN = + FEDERATION_ROUTER_PREFIX + "connection.pool.clean.ms"; + public static final long DFS_ROUTER_NAMENODE_CONNECTION_POOL_CLEAN_DEFAULT = + TimeUnit.MINUTES.toMillis(1); + public static final String DFS_ROUTER_NAMENODE_CONNECTION_CLEAN_MS = + FEDERATION_ROUTER_PREFIX + "connection.clean.ms"; + public static final long DFS_ROUTER_NAMENODE_CONNECTION_CLEAN_MS_DEFAULT = + TimeUnit.SECONDS.toMillis(10); + + // HDFS Router RPC client + public static final String DFS_ROUTER_CLIENT_THREADS_SIZE = + FEDERATION_ROUTER_PREFIX + "client.thread-size"; + public static final int DFS_ROUTER_CLIENT_THREADS_SIZE_DEFAULT 
= 32; + public static final String DFS_ROUTER_CLIENT_MAX_ATTEMPTS = + FEDERATION_ROUTER_PREFIX + "client.retry.max.attempts"; + public static final int DFS_ROUTER_CLIENT_MAX_ATTEMPTS_DEFAULT = 3; + + // HDFS Router State Store connection + public static final String FEDERATION_FILE_RESOLVER_CLIENT_CLASS = + FEDERATION_ROUTER_PREFIX + "file.resolver.client.class"; + public static final Class<? extends FileSubclusterResolver> + FEDERATION_FILE_RESOLVER_CLIENT_CLASS_DEFAULT = + MountTableResolver.class; + public static final String FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS = + FEDERATION_ROUTER_PREFIX + "namenode.resolver.client.class"; + public static final Class<? extends ActiveNamenodeResolver> + FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS_DEFAULT = + MembershipNamenodeResolver.class; + + // HDFS Router-based federation State Store + public static final String FEDERATION_STORE_PREFIX = + FEDERATION_ROUTER_PREFIX + "store."; + + public static final String DFS_ROUTER_STORE_ENABLE = + FEDERATION_STORE_PREFIX + "enable"; + public static final boolean DFS_ROUTER_STORE_ENABLE_DEFAULT = true; + + public static final String FEDERATION_STORE_SERIALIZER_CLASS = + FEDERATION_STORE_PREFIX + "serializer"; + public static final Class<StateStoreSerializerPBImpl> + FEDERATION_STORE_SERIALIZER_CLASS_DEFAULT = + StateStoreSerializerPBImpl.class; + + public static final String FEDERATION_STORE_DRIVER_CLASS = + FEDERATION_STORE_PREFIX + "driver.class"; + public static final Class<? 
extends StateStoreDriver> + FEDERATION_STORE_DRIVER_CLASS_DEFAULT = StateStoreZooKeeperImpl.class; + + public static final String FEDERATION_STORE_CONNECTION_TEST_MS = + FEDERATION_STORE_PREFIX + "connection.test"; + public static final long FEDERATION_STORE_CONNECTION_TEST_MS_DEFAULT = + TimeUnit.MINUTES.toMillis(1); + + public static final String DFS_ROUTER_CACHE_TIME_TO_LIVE_MS = + FEDERATION_ROUTER_PREFIX + "cache.ttl"; + public static final long DFS_ROUTER_CACHE_TIME_TO_LIVE_MS_DEFAULT = + TimeUnit.MINUTES.toMillis(1); + + public static final String FEDERATION_STORE_MEMBERSHIP_EXPIRATION_MS = + FEDERATION_STORE_PREFIX + "membership.expiration"; + public static final long FEDERATION_STORE_MEMBERSHIP_EXPIRATION_MS_DEFAULT = + TimeUnit.MINUTES.toMillis(5); + public static final String FEDERATION_STORE_ROUTER_EXPIRATION_MS = + FEDERATION_STORE_PREFIX + "router.expiration"; + public static final long FEDERATION_STORE_ROUTER_EXPIRATION_MS_DEFAULT = + TimeUnit.MINUTES.toMillis(5); + + // HDFS Router safe mode + public static final String DFS_ROUTER_SAFEMODE_ENABLE = + FEDERATION_ROUTER_PREFIX + "safemode.enable"; + public static final boolean DFS_ROUTER_SAFEMODE_ENABLE_DEFAULT = true; + public static final String DFS_ROUTER_SAFEMODE_EXTENSION = + FEDERATION_ROUTER_PREFIX + "safemode.extension"; + public static final long DFS_ROUTER_SAFEMODE_EXTENSION_DEFAULT = + TimeUnit.SECONDS.toMillis(30); + public static final String DFS_ROUTER_SAFEMODE_EXPIRATION = + FEDERATION_ROUTER_PREFIX + "safemode.expiration"; + public static final long DFS_ROUTER_SAFEMODE_EXPIRATION_DEFAULT = + 3 * DFS_ROUTER_CACHE_TIME_TO_LIVE_MS_DEFAULT; + + // HDFS Router-based federation mount table entries + /** Maximum number of cache entries to have. */ + public static final String FEDERATION_MOUNT_TABLE_MAX_CACHE_SIZE = + FEDERATION_ROUTER_PREFIX + "mount-table.max-cache-size"; + /** Remove cache entries if we have more than 10k. 
*/ + public static final int FEDERATION_MOUNT_TABLE_MAX_CACHE_SIZE_DEFAULT = 10000; + + // HDFS Router-based federation admin + public static final String DFS_ROUTER_ADMIN_HANDLER_COUNT_KEY = + FEDERATION_ROUTER_PREFIX + "admin.handler.count"; + public static final int DFS_ROUTER_ADMIN_HANDLER_COUNT_DEFAULT = 1; + public static final int DFS_ROUTER_ADMIN_PORT_DEFAULT = 8111; + public static final String DFS_ROUTER_ADMIN_ADDRESS_KEY = + FEDERATION_ROUTER_PREFIX + "admin-address"; + public static final String DFS_ROUTER_ADMIN_ADDRESS_DEFAULT = + "0.0.0.0:" + DFS_ROUTER_ADMIN_PORT_DEFAULT; + public static final String DFS_ROUTER_ADMIN_BIND_HOST_KEY = + FEDERATION_ROUTER_PREFIX + "admin-bind-host"; + public static final String DFS_ROUTER_ADMIN_ENABLE = + FEDERATION_ROUTER_PREFIX + "admin.enable"; + public static final boolean DFS_ROUTER_ADMIN_ENABLE_DEFAULT = true; + + // HDFS Router-based federation web + public static final String DFS_ROUTER_HTTP_ENABLE = + FEDERATION_ROUTER_PREFIX + "http.enable"; + public static final boolean DFS_ROUTER_HTTP_ENABLE_DEFAULT = true; + public static final String DFS_ROUTER_HTTP_ADDRESS_KEY = + FEDERATION_ROUTER_PREFIX + "http-address"; + public static final int DFS_ROUTER_HTTP_PORT_DEFAULT = 50071; + public static final String DFS_ROUTER_HTTP_BIND_HOST_KEY = + FEDERATION_ROUTER_PREFIX + "http-bind-host"; + public static final String DFS_ROUTER_HTTP_ADDRESS_DEFAULT = + "0.0.0.0:" + DFS_ROUTER_HTTP_PORT_DEFAULT; + public static final String DFS_ROUTER_HTTPS_ADDRESS_KEY = + FEDERATION_ROUTER_PREFIX + "https-address"; + public static final int DFS_ROUTER_HTTPS_PORT_DEFAULT = 50072; + public static final String DFS_ROUTER_HTTPS_BIND_HOST_KEY = + FEDERATION_ROUTER_PREFIX + "https-bind-host"; + public static final String DFS_ROUTER_HTTPS_ADDRESS_DEFAULT = + "0.0.0.0:" + DFS_ROUTER_HTTPS_PORT_DEFAULT; + + // HDFS Router-based federation quota + public static final String DFS_ROUTER_QUOTA_ENABLE = + FEDERATION_ROUTER_PREFIX + "quota.enable"; + 
public static final boolean DFS_ROUTER_QUOTA_ENABLED_DEFAULT = false; + public static final String DFS_ROUTER_QUOTA_CACHE_UPATE_INTERVAL = + FEDERATION_ROUTER_PREFIX + "quota-cache.update.interval"; + public static final long DFS_ROUTER_QUOTA_CACHE_UPATE_INTERVAL_DEFAULT = + 60000; +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteLocationContext.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteLocationContext.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteLocationContext.java new file mode 100644 index 0000000..a90c460 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteLocationContext.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.federation.router; + +import org.apache.commons.lang.builder.HashCodeBuilder; + +/** + * Base class for objects that are unique to a namespace. + */ +public abstract class RemoteLocationContext + implements Comparable<RemoteLocationContext> { + + /** + * Returns an identifier for a unique namespace. + * + * @return Namespace identifier. + */ + public abstract String getNameserviceId(); + + /** + * Destination in this location. For example the path in a remote namespace. + * + * @return Destination in this location. + */ + public abstract String getDest(); + + @Override + public int hashCode() { + return new HashCodeBuilder(17, 31) + .append(getNameserviceId()) + .append(getDest()) + .toHashCode(); + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof RemoteLocationContext) { + RemoteLocationContext other = (RemoteLocationContext) obj; + return this.getNameserviceId().equals(other.getNameserviceId()) && + this.getDest().equals(other.getDest()); + } + return false; + } + + @Override + public int compareTo(RemoteLocationContext info) { + int ret = this.getNameserviceId().compareTo(info.getNameserviceId()); + if (ret == 0) { + ret = this.getDest().compareTo(info.getDest()); + } + return ret; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteMethod.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteMethod.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteMethod.java new file mode 100644 index 0000000..cd57d45 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteMethod.java @@ -0,0 +1,164 @@ +/** + * Licensed to the 
Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import java.io.IOException; +import java.lang.reflect.Method; +import java.util.Arrays; + +import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Determines the remote client protocol method and the parameter list for a + * specific location. + */ +public class RemoteMethod { + + private static final Logger LOG = LoggerFactory.getLogger(RemoteMethod.class); + + + /** List of parameters: static and dynamic values, matchings types. */ + private final Object[] params; + /** List of method parameters types, matches parameters. */ + private final Class<?>[] types; + /** String name of the ClientProtocol method. */ + private final String methodName; + + /** + * Create a method with no parameters. + * + * @param method The string name of the ClientProtocol method. + */ + public RemoteMethod(String method) { + this.params = null; + this.types = null; + this.methodName = method; + } + + /** + * Creates a remote method generator. + * + * @param method The string name of the ClientProtocol method. + * @param pTypes A list of types to use to locate the specific method. 
+ * @param pParams A list of parameters for the method. The order of the + * parameter list must match the order and number of the types. + * Parameters are grouped into 2 categories: + * <ul> + * <li>Static parameters that are immutable across locations. + * <li>Dynamic parameters that are determined for each location by a + * RemoteParam object. To specify a dynamic parameter, pass an + * instance of RemoteParam in place of the parameter value. + * </ul> + * @throws IOException If the types and parameter lists are not valid. + */ + public RemoteMethod(String method, Class<?>[] pTypes, Object... pParams) + throws IOException { + + if (pParams.length != pTypes.length) { + throw new IOException("Invalid parameters for method " + method); + } + + this.params = pParams; + this.types = pTypes; + this.methodName = method; + } + + /** + * Get the represented java method. + * + * @return Method + * @throws IOException If the method cannot be found. + */ + public Method getMethod() throws IOException { + try { + if (types != null) { + return ClientProtocol.class.getDeclaredMethod(methodName, types); + } else { + return ClientProtocol.class.getDeclaredMethod(methodName); + } + } catch (NoSuchMethodException e) { + // Re-throw as an IOException + LOG.error("Cannot get method {} with types {}", + methodName, Arrays.toString(types), e); + throw new IOException(e); + } catch (SecurityException e) { + LOG.error("Cannot access method {} with types {}", + methodName, Arrays.toString(types), e); + throw new IOException(e); + } + } + + /** + * Get the calling types for this method. + * + * @return An array of calling types. + */ + public Class<?>[] getTypes() { + return this.types; + } + + /** + * Generate a list of parameters for this specific location using no context. + * + * @return A list of parameters for the method customized for the location. + */ + public Object[] getParams() { + return this.getParams(null); + } + + /** + * Get the name of the method. 
+ * + * @return Name of the method. + */ + public String getMethodName() { + return this.methodName; + } + + /** + * Generate a list of parameters for this specific location. Parameters are + * grouped into 2 categories: + * <ul> + * <li>Static parameters that are immutable across locations. + * <li>Dynamic parameters that are determined for each location by a + * RemoteParam object. + * </ul> + * + * @param context The context identifying the location. + * @return A list of parameters for the method customized for the location. + */ + public Object[] getParams(RemoteLocationContext context) { + if (this.params == null) { + return new Object[] {}; + } + Object[] objList = new Object[this.params.length]; + for (int i = 0; i < this.params.length; i++) { + Object currentObj = this.params[i]; + if (currentObj instanceof RemoteParam) { + // Map the parameter using the context + RemoteParam paramGetter = (RemoteParam) currentObj; + objList[i] = paramGetter.getParameterForContext(context); + } else { + objList[i] = currentObj; + } + } + return objList; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteParam.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteParam.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteParam.java new file mode 100644 index 0000000..8816ff6 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteParam.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import java.util.Map; + +/** + * A dynamically assignable parameter that is location-specific. + * <p> + * There are 2 ways this mapping is determined: + * <ul> + * <li>Default: Uses the RemoteLocationContext's destination + * <li>Map: Uses the value of the RemoteLocationContext key provided in the + * parameter map. + * </ul> + */ +public class RemoteParam { + + private final Map<? extends Object, ? extends Object> paramMap; + + /** + * Constructs a default remote parameter. Always maps the value to the + * destination of the provided RemoveLocationContext. + */ + public RemoteParam() { + this.paramMap = null; + } + + /** + * Constructs a map based remote parameter. Determines the value using the + * provided RemoteLocationContext as a key into the map. + * + * @param map Map with RemoteLocationContext keys. + */ + public RemoteParam( + Map<? extends RemoteLocationContext, ? extends Object> map) { + this.paramMap = map; + } + + /** + * Determine the appropriate value for this parameter based on the location. + * + * @param context Context identifying the location. + * @return A parameter specific to this location. 
+ */ + public Object getParameterForContext(RemoteLocationContext context) { + if (context == null) { + return null; + } else if (this.paramMap != null) { + return this.paramMap.get(context); + } else { + // Default case + return context.getDest(); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java new file mode 100644 index 0000000..38f5d4f --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java @@ -0,0 +1,655 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.federation.router; + +import static org.apache.hadoop.hdfs.server.federation.router.FederationUtil.newActiveNamenodeResolver; +import static org.apache.hadoop.hdfs.server.federation.router.FederationUtil.newFileSubclusterResolver; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.UnknownHostException; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdfs.HAUtil; +import org.apache.hadoop.hdfs.server.federation.metrics.FederationMetrics; +import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; +import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver; +import org.apache.hadoop.hdfs.server.federation.store.RouterStore; +import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.source.JvmMetrics; +import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.util.JvmPauseMonitor; +import org.apache.hadoop.util.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; + +/** + * Router that provides a unified view of multiple federated HDFS clusters. It + * has two main roles: (1) federated interface and (2) NameNode heartbeat. + * <p> + * For the federated interface, the Router receives a client request, checks the + * State Store for the correct subcluster, and forwards the request to the + * active Namenode of that subcluster. The reply from the Namenode then flows in + * the opposite direction. The Routers are stateless and can be behind a load + * balancer. 
HDFS clients connect to the router using the same interfaces as are + * used to communicate with a namenode, namely the ClientProtocol RPC interface + * and the WebHdfs HTTP interface exposed by the router. {@link RouterRpcServer} + * {@link RouterHttpServer} + * <p> + * For NameNode heartbeat, the Router periodically checks the state of a + * NameNode (usually on the same server) and reports their high availability + * (HA) state and load/space status to the State Store. Note that this is an + * optional role as a Router can be independent of any subcluster. + * {@link StateStoreService} {@link NamenodeHeartbeatService} + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class Router extends CompositeService { + + private static final Logger LOG = LoggerFactory.getLogger(Router.class); + + + /** Configuration for the Router. */ + private Configuration conf; + + /** Router address/identifier. */ + private String routerId; + + /** RPC interface to the client. */ + private RouterRpcServer rpcServer; + private InetSocketAddress rpcAddress; + + /** RPC interface for the admin. */ + private RouterAdminServer adminServer; + private InetSocketAddress adminAddress; + + /** HTTP interface and web application. */ + private RouterHttpServer httpServer; + + /** Interface with the State Store. */ + private StateStoreService stateStore; + + /** Interface to map global name space to HDFS subcluster name spaces. */ + private FileSubclusterResolver subclusterResolver; + + /** Interface to identify the active NN for a nameservice or blockpool ID. */ + private ActiveNamenodeResolver namenodeResolver; + /** Updates the namenode status in the namenode resolver. */ + private Collection<NamenodeHeartbeatService> namenodeHeartbeatServices; + + /** Router metrics. */ + private RouterMetricsService metrics; + + /** JVM pauses (GC and others). */ + private JvmPauseMonitor pauseMonitor; + + /** Quota usage update service. 
*/ + private RouterQuotaUpdateService quotaUpdateService; + /** Quota cache manager. */ + private RouterQuotaManager quotaManager; + + /** Manages the current state of the router. */ + private RouterStore routerStateManager; + /** Heartbeat our run status to the router state manager. */ + private RouterHeartbeatService routerHeartbeatService; + /** Enter/exit safemode. */ + private RouterSafemodeService safemodeService; + + /** The start time of the namesystem. */ + private final long startTime = Time.now(); + + /** State of the Router. */ + private RouterServiceState state = RouterServiceState.UNINITIALIZED; + + + ///////////////////////////////////////////////////////// + // Constructor + ///////////////////////////////////////////////////////// + + public Router() { + super(Router.class.getName()); + } + + ///////////////////////////////////////////////////////// + // Service management + ///////////////////////////////////////////////////////// + + @Override + protected void serviceInit(Configuration configuration) throws Exception { + this.conf = configuration; + updateRouterState(RouterServiceState.INITIALIZING); + + if (conf.getBoolean( + RBFConfigKeys.DFS_ROUTER_STORE_ENABLE, + RBFConfigKeys.DFS_ROUTER_STORE_ENABLE_DEFAULT)) { + // Service that maintains the State Store connection + this.stateStore = new StateStoreService(); + addService(this.stateStore); + } + + // Resolver to track active NNs + this.namenodeResolver = newActiveNamenodeResolver( + this.conf, this.stateStore); + if (this.namenodeResolver == null) { + throw new IOException("Cannot find namenode resolver."); + } + + // Lookup interface to map between the global and subcluster name spaces + this.subclusterResolver = newFileSubclusterResolver(this.conf, this); + if (this.subclusterResolver == null) { + throw new IOException("Cannot find subcluster resolver"); + } + + if (conf.getBoolean( + RBFConfigKeys.DFS_ROUTER_RPC_ENABLE, + RBFConfigKeys.DFS_ROUTER_RPC_ENABLE_DEFAULT)) { + // Create RPC 
server + this.rpcServer = createRpcServer(); + addService(this.rpcServer); + this.setRpcServerAddress(rpcServer.getRpcAddress()); + } + + if (conf.getBoolean( + RBFConfigKeys.DFS_ROUTER_ADMIN_ENABLE, + RBFConfigKeys.DFS_ROUTER_ADMIN_ENABLE_DEFAULT)) { + // Create admin server + this.adminServer = createAdminServer(); + addService(this.adminServer); + } + + if (conf.getBoolean( + RBFConfigKeys.DFS_ROUTER_HTTP_ENABLE, + RBFConfigKeys.DFS_ROUTER_HTTP_ENABLE_DEFAULT)) { + // Create HTTP server + this.httpServer = createHttpServer(); + addService(this.httpServer); + } + + if (conf.getBoolean( + RBFConfigKeys.DFS_ROUTER_HEARTBEAT_ENABLE, + RBFConfigKeys.DFS_ROUTER_HEARTBEAT_ENABLE_DEFAULT)) { + + // Create status updater for each monitored Namenode + this.namenodeHeartbeatServices = createNamenodeHeartbeatServices(); + for (NamenodeHeartbeatService hearbeatService : + this.namenodeHeartbeatServices) { + addService(hearbeatService); + } + + if (this.namenodeHeartbeatServices.isEmpty()) { + LOG.error("Heartbeat is enabled but there are no namenodes to monitor"); + } + + // Periodically update the router state + this.routerHeartbeatService = new RouterHeartbeatService(this); + addService(this.routerHeartbeatService); + } + + // Router metrics system + if (conf.getBoolean( + RBFConfigKeys.DFS_ROUTER_METRICS_ENABLE, + RBFConfigKeys.DFS_ROUTER_METRICS_ENABLE_DEFAULT)) { + + DefaultMetricsSystem.initialize("Router"); + + this.metrics = new RouterMetricsService(this); + addService(this.metrics); + + // JVM pause monitor + this.pauseMonitor = new JvmPauseMonitor(); + this.pauseMonitor.init(conf); + } + + // Initial quota relevant service + if (conf.getBoolean(RBFConfigKeys.DFS_ROUTER_QUOTA_ENABLE, + RBFConfigKeys.DFS_ROUTER_QUOTA_ENABLED_DEFAULT)) { + this.quotaManager = new RouterQuotaManager(); + this.quotaUpdateService = new RouterQuotaUpdateService(this); + addService(this.quotaUpdateService); + } + + // Safemode service to refuse RPC calls when the router is out of sync + if 
(conf.getBoolean( + RBFConfigKeys.DFS_ROUTER_SAFEMODE_ENABLE, + RBFConfigKeys.DFS_ROUTER_SAFEMODE_ENABLE_DEFAULT)) { + // Create safemode monitoring service + this.safemodeService = new RouterSafemodeService(this); + addService(this.safemodeService); + } + + super.serviceInit(conf); + } + + @Override + protected void serviceStart() throws Exception { + + if (this.safemodeService == null) { + // Router is running now + updateRouterState(RouterServiceState.RUNNING); + } + + if (this.pauseMonitor != null) { + this.pauseMonitor.start(); + JvmMetrics jvmMetrics = this.metrics.getJvmMetrics(); + if (jvmMetrics != null) { + jvmMetrics.setPauseMonitor(pauseMonitor); + } + } + + super.serviceStart(); + } + + @Override + protected void serviceStop() throws Exception { + + // Update state + updateRouterState(RouterServiceState.SHUTDOWN); + + // JVM pause monitor + if (this.pauseMonitor != null) { + this.pauseMonitor.stop(); + } + + super.serviceStop(); + } + + /** + * Shutdown the router. + */ + public void shutDown() { + new Thread() { + @Override + public void run() { + Router.this.stop(); + } + }.start(); + } + + ///////////////////////////////////////////////////////// + // RPC Server + ///////////////////////////////////////////////////////// + + /** + * Create a new Router RPC server to proxy ClientProtocol requests. + * + * @return New Router RPC Server. + * @throws IOException If the router RPC server was not started. + */ + protected RouterRpcServer createRpcServer() throws IOException { + return new RouterRpcServer(this.conf, this, this.getNamenodeResolver(), + this.getSubclusterResolver()); + } + + /** + * Get the Router RPC server. + * + * @return Router RPC server. + */ + public RouterRpcServer getRpcServer() { + return this.rpcServer; + } + + /** + * Set the current RPC socket for the router. + * + * @param address RPC address. 
+ */ + protected void setRpcServerAddress(InetSocketAddress address) { + this.rpcAddress = address; + + // Use the RPC address as our unique router Id + if (this.rpcAddress != null) { + try { + String hostname = InetAddress.getLocalHost().getHostName(); + setRouterId(hostname + ":" + this.rpcAddress.getPort()); + } catch (UnknownHostException ex) { + LOG.error("Cannot set unique router ID, address not resolvable {}", + this.rpcAddress); + } + } + } + + /** + * Get the current RPC socket address for the router. + * + * @return InetSocketAddress + */ + public InetSocketAddress getRpcServerAddress() { + return this.rpcAddress; + } + + ///////////////////////////////////////////////////////// + // Admin server + ///////////////////////////////////////////////////////// + + /** + * Create a new router admin server to handle the router admin interface. + * + * @return RouterAdminServer + * @throws IOException If the admin server was not successfully started. + */ + protected RouterAdminServer createAdminServer() throws IOException { + return new RouterAdminServer(this.conf, this); + } + + /** + * Set the current Admin socket for the router. + * + * @param address Admin RPC address. + */ + protected void setAdminServerAddress(InetSocketAddress address) { + this.adminAddress = address; + } + + /** + * Get the current Admin socket address for the router. + * + * @return InetSocketAddress Admin address. + */ + public InetSocketAddress getAdminServerAddress() { + return adminAddress; + } + + ///////////////////////////////////////////////////////// + // HTTP server + ///////////////////////////////////////////////////////// + + /** + * Create an HTTP server for this Router. + * + * @return HTTP server for this Router. + */ + protected RouterHttpServer createHttpServer() { + return new RouterHttpServer(this); + } + + /** + * Get the current HTTP socket address for the router. + * + * @return InetSocketAddress HTTP address. 
+ */ + public InetSocketAddress getHttpServerAddress() { + if (httpServer != null) { + return httpServer.getHttpAddress(); + } + return null; + } + + ///////////////////////////////////////////////////////// + // Namenode heartbeat monitors + ///////////////////////////////////////////////////////// + + /** + * Create each of the services that will monitor a Namenode. + * + * @return List of heartbeat services. + */ + protected Collection<NamenodeHeartbeatService> + createNamenodeHeartbeatServices() { + + Map<String, NamenodeHeartbeatService> ret = new HashMap<>(); + + if (conf.getBoolean( + RBFConfigKeys.DFS_ROUTER_MONITOR_LOCAL_NAMENODE, + RBFConfigKeys.DFS_ROUTER_MONITOR_LOCAL_NAMENODE_DEFAULT)) { + // Create a local heartbeat service + NamenodeHeartbeatService localHeartbeatService = + createLocalNamenodeHearbeatService(); + if (localHeartbeatService != null) { + String nnDesc = localHeartbeatService.getNamenodeDesc(); + ret.put(nnDesc, localHeartbeatService); + } + } + + // Create heartbeat services for a list specified by the admin + String namenodes = this.conf.get( + RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE); + if (namenodes != null) { + for (String namenode : namenodes.split(",")) { + String[] namenodeSplit = namenode.split("\\."); + String nsId = null; + String nnId = null; + if (namenodeSplit.length == 2) { + nsId = namenodeSplit[0]; + nnId = namenodeSplit[1]; + } else if (namenodeSplit.length == 1) { + nsId = namenode; + } else { + LOG.error("Wrong Namenode to monitor: {}", namenode); + } + if (nsId != null) { + NamenodeHeartbeatService heartbeatService = + createNamenodeHearbeatService(nsId, nnId); + if (heartbeatService != null) { + ret.put(heartbeatService.getNamenodeDesc(), heartbeatService); + } + } + } + } + + return ret.values(); + } + + /** + * Create a new status updater for the local Namenode. + * + * @return Updater of the status for the local Namenode.
+ */ + protected NamenodeHeartbeatService createLocalNamenodeHearbeatService() { + // Detect NN running in this machine + String nsId = DFSUtil.getNamenodeNameServiceId(conf); + String nnId = null; + if (HAUtil.isHAEnabled(conf, nsId)) { + nnId = HAUtil.getNameNodeId(conf, nsId); + if (nnId == null) { + LOG.error("Cannot find namenode id for local {}", nsId); + } + } + + return createNamenodeHearbeatService(nsId, nnId); + } + + /** + * Create a heartbeat monitor for a particular Namenode. + * + * @param nsId Identifier of the nameservice to monitor. + * @param nnId Identifier of the namenode (HA) to monitor. + * @return Updater of the status for the specified Namenode. + */ + protected NamenodeHeartbeatService createNamenodeHearbeatService( + String nsId, String nnId) { + + LOG.info("Creating heartbeat service for Namenode {} in {}", nnId, nsId); + NamenodeHeartbeatService ret = new NamenodeHeartbeatService( + namenodeResolver, nsId, nnId); + return ret; + } + + ///////////////////////////////////////////////////////// + // Router State Management + ///////////////////////////////////////////////////////// + + /** + * Update the router state and heartbeat to the state store. + * + * @param state The new router state. + */ + public void updateRouterState(RouterServiceState newState) { + this.state = newState; + if (this.routerHeartbeatService != null) { + this.routerHeartbeatService.updateStateAsync(); + } + } + + /** + * Get the status of the router. + * + * @return Status of the router. + */ + public RouterServiceState getRouterState() { + return this.state; + } + + ///////////////////////////////////////////////////////// + // Submodule getters + ///////////////////////////////////////////////////////// + + /** + * Get the State Store service. + * + * @return State Store service. + */ + public StateStoreService getStateStore() { + return this.stateStore; + } + + /** + * Get the metrics system for the Router. + * + * @return Router metrics. 
+ */ + public RouterMetrics getRouterMetrics() { + if (this.metrics != null) { + return this.metrics.getRouterMetrics(); + } + return null; + } + + /** + * Get the federation metrics. + * + * @return Federation metrics. + */ + public FederationMetrics getMetrics() { + if (this.metrics != null) { + return this.metrics.getFederationMetrics(); + } + return null; + } + + /** + * Get the subcluster resolver for files. + * + * @return Subcluster resolver for files. + */ + public FileSubclusterResolver getSubclusterResolver() { + return this.subclusterResolver; + } + + /** + * Get the namenode resolver for a subcluster. + * + * @return The namenode resolver for a subcluster. + */ + public ActiveNamenodeResolver getNamenodeResolver() { + return this.namenodeResolver; + } + + /** + * Get the state store interface for the router heartbeats. + * + * @return FederationRouterStateStore state store API handle. + */ + public RouterStore getRouterStateManager() { + if (this.routerStateManager == null && this.stateStore != null) { + this.routerStateManager = this.stateStore.getRegisteredRecordStore( + RouterStore.class); + } + return this.routerStateManager; + } + + ///////////////////////////////////////////////////////// + // Router info + ///////////////////////////////////////////////////////// + + /** + * Get the start date of the Router. + * + * @return Start date of the router. + */ + public long getStartTime() { + return this.startTime; + } + + /** + * Unique ID for the router, typically the hostname:port string for the + * router's RPC server. This ID may be null on router startup before the RPC + * server has bound to a port. + * + * @return Router identifier. + */ + public String getRouterId() { + return this.routerId; + } + + /** + * Sets a unique ID for this router. + * + * @param id Identifier of the Router. 
+ */ + public void setRouterId(String id) { + this.routerId = id; + if (this.stateStore != null) { + this.stateStore.setIdentifier(this.routerId); + } + if (this.namenodeResolver != null) { + this.namenodeResolver.setRouterId(this.routerId); + } + } + + /** + * If the quota system is enabled in Router. + */ + public boolean isQuotaEnabled() { + return this.quotaManager != null; + } + + /** + * Get route quota manager. + * @return RouterQuotaManager Quota manager. + */ + public RouterQuotaManager getQuotaManager() { + return this.quotaManager; + } + + /** + * Get quota cache update service. + */ + @VisibleForTesting + RouterQuotaUpdateService getQuotaCacheUpdateService() { + return this.quotaUpdateService; + } + + /** + * Get the list of namenode heartbeat service. + */ + @VisibleForTesting + Collection<NamenodeHeartbeatService> getNamenodeHearbeatServices() { + return this.namenodeHeartbeatServices; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/87700d45/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAdminServer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAdminServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAdminServer.java new file mode 100644 index 0000000..2c195c6 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAdminServer.java @@ -0,0 +1,298 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_ENABLED_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY; + +import java.io.IOException; +import java.net.InetSocketAddress; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.protocol.proto.RouterProtocolProtos.RouterAdminProtocolService; +import org.apache.hadoop.hdfs.protocolPB.RouterAdminProtocolPB; +import org.apache.hadoop.hdfs.protocolPB.RouterAdminProtocolServerSideTranslatorPB; +import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager; +import org.apache.hadoop.hdfs.server.federation.store.MountTableStore; +import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.EnterSafeModeRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.EnterSafeModeResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetSafeModeRequest; 
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetSafeModeResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.LeaveSafeModeRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.LeaveSafeModeResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryResponse; +import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.ipc.ProtobufRpcEngine; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ipc.RPC.Server; +import org.apache.hadoop.security.AccessControlException; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.service.AbstractService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; +import com.google.protobuf.BlockingService; + +/** + * This class is responsible for handling all of the Admin calls to the HDFS + * router. It is created, started, and stopped by {@link Router}. + */ +public class RouterAdminServer extends AbstractService + implements MountTableManager, RouterStateManager { + + private static final Logger LOG = + LoggerFactory.getLogger(RouterAdminServer.class); + + private Configuration conf; + + private final Router router; + + private MountTableStore mountTableStore; + + /** The Admin server that listens to requests from clients. */ + private final Server adminServer; + private final InetSocketAddress adminAddress; + + /** + * Permission related info used for constructing new router permission + * checker instance. 
+ */ + private static String routerOwner; + private static String superGroup; + private static boolean isPermissionEnabled; + + public RouterAdminServer(Configuration conf, Router router) + throws IOException { + super(RouterAdminServer.class.getName()); + + this.conf = conf; + this.router = router; + + int handlerCount = this.conf.getInt( + RBFConfigKeys.DFS_ROUTER_ADMIN_HANDLER_COUNT_KEY, + RBFConfigKeys.DFS_ROUTER_ADMIN_HANDLER_COUNT_DEFAULT); + + RPC.setProtocolEngine(this.conf, RouterAdminProtocolPB.class, + ProtobufRpcEngine.class); + + RouterAdminProtocolServerSideTranslatorPB routerAdminProtocolTranslator = + new RouterAdminProtocolServerSideTranslatorPB(this); + BlockingService clientNNPbService = RouterAdminProtocolService. + newReflectiveBlockingService(routerAdminProtocolTranslator); + + InetSocketAddress confRpcAddress = conf.getSocketAddr( + RBFConfigKeys.DFS_ROUTER_ADMIN_BIND_HOST_KEY, + RBFConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_KEY, + RBFConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_DEFAULT, + RBFConfigKeys.DFS_ROUTER_ADMIN_PORT_DEFAULT); + + String bindHost = conf.get( + RBFConfigKeys.DFS_ROUTER_ADMIN_BIND_HOST_KEY, + confRpcAddress.getHostName()); + LOG.info("Admin server binding to {}:{}", + bindHost, confRpcAddress.getPort()); + + initializePermissionSettings(this.conf); + this.adminServer = new RPC.Builder(this.conf) + .setProtocol(RouterAdminProtocolPB.class) + .setInstance(clientNNPbService) + .setBindAddress(bindHost) + .setPort(confRpcAddress.getPort()) + .setNumHandlers(handlerCount) + .setVerbose(false) + .build(); + + // The RPC-server port can be ephemeral... ensure we have the correct info + InetSocketAddress listenAddress = this.adminServer.getListenerAddress(); + this.adminAddress = new InetSocketAddress( + confRpcAddress.getHostName(), listenAddress.getPort()); + router.setAdminServerAddress(this.adminAddress); + } + + /** + * Initialize permission related settings. 
+ * + * @param routerConf + * @throws IOException + */ + private static void initializePermissionSettings(Configuration routerConf) + throws IOException { + routerOwner = UserGroupInformation.getCurrentUser().getShortUserName(); + superGroup = routerConf.get( + DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY, + DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT); + isPermissionEnabled = routerConf.getBoolean(DFS_PERMISSIONS_ENABLED_KEY, + DFS_PERMISSIONS_ENABLED_DEFAULT); + } + + /** Allow access to the client RPC server for testing. */ + @VisibleForTesting + Server getAdminServer() { + return this.adminServer; + } + + private MountTableStore getMountTableStore() throws IOException { + if (this.mountTableStore == null) { + this.mountTableStore = router.getStateStore().getRegisteredRecordStore( + MountTableStore.class); + if (this.mountTableStore == null) { + throw new IOException("Mount table state store is not available."); + } + } + return this.mountTableStore; + } + + /** + * Get the RPC address of the admin service. + * @return Administration service RPC address. 
+ */ + public InetSocketAddress getRpcAddress() { + return this.adminAddress; + } + + @Override + protected void serviceInit(Configuration configuration) throws Exception { + this.conf = configuration; + super.serviceInit(conf); + } + + @Override + protected void serviceStart() throws Exception { + this.adminServer.start(); + super.serviceStart(); + } + + @Override + protected void serviceStop() throws Exception { + if (this.adminServer != null) { + this.adminServer.stop(); + } + super.serviceStop(); + } + + @Override + public AddMountTableEntryResponse addMountTableEntry( + AddMountTableEntryRequest request) throws IOException { + return getMountTableStore().addMountTableEntry(request); + } + + @Override + public UpdateMountTableEntryResponse updateMountTableEntry( + UpdateMountTableEntryRequest request) throws IOException { + return getMountTableStore().updateMountTableEntry(request); + } + + @Override + public RemoveMountTableEntryResponse removeMountTableEntry( + RemoveMountTableEntryRequest request) throws IOException { + return getMountTableStore().removeMountTableEntry(request); + } + + @Override + public GetMountTableEntriesResponse getMountTableEntries( + GetMountTableEntriesRequest request) throws IOException { + return getMountTableStore().getMountTableEntries(request); + } + + @Override + public EnterSafeModeResponse enterSafeMode(EnterSafeModeRequest request) + throws IOException { + this.router.updateRouterState(RouterServiceState.SAFEMODE); + this.router.getRpcServer().setSafeMode(true); + return EnterSafeModeResponse.newInstance(verifySafeMode(true)); + } + + @Override + public LeaveSafeModeResponse leaveSafeMode(LeaveSafeModeRequest request) + throws IOException { + this.router.updateRouterState(RouterServiceState.RUNNING); + this.router.getRpcServer().setSafeMode(false); + return LeaveSafeModeResponse.newInstance(verifySafeMode(false)); + } + + @Override + public GetSafeModeResponse getSafeMode(GetSafeModeRequest request) + throws IOException { + 
boolean isInSafeMode = this.router.getRpcServer().isInSafeMode(); + return GetSafeModeResponse.newInstance(isInSafeMode); + } + + /** + * Verify if Router set safe mode state correctly. + * @param isInSafeMode Expected state to be set. + * @return + */ + private boolean verifySafeMode(boolean isInSafeMode) { + boolean serverInSafeMode = this.router.getRpcServer().isInSafeMode(); + RouterServiceState currentState = this.router.getRouterState(); + + return (isInSafeMode && currentState == RouterServiceState.SAFEMODE + && serverInSafeMode) + || (!isInSafeMode && currentState != RouterServiceState.SAFEMODE + && !serverInSafeMode); + } + + /** + * Get a new permission checker used for making mount table access + * control. This method will be invoked during each RPC call in router + * admin server. + * + * @return Router permission checker + * @throws AccessControlException + */ + public static RouterPermissionChecker getPermissionChecker() + throws AccessControlException { + if (!isPermissionEnabled) { + return null; + } + + try { + return new RouterPermissionChecker(routerOwner, superGroup, + NameNode.getRemoteUser()); + } catch (IOException e) { + throw new AccessControlException(e); + } + } + + /** + * Get super user name. + * + * @return String super user name. + */ + public static String getSuperUser() { + return routerOwner; + } + + /** + * Get super group name. + * + * @return String super group name. + */ + public static String getSuperGroup(){ + return superGroup; + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org