http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java deleted file mode 100644 index 3eb6dc1..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java +++ /dev/null @@ -1,1260 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. 
- */ - -package org.apache.hadoop.ozone.scm; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.RemovalListener; -import com.google.common.cache.RemovalNotification; -import com.google.protobuf.BlockingService; -import com.google.protobuf.InvalidProtocolBufferException; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.conf.OzoneConfiguration; -import org.apache.hadoop.hdfs.DFSUtil; -import org.apache.hadoop.hdfs.protocol.DatanodeID; -import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.ipc.ProtobufRpcEngine; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.jmx.ServiceRuntimeInfoImpl; -import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; -import org.apache.hadoop.metrics2.util.MBeans; -import org.apache.hadoop.ozone.OzoneConfigKeys; -import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ENABLED; -import org.apache.hadoop.ozone.client.OzoneClientUtils; -import org.apache.hadoop.ozone.common.BlockGroup; -import org.apache.hadoop.ozone.common.DeleteBlockGroupResult; -import org.apache.hadoop.ozone.common.Storage.StorageState; -import org.apache.hadoop.ozone.common.StorageInfo; -import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol; -import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand; -import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand; -import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand; -import org.apache.hadoop.ozone.protocol.commands.SCMCommand; -import org.apache.hadoop.ozone.protocol.proto.OzoneProtos; -import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState; -import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos; -import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos; -import 
org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; -import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsResponseProto; -import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ReportState; -import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto; -import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto; -import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMNodeAddressList; -import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMNodeReport; -import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto; -import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMReregisterCmdResponseProto; -import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto; -import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto; -import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SendContainerReportProto; -import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.Type; -import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto.DeleteBlockTransactionResult; -import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto; -import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKResponseProto; -import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos; -import 
org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.ObjectStageChangeRequestProto; -import org.apache.hadoop.ozone.protocolPB.ScmBlockLocationProtocolServerSideTranslatorPB; -import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolPB; -import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolServerSideTranslatorPB; -import org.apache.hadoop.ozone.protocolPB.StorageContainerLocationProtocolServerSideTranslatorPB; -import org.apache.hadoop.ozone.scm.block.BlockManager; -import org.apache.hadoop.ozone.scm.block.BlockManagerImpl; -import org.apache.hadoop.ozone.scm.container.ContainerMapping; -import org.apache.hadoop.ozone.scm.container.Mapping; -import org.apache.hadoop.ozone.scm.container.placement.metrics.ContainerStat; -import org.apache.hadoop.ozone.scm.container.placement.metrics.SCMMetrics; -import org.apache.hadoop.ozone.scm.exceptions.SCMException; -import org.apache.hadoop.scm.ScmInfo; -import org.apache.hadoop.ozone.scm.node.NodeManager; -import org.apache.hadoop.ozone.scm.node.SCMNodeManager; -import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock; -import org.apache.hadoop.scm.container.common.helpers.ContainerInfo; -import org.apache.hadoop.scm.container.common.helpers.DeleteBlockResult; -import org.apache.hadoop.scm.container.common.helpers.Pipeline; -import org.apache.hadoop.scm.protocol.ScmBlockLocationProtocol; -import org.apache.hadoop.scm.protocol.StorageContainerLocationProtocol; -import org.apache.hadoop.scm.protocolPB.ScmBlockLocationProtocolPB; -import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolPB; -import org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.util.GenericOptionsParser; -import org.apache.hadoop.util.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.management.ObjectName; -import java.io.IOException; 
-import java.io.PrintStream; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.EnumSet; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TreeSet; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; - -import static org.apache.hadoop.ozone.protocol.proto - .ScmBlockLocationProtocolProtos.DeleteScmBlockResult.Result; -import static org.apache.hadoop.scm.ScmConfigKeys - .OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY; -import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY; -import static org.apache.hadoop.scm.ScmConfigKeys - .OZONE_SCM_DATANODE_ADDRESS_KEY; -import static org.apache.hadoop.scm.ScmConfigKeys - .OZONE_SCM_DB_CACHE_SIZE_DEFAULT; -import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_MB; -import static org.apache.hadoop.scm.ScmConfigKeys - .OZONE_SCM_HANDLER_COUNT_DEFAULT; -import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_HANDLER_COUNT_KEY; -import static org.apache.hadoop.util.ExitUtil.terminate; - -/** - * StorageContainerManager is the main entry point for the service that provides - * information about which SCM nodes host containers. - * - * DataNodes report to StorageContainerManager using heartbeat - * messages. SCM allocates containers and returns a pipeline. - * - * A client once it gets a pipeline (a list of datanodes) will connect to the - * datanodes and create a container, which then can be used to store data. 
- */ -@InterfaceAudience.LimitedPrivate({"HDFS", "CBLOCK", "OZONE", "HBASE"}) -public class StorageContainerManager extends ServiceRuntimeInfoImpl - implements StorageContainerDatanodeProtocol, - StorageContainerLocationProtocol, ScmBlockLocationProtocol, SCMMXBean { - - private static final Logger LOG = - LoggerFactory.getLogger(StorageContainerManager.class); - - /** - * Startup options. - */ - public enum StartupOption { - INIT("-init"), - CLUSTERID("-clusterid"), - GENCLUSTERID("-genclusterid"), - REGULAR("-regular"), - HELP("-help"); - - private final String name; - private String clusterId = null; - - public void setClusterId(String cid) { - if(cid != null && !cid.isEmpty()) { - clusterId = cid; - } - } - - public String getClusterId() { - return clusterId; - } - - StartupOption(String arg) { - this.name = arg; - } - - public String getName() { - return name; - } - } - - /** - * NodeManager and container Managers for SCM. - */ - private final NodeManager scmNodeManager; - private final Mapping scmContainerManager; - private final BlockManager scmBlockManager; - private final SCMStorage scmStorage; - - /** The RPC server that listens to requests from DataNodes. */ - private final RPC.Server datanodeRpcServer; - private final InetSocketAddress datanodeRpcAddress; - - /** The RPC server that listens to requests from clients. */ - private final RPC.Server clientRpcServer; - private final InetSocketAddress clientRpcAddress; - - /** The RPC server that listens to requests from block service clients. */ - private final RPC.Server blockRpcServer; - private final InetSocketAddress blockRpcAddress; - - private final StorageContainerManagerHttpServer httpServer; - - /** SCM mxbean. */ - private ObjectName scmInfoBeanName; - - /** SCM super user. */ - private final String scmUsername; - private final Collection<String> scmAdminUsernames; - - /** SCM metrics. */ - private static SCMMetrics metrics; - /** Key = DatanodeUuid, value = ContainerStat. 
*/ - private Cache<String, ContainerStat> containerReportCache; - - - private static final String USAGE = - "Usage: \n hdfs scm [genericOptions] " - + "[ " + StartupOption.INIT.getName() + " [ " - + StartupOption.CLUSTERID.getName() + " <cid> ] ]\n " - + "hdfs scm [genericOptions] [ " - + StartupOption.GENCLUSTERID.getName() + " ]\n " + - "hdfs scm [ " - + StartupOption.HELP.getName() + " ]\n"; - /** - * Creates a new StorageContainerManager. Configuration will be updated with - * information on the actual listening addresses used for RPC servers. - * - * @param conf configuration - */ - private StorageContainerManager(OzoneConfiguration conf) - throws IOException { - - final int handlerCount = conf.getInt( - OZONE_SCM_HANDLER_COUNT_KEY, OZONE_SCM_HANDLER_COUNT_DEFAULT); - final int cacheSize = conf.getInt(OZONE_SCM_DB_CACHE_SIZE_MB, - OZONE_SCM_DB_CACHE_SIZE_DEFAULT); - - StorageContainerManager.initMetrics(); - initContainerReportCache(conf); - - scmStorage = new SCMStorage(conf); - if (scmStorage.getState() != StorageState.INITIALIZED) { - throw new SCMException("SCM not initialized.", - ResultCodes.SCM_NOT_INITIALIZED); - } - scmNodeManager = new SCMNodeManager(conf, scmStorage.getClusterID(), this); - scmContainerManager = new ContainerMapping(conf, scmNodeManager, cacheSize); - scmBlockManager = new BlockManagerImpl(conf, scmNodeManager, - scmContainerManager, cacheSize); - - scmAdminUsernames = conf.getTrimmedStringCollection( - OzoneConfigKeys.OZONE_ADMINISTRATORS); - scmUsername = UserGroupInformation.getCurrentUser().getUserName(); - if (!scmAdminUsernames.contains(scmUsername)) { - scmAdminUsernames.add(scmUsername); - } - - RPC.setProtocolEngine(conf, StorageContainerDatanodeProtocolPB.class, - ProtobufRpcEngine.class); - RPC.setProtocolEngine(conf, StorageContainerLocationProtocolPB.class, - ProtobufRpcEngine.class); - RPC.setProtocolEngine(conf, ScmBlockLocationProtocolPB.class, - ProtobufRpcEngine.class); - - BlockingService dnProtoPbService = 
StorageContainerDatanodeProtocolProtos. - StorageContainerDatanodeProtocolService.newReflectiveBlockingService( - new StorageContainerDatanodeProtocolServerSideTranslatorPB(this)); - - final InetSocketAddress datanodeRpcAddr = - OzoneClientUtils.getScmDataNodeBindAddress(conf); - datanodeRpcServer = startRpcServer(conf, datanodeRpcAddr, - StorageContainerDatanodeProtocolPB.class, dnProtoPbService, - handlerCount); - datanodeRpcAddress = OzoneClientUtils.updateRPCListenAddress(conf, - OZONE_SCM_DATANODE_ADDRESS_KEY, datanodeRpcAddr, datanodeRpcServer); - - // SCM Container Service RPC - BlockingService storageProtoPbService = - StorageContainerLocationProtocolProtos - .StorageContainerLocationProtocolService - .newReflectiveBlockingService( - new StorageContainerLocationProtocolServerSideTranslatorPB(this)); - - final InetSocketAddress scmAddress = - OzoneClientUtils.getScmClientBindAddress(conf); - clientRpcServer = startRpcServer(conf, scmAddress, - StorageContainerLocationProtocolPB.class, storageProtoPbService, - handlerCount); - clientRpcAddress = OzoneClientUtils.updateRPCListenAddress(conf, - OZONE_SCM_CLIENT_ADDRESS_KEY, scmAddress, clientRpcServer); - - // SCM Block Service RPC - BlockingService blockProtoPbService = - ScmBlockLocationProtocolProtos - .ScmBlockLocationProtocolService - .newReflectiveBlockingService( - new ScmBlockLocationProtocolServerSideTranslatorPB(this)); - - final InetSocketAddress scmBlockAddress = - OzoneClientUtils.getScmBlockClientBindAddress(conf); - blockRpcServer = startRpcServer(conf, scmBlockAddress, - ScmBlockLocationProtocolPB.class, blockProtoPbService, - handlerCount); - blockRpcAddress = OzoneClientUtils.updateRPCListenAddress(conf, - OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY, scmBlockAddress, blockRpcServer); - - httpServer = new StorageContainerManagerHttpServer(conf); - - registerMXBean(); - } - - /** - * Initialize container reports cache that sent from datanodes. 
- * - * @param conf - */ - private void initContainerReportCache(OzoneConfiguration conf) { - containerReportCache = CacheBuilder.newBuilder() - .expireAfterAccess(Long.MAX_VALUE, TimeUnit.MILLISECONDS) - .maximumSize(Integer.MAX_VALUE) - .removalListener(new RemovalListener<String, ContainerStat>() { - @Override - public void onRemoval( - RemovalNotification<String, ContainerStat> removalNotification) { - synchronized (containerReportCache) { - ContainerStat stat = removalNotification.getValue(); - // remove invalid container report - metrics.decrContainerStat(stat); - LOG.debug( - "Remove expired container stat entry for datanode: {}.", - removalNotification.getKey()); - } - } - }).build(); - } - - /** - * Builds a message for logging startup information about an RPC server. - * - * @param description RPC server description - * @param addr RPC server listening address - * @return server startup message - */ - private static String buildRpcServerStartMessage(String description, - InetSocketAddress addr) { - return addr != null ? String.format("%s is listening at %s", - description, addr.toString()) : - String.format("%s not started", description); - } - - /** - * Starts an RPC server, if configured. 
- * - * @param conf configuration - * @param addr configured address of RPC server - * @param protocol RPC protocol provided by RPC server - * @param instance RPC protocol implementation instance - * @param handlerCount RPC server handler count - * - * @return RPC server - * @throws IOException if there is an I/O error while creating RPC server - */ - private static RPC.Server startRpcServer(OzoneConfiguration conf, - InetSocketAddress addr, Class<?> protocol, BlockingService instance, - int handlerCount) - throws IOException { - RPC.Server rpcServer = new RPC.Builder(conf) - .setProtocol(protocol) - .setInstance(instance) - .setBindAddress(addr.getHostString()) - .setPort(addr.getPort()) - .setNumHandlers(handlerCount) - .setVerbose(false) - .setSecretManager(null) - .build(); - - DFSUtil.addPBProtocol(conf, protocol, instance, rpcServer); - return rpcServer; - } - - private void registerMXBean() { - Map<String, String> jmxProperties = new HashMap<>(); - jmxProperties.put("component", "ServerRuntime"); - this.scmInfoBeanName = - MBeans.register("StorageContainerManager", - "StorageContainerManagerInfo", - jmxProperties, - this); - } - - private void unregisterMXBean() { - if(this.scmInfoBeanName != null) { - MBeans.unregister(this.scmInfoBeanName); - this.scmInfoBeanName = null; - } - } - - /** - * Main entry point for starting StorageContainerManager. 
- * - * @param argv arguments - * @throws IOException if startup fails due to I/O error - */ - public static void main(String[] argv) throws IOException { - if (DFSUtil.parseHelpArgument(argv, USAGE, - System.out, true)) { - System.exit(0); - } - try { - OzoneConfiguration conf = new OzoneConfiguration(); - GenericOptionsParser hParser = new GenericOptionsParser(conf, argv); - if (!hParser.isParseSuccessful()) { - System.err.println("USAGE: " + USAGE + "\n"); - hParser.printGenericCommandUsage(System.err); - System.exit(1); - } - StringUtils.startupShutdownMessage(StorageContainerManager.class, - argv, LOG); - StorageContainerManager scm = createSCM(hParser.getRemainingArgs(), conf); - if (scm != null) { - scm.start(); - scm.join(); - } - } catch (Throwable t) { - LOG.error("Failed to start the StorageContainerManager.", t); - terminate(1, t); - } - } - - private static void printUsage(PrintStream out) { - out.println(USAGE + "\n"); - } - - public static StorageContainerManager createSCM(String[] argv, - OzoneConfiguration conf) throws IOException { - if (!DFSUtil.isOzoneEnabled(conf)) { - System.err.println("SCM cannot be started in secure mode or when " + - OZONE_ENABLED + " is set to false"); - System.exit(1); - } - StartupOption startOpt = parseArguments(argv); - if (startOpt == null) { - printUsage(System.err); - terminate(1); - return null; - } - switch (startOpt) { - case INIT: - terminate(scmInit(conf) ? 0 : 1); - return null; - case GENCLUSTERID: - System.out.println("Generating new cluster id:"); - System.out.println(StorageInfo.newClusterID()); - terminate(0); - return null; - case HELP: - printUsage(System.err); - terminate(0); - return null; - default: - return new StorageContainerManager(conf); - } - } - - /** - * Routine to set up the Version info for StorageContainerManager. - * - * @param conf OzoneConfiguration - * @return true if SCM initialization is successful, false otherwise. 
- * @throws IOException if init fails due to I/O error - */ - public static boolean scmInit(OzoneConfiguration conf) throws IOException { - SCMStorage scmStorage = new SCMStorage(conf); - StorageState state = scmStorage.getState(); - if (state != StorageState.INITIALIZED) { - try { - String clusterId = StartupOption.INIT.getClusterId(); - if (clusterId != null && !clusterId.isEmpty()) { - scmStorage.setClusterId(clusterId); - } - scmStorage.initialize(); - System.out.println("SCM initialization succeeded." + - "Current cluster id for sd=" + scmStorage.getStorageDir() + ";cid=" - + scmStorage.getClusterID()); - return true; - } catch (IOException ioe) { - LOG.error("Could not initialize SCM version file", ioe); - return false; - } - } else { - System.out.println("SCM already initialized. Reusing existing" + - " cluster id for sd=" + scmStorage.getStorageDir() + ";cid=" - + scmStorage.getClusterID()); - return true; - } - } - - private static StartupOption parseArguments(String[] args) { - int argsLen = (args == null) ? 
0 : args.length; - StartupOption startOpt = StartupOption.HELP; - if (argsLen == 0) { - startOpt = StartupOption.REGULAR; - } - for (int i = 0; i < argsLen; i++) { - String cmd = args[i]; - if (StartupOption.INIT.getName().equalsIgnoreCase(cmd)) { - startOpt = StartupOption.INIT; - if (argsLen > 3) { - return null; - } - for (i = i + 1; i < argsLen; i++) { - if (args[i].equalsIgnoreCase(StartupOption.CLUSTERID.getName())) { - i++; - if (i < argsLen && !args[i].isEmpty()) { - startOpt.setClusterId(args[i]); - } else { - // if no cluster id specified or is empty string, return null - LOG.error("Must specify a valid cluster ID after the " - + StartupOption.CLUSTERID.getName() + " flag"); - return null; - } - } else { - return null; - } - } - } else if (StartupOption.GENCLUSTERID.getName().equalsIgnoreCase(cmd)) { - if (argsLen > 1) { - return null; - } - startOpt = StartupOption.GENCLUSTERID; - } - } - return startOpt; - } - - /** - * Returns a SCMCommandRepose from the SCM Command. - * @param cmd - Cmd - * @return SCMCommandResponseProto - * @throws InvalidProtocolBufferException - */ - @VisibleForTesting - public SCMCommandResponseProto getCommandResponse(SCMCommand cmd, - final String datanodID) - throws IOException { - Type type = cmd.getType(); - SCMCommandResponseProto.Builder builder = - SCMCommandResponseProto.newBuilder() - .setDatanodeUUID(datanodID); - switch (type) { - case registeredCommand: - return builder.setCmdType(Type.registeredCommand) - .setRegisteredProto( - SCMRegisteredCmdResponseProto.getDefaultInstance()) - .build(); - case versionCommand: - return builder.setCmdType(Type.versionCommand) - .setVersionProto(SCMVersionResponseProto.getDefaultInstance()) - .build(); - case sendContainerReport: - return builder.setCmdType(Type.sendContainerReport) - .setSendReport(SendContainerReportProto.getDefaultInstance()) - .build(); - case reregisterCommand: - return builder.setCmdType(Type.reregisterCommand) - 
.setReregisterProto(SCMReregisterCmdResponseProto - .getDefaultInstance()) - .build(); - case deleteBlocksCommand: - // Once SCM sends out the deletion message, increment the count. - // this is done here instead of when SCM receives the ACK, because - // DN might not be able to response the ACK for sometime. In case - // it times out, SCM needs to re-send the message some more times. - List<Long> txs = ((DeleteBlocksCommand) cmd).blocksTobeDeleted() - .stream().map(tx -> tx.getTxID()).collect(Collectors.toList()); - this.getScmBlockManager().getDeletedBlockLog().incrementCount(txs); - return builder.setCmdType(Type.deleteBlocksCommand) - .setDeleteBlocksProto(((DeleteBlocksCommand) cmd).getProto()) - .build(); - case closeContainerCommand: - return builder.setCmdType(Type.closeContainerCommand) - .setCloseContainerProto(((CloseContainerCommand)cmd).getProto()) - .build(); - default: - throw new IllegalArgumentException("Not implemented"); - } - } - - @VisibleForTesting - public static SCMRegisteredCmdResponseProto getRegisteredResponse( - SCMCommand cmd, SCMNodeAddressList addressList) { - Preconditions.checkState(cmd.getClass() == RegisteredCommand.class); - RegisteredCommand rCmd = (RegisteredCommand) cmd; - StorageContainerDatanodeProtocolProtos.Type type = cmd.getType(); - if (type != Type.registeredCommand) { - throw new IllegalArgumentException("Registered command is not well " + - "formed. Internal Error."); - } - return SCMRegisteredCmdResponseProto.newBuilder() - //TODO : Fix this later when we have multiple SCM support. 
- //.setAddressList(addressList) - .setErrorCode(rCmd.getError()) - .setClusterID(rCmd.getClusterID()) - .setDatanodeUUID(rCmd.getDatanodeUUID()).build(); - } - - /** - * {@inheritDoc} - */ - @Override - public Pipeline getContainer(String containerName) throws IOException { - checkAdminAccess(); - return scmContainerManager.getContainer(containerName).getPipeline(); - } - - @VisibleForTesting - public ContainerInfo getContainerInfo(String containerName) - throws IOException { - return scmContainerManager.getContainer(containerName); - } - - /** - * {@inheritDoc} - */ - @Override - public List<ContainerInfo> listContainer(String startName, - String prefixName, int count) throws IOException { - return scmContainerManager.listContainer(startName, prefixName, count); - } - - /** - * {@inheritDoc} - */ - @Override - public void deleteContainer(String containerName) throws IOException { - checkAdminAccess(); - scmContainerManager.deleteContainer(containerName); - } - - /** - * Queries a list of Node Statuses. - * - * @param nodeStatuses - * @param queryScope - * @param poolName @return List of Datanodes. - */ - @Override - public OzoneProtos.NodePool queryNode(EnumSet<NodeState> nodeStatuses, - OzoneProtos.QueryScope queryScope, String poolName) throws IOException { - - if (queryScope == OzoneProtos.QueryScope.POOL) { - throw new IllegalArgumentException("Not Supported yet"); - } - - List<DatanodeID> datanodes = queryNode(nodeStatuses); - OzoneProtos.NodePool.Builder poolBuilder = - OzoneProtos.NodePool.newBuilder(); - - for (DatanodeID datanode : datanodes) { - OzoneProtos.Node node = OzoneProtos.Node.newBuilder() - .setNodeID(datanode.getProtoBufMessage()) - .addAllNodeStates(nodeStatuses) - .build(); - poolBuilder.addNodes(node); - } - - return poolBuilder.build(); - } - - /** - * Notify from client when begin/finish operation for container/pipeline - * objects on datanodes. 
- * @param type - * @param name - * @param op - * @param stage - */ - @Override - public void notifyObjectStageChange( - ObjectStageChangeRequestProto.Type type, String name, - ObjectStageChangeRequestProto.Op op, - ObjectStageChangeRequestProto.Stage stage) throws IOException { - - LOG.info("Object type {} name {} op {} new stage {}", - type, name, op, stage); - if (type == ObjectStageChangeRequestProto.Type.container) { - if (op == ObjectStageChangeRequestProto.Op.create) { - if (stage == ObjectStageChangeRequestProto.Stage.begin) { - scmContainerManager.updateContainerState(name, - OzoneProtos.LifeCycleEvent.CREATE); - } else { - scmContainerManager.updateContainerState(name, - OzoneProtos.LifeCycleEvent.CREATED); - } - } else if (op == ObjectStageChangeRequestProto.Op.close) { - if (stage == ObjectStageChangeRequestProto.Stage.begin) { - scmContainerManager.updateContainerState(name, - OzoneProtos.LifeCycleEvent.FINALIZE); - } else { - scmContainerManager.updateContainerState(name, - OzoneProtos.LifeCycleEvent.CLOSE); - } - } - } //else if (type == ObjectStageChangeRequestProto.Type.pipeline) { - // TODO: pipeline state update will be addressed in future patch. - //} - } - - /** - * Creates a replication pipeline of a specified type. - */ - @Override - public Pipeline createReplicationPipeline( - OzoneProtos.ReplicationType replicationType, - OzoneProtos.ReplicationFactor factor, - OzoneProtos.NodePool nodePool) - throws IOException { - // TODO: will be addressed in future patch. - return null; - } - - /** - * Queries a list of Node that match a set of statuses. - * <p> - * For example, if the nodeStatuses is HEALTHY and RAFT_MEMBER, - * then this call will return all healthy nodes which members in - * Raft pipeline. - * <p> - * Right now we don't support operations, so we assume it is an AND operation - * between the operators. - * - * @param nodeStatuses - A set of NodeStates. - * @return List of Datanodes. 
- */ - - public List<DatanodeID> queryNode(EnumSet<NodeState> nodeStatuses) { - Preconditions.checkNotNull(nodeStatuses, "Node Query set cannot be null"); - Preconditions.checkState(nodeStatuses.size() > 0, "No valid arguments " + - "in the query set"); - List<DatanodeID> resultList = new LinkedList<>(); - Set<DatanodeID> currentSet = new TreeSet<>(); - - for (NodeState nodeState : nodeStatuses) { - Set<DatanodeID> nextSet = queryNodeState(nodeState); - if ((nextSet == null) || (nextSet.size() == 0)) { - // Right now we only support AND operation. So intersect with - // any empty set is null. - return resultList; - } - // First time we have to add all the elements, next time we have to - // do an intersection operation on the set. - if (currentSet.size() == 0) { - currentSet.addAll(nextSet); - } else { - currentSet.retainAll(nextSet); - } - } - - resultList.addAll(currentSet); - return resultList; - } - - /** - * Query the System for Nodes. - * - * @param nodeState - NodeState that we are interested in matching. - * @return Set of Datanodes that match the NodeState. - */ - private Set<DatanodeID> queryNodeState(NodeState nodeState) { - if (nodeState == NodeState.RAFT_MEMBER || - nodeState == NodeState.FREE_NODE) { - throw new IllegalStateException("Not implemented yet"); - } - Set<DatanodeID> returnSet = new TreeSet<>(); - List<DatanodeID> tmp = getScmNodeManager().getNodes(nodeState); - if ((tmp != null) && (tmp.size() > 0)) { - returnSet.addAll(tmp); - } - return returnSet; - } - - /** - * Asks SCM where a container should be allocated. SCM responds with the set - * of datanodes that should be used creating this container. - * - * @param containerName - Name of the container. - * @param replicationFactor - replication factor. 
- * @return pipeline - * @throws IOException - */ - @Override - public Pipeline allocateContainer(OzoneProtos.ReplicationType replicationType, - OzoneProtos.ReplicationFactor replicationFactor, String containerName, - String owner) throws IOException { - - checkAdminAccess(); - return scmContainerManager - .allocateContainer(replicationType, replicationFactor, containerName, - owner).getPipeline(); - } - - /** - * Returns listening address of StorageLocation Protocol RPC server. - * - * @return listen address of StorageLocation RPC server - */ - @VisibleForTesting - public InetSocketAddress getClientRpcAddress() { - return clientRpcAddress; - } - - @Override - public String getClientRpcPort() { - InetSocketAddress addr = getClientRpcAddress(); - return addr == null ? "0" : Integer.toString(addr.getPort()); - } - - /** - * Returns listening address of StorageDatanode Protocol RPC server. - * - * @return Address where datanode are communicating. - */ - public InetSocketAddress getDatanodeRpcAddress() { - return datanodeRpcAddress; - } - - @Override - public String getDatanodeRpcPort() { - InetSocketAddress addr = getDatanodeRpcAddress(); - return addr == null ? "0" : Integer.toString(addr.getPort()); - } - - /** - * Start service. - */ - public void start() throws IOException { - LOG.info(buildRpcServerStartMessage( - "StorageContainerLocationProtocol RPC server", clientRpcAddress)); - DefaultMetricsSystem.initialize("StorageContainerManager"); - clientRpcServer.start(); - LOG.info(buildRpcServerStartMessage( - "ScmBlockLocationProtocol RPC server", blockRpcAddress)); - blockRpcServer.start(); - LOG.info(buildRpcServerStartMessage("RPC server for DataNodes", - datanodeRpcAddress)); - datanodeRpcServer.start(); - httpServer.start(); - scmBlockManager.start(); - - setStartTime(); - - } - - /** - * Stop service. 
- */ - public void stop() { - try { - LOG.info("Stopping block service RPC server"); - blockRpcServer.stop(); - } catch (Exception ex) { - LOG.error("Storage Container Manager blockRpcServer stop failed.", ex); - } - - try { - LOG.info("Stopping the StorageContainerLocationProtocol RPC server"); - clientRpcServer.stop(); - } catch (Exception ex) { - LOG.error("Storage Container Manager clientRpcServer stop failed.", ex); - } - - try { - LOG.info("Stopping the RPC server for DataNodes"); - datanodeRpcServer.stop(); - } catch (Exception ex) { - LOG.error("Storage Container Manager datanodeRpcServer stop failed.", ex); - } - - try { - LOG.info("Stopping Storage Container Manager HTTP server."); - httpServer.stop(); - } catch (Exception ex) { - LOG.error("Storage Container Manager HTTP server stop failed.", ex); - } - - try { - LOG.info("Stopping Block Manager Service."); - scmBlockManager.stop(); - } catch (Exception ex) { - LOG.error("SCM block manager service stop failed.", ex); - } - - if (containerReportCache != null) { - containerReportCache.invalidateAll(); - containerReportCache.cleanUp(); - } - - if (metrics != null) { - metrics.unRegister(); - } - - unregisterMXBean(); - IOUtils.cleanupWithLogger(LOG, scmContainerManager); - IOUtils.cleanupWithLogger(LOG, scmNodeManager); - } - - /** - * Wait until service has completed shutdown. - */ - public void join() { - try { - blockRpcServer.join(); - clientRpcServer.join(); - datanodeRpcServer.join(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - LOG.info("Interrupted during StorageContainerManager join."); - } - } - - /** - * Returns SCM version. - * - * @return Version info. - */ - @Override - public SCMVersionResponseProto getVersion( - SCMVersionRequestProto versionRequest) throws IOException { - return getScmNodeManager().getVersion(versionRequest).getProtobufMessage(); - } - - /** - * Used by data node to send a Heartbeat. - * - * @param datanodeID - Datanode ID. 
- * @param nodeReport - Node Report - * @param reportState - Container report ready info. - * @return - SCMHeartbeatResponseProto carrying one command response per - * command returned by the node manager for this heartbeat. - * @throws IOException - */ - @Override - public SCMHeartbeatResponseProto sendHeartbeat(DatanodeID datanodeID, - SCMNodeReport nodeReport, ReportState reportState) throws IOException { - List<SCMCommand> commands = - getScmNodeManager().sendHeartbeat(datanodeID, nodeReport, reportState); - List<SCMCommandResponseProto> cmdResponses = new LinkedList<>(); - for (SCMCommand cmd : commands) { - cmdResponses.add(getCommandResponse(cmd, datanodeID.getDatanodeUuid() - .toString())); - } - return SCMHeartbeatResponseProto.newBuilder().addAllCommands(cmdResponses) - .build(); - } - - /** - * Register Datanode. - * - * @param datanodeID - Datanode ID. - * @param scmAddresses - List of SCMs this datanode is configured to - * communicate with; currently not used by this method. - * @return SCM Command. - */ - @Override - public StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto - register(DatanodeID datanodeID, String[] scmAddresses) - throws IOException { - // TODO : Return the list of Nodes that form the SCM HA. - return getRegisteredResponse(scmNodeManager.register(datanodeID), null); - } - - /** - * Send a container report. Updates the container report metrics first, - * then hands the report to the container manager. - * - * @param reports -- Container report - * @return an empty ContainerReportsResponseProto. - * @throws IOException - */ - @Override - public ContainerReportsResponseProto sendContainerReport( - ContainerReportsRequestProto reports) throws IOException { - updateContainerReportMetrics(reports); - - // should we process container reports async? - scmContainerManager.processContainerReports(reports); - return ContainerReportsResponseProto.newBuilder().build(); - } - - private void updateContainerReportMetrics( - ContainerReportsRequestProto reports) { - ContainerStat newStat = null; - // TODO: We should update the logic once incremental container report - // type is supported. 
- if (reports - .getType() == ContainerReportsRequestProto.reportType.fullReport) { - newStat = new ContainerStat(); - for (StorageContainerDatanodeProtocolProtos.ContainerInfo info : reports - .getReportsList()) { - newStat.add(new ContainerStat(info.getSize(), info.getUsed(), - info.getKeyCount(), info.getReadBytes(), info.getWriteBytes(), - info.getReadCount(), info.getWriteCount())); - } - - // update container metrics - metrics.setLastContainerStat(newStat); - } - - // Update the container stat entry; this will trigger a removal operation - // if an entry for this datanode already exists in the cache. - synchronized (containerReportCache) { - String datanodeUuid = reports.getDatanodeID().getDatanodeUuid(); - if (datanodeUuid != null && newStat != null) { - containerReportCache.put(datanodeUuid, newStat); - // update global view container metrics - metrics.incrContainerStat(newStat); - } - } - } - - /** - * Handles the block deletion ACKs sent by datanodes. Once ACKs are received, - * SCM considers the blocks deleted and updates the metadata in SCM DB. 
- * - * @param acks - * @return - * @throws IOException - */ - @Override - public ContainerBlocksDeletionACKResponseProto sendContainerBlocksDeletionACK( - ContainerBlocksDeletionACKProto acks) throws IOException { - if (acks.getResultsCount() > 0) { - List<DeleteBlockTransactionResult> resultList = acks.getResultsList(); - for (DeleteBlockTransactionResult result : resultList) { - if (LOG.isDebugEnabled()) { - LOG.debug("Got block deletion ACK from datanode, TXIDs={}, " - + "success={}", result.getTxID(), result.getSuccess()); - } - if (result.getSuccess()) { - LOG.debug("Purging TXID={} from block deletion log", - result.getTxID()); - this.getScmBlockManager().getDeletedBlockLog() - .commitTransactions(Collections.singletonList(result.getTxID())); - } else { - LOG.warn("Got failed ACK for TXID={}, prepare to resend the " - + "TX in next interval", result.getTxID()); - } - } - } - return ContainerBlocksDeletionACKResponseProto.newBuilder() - .getDefaultInstanceForType(); - } - - /** - * Returns the Number of Datanodes that are communicating with SCM. - * - * @param nodestate Healthy, Dead etc. - * @return int -- count - */ - public int getNodeCount(NodeState nodestate) { - return scmNodeManager.getNodeCount(nodestate); - } - - /** - * Returns SCM container manager. - */ - @VisibleForTesting - public Mapping getScmContainerManager() { - return scmContainerManager; - } - - /** - * Returns node manager. - * @return - Node Manager - */ - @VisibleForTesting - public NodeManager getScmNodeManager() { - return scmNodeManager; - } - - @VisibleForTesting - public BlockManager getScmBlockManager() { - return scmBlockManager; - } - - /** - * Get block locations. - * @param keys batch of block keys to retrieve. - * @return set of allocated blocks. 
- * @throws IOException - */ - @Override - public Set<AllocatedBlock> getBlockLocations(final Set<String> keys) - throws IOException { - Set<AllocatedBlock> locatedBlocks = new HashSet<>(); - for (String key: keys) { - Pipeline pipeline = scmBlockManager.getBlock(key); - AllocatedBlock block = new AllocatedBlock.Builder() - .setKey(key) - .setPipeline(pipeline).build(); - locatedBlocks.add(block); - } - return locatedBlocks; - } - - /** - * Asks SCM where a block should be allocated. SCM responds with the set of - * datanodes that should be used creating this block. - * - * @param size - size of the block. - * @param type - Replication type. - * @param factor - * @return allocated block accessing info (key, pipeline). - * @throws IOException - */ - @Override - public AllocatedBlock allocateBlock(long size, - OzoneProtos.ReplicationType type, OzoneProtos.ReplicationFactor factor, - String owner) throws IOException { - return scmBlockManager.allocateBlock(size, type, factor, owner); - } - - /** - * Get the clusterId and SCM Id from the version file in SCM. - */ - @Override - public ScmInfo getScmInfo() throws IOException { - ScmInfo.Builder builder = new ScmInfo.Builder() - .setClusterId(scmStorage.getClusterID()) - .setScmId(scmStorage.getScmId()); - return builder.build(); - } - /** - * Delete blocks for a set of object keys. - * - * @param keyBlocksInfoList list of block keys with object keys to delete. - * @return deletion results. - */ - public List<DeleteBlockGroupResult> deleteKeyBlocks( - List<BlockGroup> keyBlocksInfoList) throws IOException { - LOG.info("SCM is informed by KSM to delete {} blocks", - keyBlocksInfoList.size()); - List<DeleteBlockGroupResult> results = new ArrayList<>(); - for (BlockGroup keyBlocks : keyBlocksInfoList) { - Result resultCode; - try { - // We delete blocks in an atomic operation to prevent getting - // into state like only a partial of blocks are deleted, - // which will leave key in an inconsistent state. 
- scmBlockManager.deleteBlocks(keyBlocks.getBlockIDList()); - resultCode = Result.success; - } catch (SCMException scmEx) { - LOG.warn("Fail to delete block: {}", keyBlocks.getGroupID(), scmEx); - switch (scmEx.getResult()) { - case CHILL_MODE_EXCEPTION: - resultCode = Result.chillMode; - break; - case FAILED_TO_FIND_BLOCK: - resultCode = Result.errorNotFound; - break; - default: - resultCode = Result.unknownFailure; - } - } catch (IOException ex) { - LOG.warn("Fail to delete blocks for object key: {}", - keyBlocks.getGroupID(), ex); - resultCode = Result.unknownFailure; - } - List<DeleteBlockResult> blockResultList = new ArrayList<>(); - for (String blockKey : keyBlocks.getBlockIDList()) { - blockResultList.add(new DeleteBlockResult(blockKey, resultCode)); - } - results.add(new DeleteBlockGroupResult(keyBlocks.getGroupID(), - blockResultList)); - } - return results; - } - - @VisibleForTesting - public String getPpcRemoteUsername() { - UserGroupInformation user = ProtobufRpcEngine.Server.getRemoteUser(); - return user == null ? null : user.getUserName(); - } - - private void checkAdminAccess() throws IOException { - String remoteUser = getPpcRemoteUsername(); - if(remoteUser != null) { - if (!scmAdminUsernames.contains(remoteUser)) { - throw new IOException( - "Access denied for user " + remoteUser - + ". Superuser privilege is required."); - } - } - } - - /** - * Initialize SCM metrics. - */ - public static void initMetrics() { - metrics = SCMMetrics.create(); - } - - /** - * Return SCM metrics instance. - */ - public static SCMMetrics getMetrics() { - return metrics == null ? SCMMetrics.create() : metrics; - } - - /** - * Invalidate container stat entry for given datanode. - * - * @param datanodeUuid - */ - public void removeContainerReport(String datanodeUuid) { - synchronized (containerReportCache) { - containerReportCache.invalidate(datanodeUuid); - } - } - - /** - * Get container stat of specified datanode. 
- * - * @param datanodeUuid - * @return - */ - public ContainerStat getContainerReport(String datanodeUuid) { - ContainerStat stat = null; - synchronized (containerReportCache) { - stat = containerReportCache.getIfPresent(datanodeUuid); - } - - return stat; - } - - /** - * Returns a view of the container stat entries. Modifications made to the - * map will directly affect the cache. - * - * @return - */ - public ConcurrentMap<String, ContainerStat> getContainerReportCache() { - return containerReportCache.asMap(); - } - - @Override - public Map<String, String> getContainerReport() { - Map<String, String> id2StatMap = new HashMap<>(); - synchronized (containerReportCache) { - ConcurrentMap<String, ContainerStat> map = containerReportCache.asMap(); - for (Map.Entry<String, ContainerStat> entry : map.entrySet()) { - id2StatMap.put(entry.getKey(), entry.getValue().toJsonString()); - } - } - - return id2StatMap; - } -}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManagerHttpServer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManagerHttpServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManagerHttpServer.java deleted file mode 100644 index 62c871c..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManagerHttpServer.java +++ /dev/null @@ -1,77 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.hadoop.ozone.scm; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.ozone.OzoneConfigKeys; -import org.apache.hadoop.ozone.web.OzoneHttpServer; -import org.apache.hadoop.scm.ScmConfigKeys; - -import java.io.IOException; - -/** - * HttpServer2 wrapper for the Ozone Storage Container Manager. 
- */ -public class StorageContainerManagerHttpServer extends OzoneHttpServer { - - public StorageContainerManagerHttpServer(Configuration conf) - throws IOException { - super(conf, "scm"); - } - - @Override protected String getHttpAddressKey() { - return ScmConfigKeys.OZONE_SCM_HTTP_ADDRESS_KEY; - } - - @Override protected String getHttpBindHostKey() { - return ScmConfigKeys.OZONE_SCM_HTTP_BIND_HOST_KEY; - } - - @Override protected String getHttpsAddressKey() { - return ScmConfigKeys.OZONE_SCM_HTTPS_ADDRESS_KEY; - } - - @Override protected String getHttpsBindHostKey() { - return ScmConfigKeys.OZONE_SCM_HTTPS_BIND_HOST_KEY; - } - - @Override protected String getBindHostDefault() { - return ScmConfigKeys.OZONE_SCM_HTTP_BIND_HOST_DEFAULT; - } - - @Override protected int getHttpBindPortDefault() { - return ScmConfigKeys.OZONE_SCM_HTTP_BIND_PORT_DEFAULT; - } - - @Override protected int getHttpsBindPortDefault() { - return ScmConfigKeys.OZONE_SCM_HTTPS_BIND_PORT_DEFAULT; - } - - @Override protected String getKeytabFile() { - return ScmConfigKeys.OZONE_SCM_KEYTAB_FILE; - } - - @Override protected String getSpnegoPrincipal() { - return OzoneConfigKeys.OZONE_SCM_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL; - } - - @Override protected String getEnabledKey() { - return ScmConfigKeys.OZONE_SCM_HTTP_ENABLED_KEY; - } - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/VersionInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/VersionInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/VersionInfo.java deleted file mode 100644 index 6bb3a22..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/VersionInfo.java +++ /dev/null @@ -1,81 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more 
contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.ozone.scm; - -/** - * This is a class that tracks versions of SCM. - */ -public final class VersionInfo { - - // We will just be normal and use positive counting numbers for versions. - private final static VersionInfo[] VERSION_INFOS = - {new VersionInfo("First version of SCM", 1)}; - - - public static final String DESCRIPTION_KEY = "Description"; - private final String description; - private final int version; - - /** - * Never created outside this class. - * - * @param description -- description - * @param version -- version number - */ - private VersionInfo(String description, int version) { - this.description = description; - this.version = version; - } - - /** - * Returns all versions. - * - * @return Version info array. - */ - public static VersionInfo[] getAllVersions() { - return VERSION_INFOS.clone(); - } - - /** - * Returns the latest version. - * - * @return versionInfo - */ - public static VersionInfo getLatestVersion() { - return VERSION_INFOS[VERSION_INFOS.length - 1]; - } - - /** - * Return description. - * - * @return String - */ - public String getDescription() { - return description; - } - - /** - * Return the version. - * - * @return int. 
- */ - public int getVersion() { - return version; - } - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManager.java deleted file mode 100644 index cfed7a8..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManager.java +++ /dev/null @@ -1,86 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.hadoop.ozone.scm.block; - -import org.apache.hadoop.ozone.protocol.proto.OzoneProtos; -import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock; -import org.apache.hadoop.scm.container.common.helpers.Pipeline; - -import java.io.Closeable; -import java.io.IOException; -import java.util.List; - -/** - * - * Block APIs. - * Container is transparent to these APIs. - */ -public interface BlockManager extends Closeable { - /** - * Allocates a new block for a given size. 
- * @param size - Block Size - * @param type Replication Type - * @param factor - Replication Factor - * @param owner - Owner passed along with the allocation request - * @return AllocatedBlock - * @throws IOException - */ - AllocatedBlock allocateBlock(long size, OzoneProtos.ReplicationType type, - OzoneProtos.ReplicationFactor factor, String owner) throws IOException; - - /** - * Given the key of a block, returns the pipeline info. - * @param key - key to the block. - * @return - Pipeline that is used to access the block. - * @throws IOException - */ - Pipeline getBlock(String key) throws IOException; - - /** - * Deletes a list of blocks in an atomic operation. Internally, SCM - * writes these blocks into a {@link DeletedBlockLog} and deletes them - * from SCM DB. If this is successful, the given blocks enter the pending - * deletion state and become invisible from the SCM namespace. - * - * @param blockIDs block IDs. This is often the list of blocks of - * a particular object key. - * @throws IOException if an exception happens, none of the blocks is deleted. - */ - void deleteBlocks(List<String> blockIDs) throws IOException; - - /** - * @return the block deletion transaction log maintained by SCM. - */ - DeletedBlockLog getDeletedBlockLog(); - - /** - * Start block manager background services. - * @throws IOException - */ - void start() throws IOException; - - /** - * Shutdown block manager background services. - * @throws IOException - */ - void stop() throws IOException; - - /** - * @return the block deleting service executed in SCM. 
- */ - SCMBlockDeletingService getSCMBlockDeletingService(); -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java deleted file mode 100644 index 5771080..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java +++ /dev/null @@ -1,530 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. 
- */ -package org.apache.hadoop.ozone.scm.block; - -import com.google.common.annotations.VisibleForTesting; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.DFSUtil; -import org.apache.hadoop.metrics2.util.MBeans; -import org.apache.hadoop.ozone.OzoneConsts; -import org.apache.hadoop.ozone.protocol.proto.OzoneProtos; -import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationFactor; -import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType; -import org.apache.hadoop.ozone.scm.container.Mapping; -import org.apache.hadoop.ozone.scm.exceptions.SCMException; -import org.apache.hadoop.ozone.scm.node.NodeManager; -import org.apache.hadoop.ozone.web.utils.OzoneUtils; -import org.apache.hadoop.scm.ScmConfigKeys; -import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock; -import org.apache.hadoop.scm.container.common.helpers.ContainerInfo; -import org.apache.hadoop.scm.container.common.helpers.Pipeline; -import org.apache.hadoop.util.StringUtils; -import org.apache.hadoop.utils.BatchOperation; -import org.apache.hadoop.utils.MetadataStore; -import org.apache.hadoop.utils.MetadataStoreBuilder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.management.ObjectName; -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.UUID; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - -import static org.apache.hadoop.ozone.OzoneConfigKeys - .OZONE_BLOCK_DELETING_SERVICE_INTERVAL; -import static org.apache.hadoop.ozone.OzoneConfigKeys - .OZONE_BLOCK_DELETING_SERVICE_INTERVAL_DEFAULT; -import static org.apache.hadoop.ozone.OzoneConfigKeys - .OZONE_BLOCK_DELETING_SERVICE_TIMEOUT; -import static org.apache.hadoop.ozone.OzoneConfigKeys - 
.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT; -import static org.apache.hadoop.ozone.OzoneConsts.BLOCK_DB; -import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes - .CHILL_MODE_EXCEPTION; -import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes - .FAILED_TO_FIND_BLOCK; -import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes - .INVALID_BLOCK_SIZE; - -/** Block Manager manages the block access for SCM. */ -public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean { - private static final Logger LOG = - LoggerFactory.getLogger(BlockManagerImpl.class); - // TODO : FIX ME : Hard coding the owner. - // Currently only user of the block service is Ozone, CBlock manages blocks - // by itself and does not rely on the Block service offered by SCM. - - private final NodeManager nodeManager; - private final Mapping containerManager; - private final MetadataStore blockStore; - - private final Lock lock; - private final long containerSize; - private final long cacheSize; - - private final DeletedBlockLog deletedBlockLog; - private final SCMBlockDeletingService blockDeletingService; - - private final int containerProvisionBatchSize; - private final Random rand; - private ObjectName mxBean; - - /** - * Constructor. - * - * @param conf - configuration. - * @param nodeManager - node manager. - * @param containerManager - container manager. - * @param cacheSizeMB - cache size for level db store. 
- * @throws IOException - */ - public BlockManagerImpl(final Configuration conf, - final NodeManager nodeManager, final Mapping containerManager, - final int cacheSizeMB) throws IOException { - this.nodeManager = nodeManager; - this.containerManager = containerManager; - this.cacheSize = cacheSizeMB; - - this.containerSize = OzoneConsts.GB * conf.getInt( - ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_GB, - ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT); - File metaDir = OzoneUtils.getOzoneMetaDirPath(conf); - String scmMetaDataDir = metaDir.getPath(); - - // Write the block key to container name mapping. - File blockContainerDbPath = new File(scmMetaDataDir, BLOCK_DB); - blockStore = - MetadataStoreBuilder.newBuilder() - .setConf(conf) - .setDbFile(blockContainerDbPath) - .setCacheSize(this.cacheSize * OzoneConsts.MB) - .build(); - - this.containerProvisionBatchSize = - conf.getInt( - ScmConfigKeys.OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE, - ScmConfigKeys.OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE_DEFAULT); - rand = new Random(); - this.lock = new ReentrantLock(); - - mxBean = MBeans.register("BlockManager", "BlockManagerImpl", this); - - // SCM block deleting transaction log and deleting service. - deletedBlockLog = new DeletedBlockLogImpl(conf); - long svcInterval = - conf.getTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL, - OZONE_BLOCK_DELETING_SERVICE_INTERVAL_DEFAULT, - TimeUnit.MILLISECONDS); - long serviceTimeout = - conf.getTimeDuration( - OZONE_BLOCK_DELETING_SERVICE_TIMEOUT, - OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT, - TimeUnit.MILLISECONDS); - blockDeletingService = - new SCMBlockDeletingService( - deletedBlockLog, containerManager, nodeManager, svcInterval, - serviceTimeout, conf); - } - - /** - * Start block manager services. - * - * @throws IOException - */ - public void start() throws IOException { - this.blockDeletingService.start(); - } - - /** - * Shutdown block manager services. 
- * - * @throws IOException - */ - public void stop() throws IOException { - this.blockDeletingService.shutdown(); - this.close(); - } - - /** - * Pre allocate specified count of containers for block creation. - * Allocation failures are logged and skipped, so fewer than {@code count} - * containers may end up being allocated. - * - * @param count - Number of containers to allocate. - * @param type - Type of containers - * @param factor - how many copies needed for this container. - * @param owner - owner passed to the container manager for each container. - * @throws IOException - */ - private void preAllocateContainers(int count, ReplicationType type, - ReplicationFactor factor, String owner) - throws IOException { - lock.lock(); - try { - for (int i = 0; i < count; i++) { - String containerName = UUID.randomUUID().toString(); - try { - // TODO: Fix this later when Ratis is made the Default. - ContainerInfo containerInfo = containerManager.allocateContainer( - type, factor, containerName, owner); - if (containerInfo == null) { - LOG.warn("Unable to allocate container."); - } - } catch (IOException ex) { - // The exception is logged as the throwable argument; a "{}" - // placeholder would not be filled from it and would be printed as-is. - LOG.warn("Unable to allocate container.", ex); - } - } - } finally { - lock.unlock(); - } - } - - /** - * Allocates a block in a container and returns that info. - * - * @param size - Block Size - * @param type Replication Type - * @param factor - Replication Factor - * @return Allocated block - * @throws IOException on failure. - */ - @Override - public AllocatedBlock allocateBlock(final long size, - ReplicationType type, ReplicationFactor factor, String owner) - throws IOException { - LOG.trace("Size;{} , type : {}, factor : {} ", size, type, factor); - - if (size < 0 || size > containerSize) { - LOG.warn("Invalid block size requested : {}", size); - throw new SCMException("Unsupported block size: " + size, - INVALID_BLOCK_SIZE); - } - - if (!nodeManager.isOutOfChillMode()) { - LOG.warn("Not out of Chill mode."); - throw new SCMException("Unable to create block while in chill mode", - CHILL_MODE_EXCEPTION); - } - - lock.lock(); - try { - /* - Here is the high level logic. - - 1. 
- First we check if there are containers in ALLOCATED state, - that is - SCM has allocated them in the SCM namespace but the - corresponding - container has not been created in the Datanode yet. If we - have any - in that state, we will return that to the client, which allows - client to finish creating those containers. This is a sort of - greedy - algorithm, our primary purpose is to get as many containers as - possible. - - 2. If there are no allocated containers -- Then we find an OPEN - container that matches that pattern. - - 3. If both of them fail, then we will pre-allocate a bunch of - containers in SCM and try again. - - TODO : Support random picking of two containers from the list. - So we - can use different kind of policies. - */ - - ContainerInfo containerInfo; - - // Look for ALLOCATED container that matches all other parameters. - containerInfo = - containerManager - .getStateManager() - .getMatchingContainer( - size, owner, type, factor, OzoneProtos.LifeCycleState - .ALLOCATED); - if (containerInfo != null) { - containerManager.updateContainerState(containerInfo.getContainerName(), - OzoneProtos.LifeCycleEvent.CREATE); - return newBlock(containerInfo, OzoneProtos.LifeCycleState.ALLOCATED); - } - - // Since we found no allocated containers that match our criteria, let us - // look for OPEN containers that match the criteria. - containerInfo = - containerManager - .getStateManager() - .getMatchingContainer(size, owner, type, factor, OzoneProtos - .LifeCycleState.OPEN); - if (containerInfo != null) { - return newBlock(containerInfo, OzoneProtos.LifeCycleState.OPEN); - } - - // We found neither ALLOCATED nor OPEN Containers. This generally means - // that most of our containers are full or we have not allocated - // containers of the type and replication factor. So let us go and - // allocate some. 
- preAllocateContainers(containerProvisionBatchSize, type, factor, owner); - - // Since we just allocated a set of containers this should work - containerInfo = - containerManager - .getStateManager() - .getMatchingContainer( - size, owner, type, factor, OzoneProtos.LifeCycleState - .ALLOCATED); - if (containerInfo != null) { - containerManager.updateContainerState(containerInfo.getContainerName(), - OzoneProtos.LifeCycleEvent.CREATE); - return newBlock(containerInfo, OzoneProtos.LifeCycleState.ALLOCATED); - } - - // we have tried all strategies we know and but somehow we are not able - // to get a container for this block. Log that info and return a null. - LOG.error( - "Unable to allocate a block for the size: {}, type: {}, " + - "factor: {}", - size, - type, - factor); - return null; - } finally { - lock.unlock(); - } - } - - /** - * newBlock - returns a new block assigned to a container. - * - * @param containerInfo - Container Info. - * @param state - Current state of the container. - * @return AllocatedBlock - */ - private AllocatedBlock newBlock( - ContainerInfo containerInfo, OzoneProtos.LifeCycleState state) - throws IOException { - - // TODO : Replace this with Block ID. - String blockKey = UUID.randomUUID().toString(); - boolean createContainer = (state == OzoneProtos.LifeCycleState.ALLOCATED); - - AllocatedBlock.Builder abb = - new AllocatedBlock.Builder() - .setKey(blockKey) - // TODO : Use containerinfo instead of pipeline. - .setPipeline(containerInfo.getPipeline()) - .setShouldCreateContainer(createContainer); - LOG.trace("New block allocated : {} Container ID: {}", blockKey, - containerInfo.toString()); - - if (containerInfo.getPipeline().getMachines().size() == 0) { - LOG.error("Pipeline Machine count is zero."); - return null; - } - - // Persist this block info to the blockStore DB, so getBlock(key) can - // find which container the block lives. - // TODO : Remove this DB in future - // and make this a KSM operation. Category: SCALABILITY. 
- if (containerInfo.getPipeline().getMachines().size() > 0) { - blockStore.put( - DFSUtil.string2Bytes(blockKey), - DFSUtil.string2Bytes(containerInfo.getPipeline().getContainerName())); - } - return abb.build(); - } - - /** - * Given a block key, return the Pipeline information. - * - * @param key - block key assigned by SCM. - * @return Pipeline (list of DNs and leader) to access the block. - * @throws IOException - */ - @Override - public Pipeline getBlock(final String key) throws IOException { - lock.lock(); - try { - byte[] containerBytes = blockStore.get(DFSUtil.string2Bytes(key)); - if (containerBytes == null) { - throw new SCMException( - "Specified block key does not exist. key : " + key, - FAILED_TO_FIND_BLOCK); - } - - String containerName = DFSUtil.bytes2String(containerBytes); - ContainerInfo containerInfo = containerManager.getContainer( - containerName); - if (containerInfo == null) { - LOG.debug("Container {} allocated by block service" - + "can't be found in SCM", containerName); - throw new SCMException( - "Unable to find container for the block", - SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER); - } - return containerInfo.getPipeline(); - } finally { - lock.unlock(); - } - } - - /** - * Deletes a list of blocks in an atomic operation. Internally, SCM writes - * these blocks into a - * {@link DeletedBlockLog} and deletes them from SCM DB. If this is - * successful, given blocks are - * entering pending deletion state and becomes invisible from SCM namespace. - * - * @param blockIDs block IDs. This is often the list of blocks of a - * particular object key. - * @throws IOException if exception happens, non of the blocks is deleted. 
- */ - @Override - public void deleteBlocks(List<String> blockIDs) throws IOException { - if (!nodeManager.isOutOfChillMode()) { - throw new SCMException("Unable to delete block while in chill mode", - CHILL_MODE_EXCEPTION); - } - - lock.lock(); - LOG.info("Deleting blocks {}", String.join(",", blockIDs)); - Map<String, List<String>> containerBlocks = new HashMap<>(); - BatchOperation batch = new BatchOperation(); - BatchOperation rollbackBatch = new BatchOperation(); - // TODO: track the block size info so that we can reclaim the container - // TODO: used space when the block is deleted. - try { - for (String blockKey : blockIDs) { - byte[] blockKeyBytes = DFSUtil.string2Bytes(blockKey); - byte[] containerBytes = blockStore.get(blockKeyBytes); - if (containerBytes == null) { - throw new SCMException( - "Specified block key does not exist. key : " + blockKey, - FAILED_TO_FIND_BLOCK); - } - batch.delete(blockKeyBytes); - rollbackBatch.put(blockKeyBytes, containerBytes); - - // Merge blocks to a container to blocks mapping, - // prepare to persist this info to the deletedBlocksLog. - String containerName = DFSUtil.bytes2String(containerBytes); - if (containerBlocks.containsKey(containerName)) { - containerBlocks.get(containerName).add(blockKey); - } else { - List<String> item = new ArrayList<>(); - item.add(blockKey); - containerBlocks.put(containerName, item); - } - } - - // We update SCM DB first, so if this step fails, we end up here, - // nothing gets into the delLog so no blocks will be accidentally - // removed. If we write the log first, once log is written, the - // async deleting service will start to scan and might be picking - // up some blocks to do real deletions, that might cause data loss. - blockStore.writeBatch(batch); - try { - deletedBlockLog.addTransactions(containerBlocks); - } catch (IOException e) { - try { - // If delLog update is failed, we need to rollback the changes. 
- blockStore.writeBatch(rollbackBatch); - } catch (IOException rollbackException) { - // This is a corner case. AddTX fails and rollback also fails, - // this will leave these blocks in inconsistent state. They were - // moved to pending deletion state in SCM DB but were not written - // into delLog so real deletions would not be done. Blocks become - // to be invisible from namespace but actual data are not removed. - // We log an error here so admin can manually check and fix such - // errors. - LOG.error( - "Blocks might be in inconsistent state because" - + " they were moved to pending deletion state in SCM DB but" - + " not written into delLog. Admin can manually add them" - + " into delLog for deletions. Inconsistent block list: {}", - String.join(",", blockIDs), - e); - throw rollbackException; - } - throw new IOException( - "Skip writing the deleted blocks info to" - + " the delLog because addTransaction fails. Batch skipped: " - + String.join(",", blockIDs), - e); - } - // TODO: Container report handling of the deleted blocks: - // Remove tombstone and update open container usage. - // We will revisit this when the closed container replication is done. - } finally { - lock.unlock(); - } - } - - @Override - public DeletedBlockLog getDeletedBlockLog() { - return this.deletedBlockLog; - } - - @VisibleForTesting - public String getDeletedKeyName(String key) { - return StringUtils.format(".Deleted/%s", key); - } - - /** - * Close the resources for BlockManager. - * - * @throws IOException - */ - @Override - public void close() throws IOException { - if (blockStore != null) { - blockStore.close(); - } - if (deletedBlockLog != null) { - deletedBlockLog.close(); - } - blockDeletingService.shutdown(); - if (mxBean != null) { - MBeans.unregister(mxBean); - mxBean = null; - } - } - - @Override - public int getOpenContainersNo() { - return 0; - // TODO : FIX ME : The open container being a single number does not make - // sense. 
- // We have to get open containers by Replication Type and Replication - // factor. Hence returning 0 for now. - // containers.get(OzoneProtos.LifeCycleState.OPEN).size(); - } - - @Override - public SCMBlockDeletingService getSCMBlockDeletingService() { - return this.blockDeletingService; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockmanagerMXBean.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockmanagerMXBean.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockmanagerMXBean.java deleted file mode 100644 index efcfc63..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockmanagerMXBean.java +++ /dev/null @@ -1,30 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.hadoop.ozone.scm.block; - - -/** - * JMX interface for the block manager. - */ -public interface BlockmanagerMXBean { - - /** - * Number of open containers manager by the block manager. 
- */ - int getOpenContainersNo(); -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DatanodeDeletedBlockTransactions.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DatanodeDeletedBlockTransactions.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DatanodeDeletedBlockTransactions.java deleted file mode 100644 index 31c61dd..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DatanodeDeletedBlockTransactions.java +++ /dev/null @@ -1,130 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. 
- */ -package org.apache.hadoop.ozone.scm.block; - -import java.io.IOException; -import java.util.Collections; -import java.util.List; -import java.util.Set; -import java.util.stream.Collectors; - -import org.apache.hadoop.hdfs.protocol.DatanodeID; -import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction; -import org.apache.hadoop.ozone.scm.container.Mapping; -import org.apache.hadoop.scm.container.common.helpers.ContainerInfo; - -import com.google.common.collect.ArrayListMultimap; - -/** - * A wrapper class to hold info about datanode and all deleted block - * transactions that will be sent to this datanode. - */ -public class DatanodeDeletedBlockTransactions { - private int nodeNum; - // The throttle size for each datanode. - private int maximumAllowedTXNum; - // Current counter of inserted TX. - private int currentTXNum; - private Mapping mappingService; - // A list of TXs mapped to a certain datanode ID. - private final ArrayListMultimap<DatanodeID, DeletedBlocksTransaction> - transactions; - - DatanodeDeletedBlockTransactions(Mapping mappingService, - int maximumAllowedTXNum, int nodeNum) { - this.transactions = ArrayListMultimap.create(); - this.mappingService = mappingService; - this.maximumAllowedTXNum = maximumAllowedTXNum; - this.nodeNum = nodeNum; - } - - public void addTransaction(DeletedBlocksTransaction tx) throws IOException { - ContainerInfo info = null; - try { - info = mappingService.getContainer(tx.getContainerName()); - } catch (IOException e) { - SCMBlockDeletingService.LOG.warn("Got container info error.", e); - } - - if (info == null) { - SCMBlockDeletingService.LOG.warn( - "Container {} not found, continue to process next", - tx.getContainerName()); - return; - } - - for (DatanodeID dnID : info.getPipeline().getMachines()) { - if (transactions.containsKey(dnID)) { - List<DeletedBlocksTransaction> txs = transactions.get(dnID); - if (txs != null && txs.size() < maximumAllowedTXNum) { - 
boolean hasContained = false; - for (DeletedBlocksTransaction t : txs) { - if (t.getContainerName().equals(tx.getContainerName())) { - hasContained = true; - break; - } - } - - if (!hasContained) { - txs.add(tx); - currentTXNum++; - } - } - } else { - currentTXNum++; - transactions.put(dnID, tx); - } - SCMBlockDeletingService.LOG.debug("Transaction added: {} <- TX({})", dnID, - tx.getTxID()); - } - } - - Set<DatanodeID> getDatanodes() { - return transactions.keySet(); - } - - boolean isEmpty() { - return transactions.isEmpty(); - } - - boolean hasTransactions(DatanodeID dnID) { - return transactions.containsKey(dnID) && !transactions.get(dnID).isEmpty(); - } - - List<DeletedBlocksTransaction> getDatanodeTransactions( - DatanodeID dnID) { - return transactions.get(dnID); - } - - List<String> getTransactionIDList(DatanodeID dnID) { - if (hasTransactions(dnID)) { - return transactions.get(dnID).stream() - .map(DeletedBlocksTransaction::getTxID).map(String::valueOf) - .collect(Collectors.toList()); - } else { - return Collections.emptyList(); - } - } - - boolean isFull() { - return currentTXNum >= maximumAllowedTXNum * nodeNum; - } - - int getTXNum() { - return currentTXNum; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLog.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLog.java deleted file mode 100644 index d8af853..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLog.java +++ /dev/null @@ -1,124 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.ozone.scm.block; - -import org.apache.hadoop.ozone.protocol.proto - .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction; - -import java.io.Closeable; -import java.io.IOException; -import java.util.List; -import java.util.Map; - -/** - * The DeletedBlockLog is a persisted log in SCM to keep tracking - * container blocks which are under deletion. It maintains info - * about under-deletion container blocks that notified by KSM, - * and the state how it is processed. - */ -public interface DeletedBlockLog extends Closeable { - - /** - * A limit size list of transactions. Note count is the max number - * of TXs to return, we might not be able to always return this - * number. and the processCount of those transactions - * should be [0, MAX_RETRY). - * - * @param count - number of transactions. - * @return a list of BlockDeletionTransaction. - */ - List<DeletedBlocksTransaction> getTransactions(int count) - throws IOException; - - /** - * Scan entire log once and returns TXs to DatanodeDeletedBlockTransactions. - * Once DatanodeDeletedBlockTransactions is full, the scan behavior will - * stop. - * @param transactions a list of TXs will be set into. 
- * @throws IOException - */ - void getTransactions(DatanodeDeletedBlockTransactions transactions) - throws IOException; - - /** - * Return all failed transactions in the log. A transaction is considered - * to be failed if it has been sent more than MAX_RETRY limit and its - * count is reset to -1. - * - * @return a list of failed deleted block transactions. - * @throws IOException - */ - List<DeletedBlocksTransaction> getFailedTransactions() - throws IOException; - - /** - * Increments count for given list of transactions by 1. - * The log maintains a valid range of counts for each transaction - * [0, MAX_RETRY]. If exceed this range, resets it to -1 to indicate - * the transaction is no longer valid. - * - * @param txIDs - transaction ID. - */ - void incrementCount(List<Long> txIDs) - throws IOException; - - /** - * Commits a transaction means to delete all footprints of a transaction - * from the log. This method doesn't guarantee all transactions can be - * successfully deleted, it tolerate failures and tries best efforts to. - * - * @param txIDs - transaction IDs. - */ - void commitTransactions(List<Long> txIDs) throws IOException; - - /** - * Creates a block deletion transaction and adds that into the log. - * - * @param containerName - container name. - * @param blocks - blocks that belong to the same container. - * - * @throws IOException - */ - void addTransaction(String containerName, List<String> blocks) - throws IOException; - - /** - * Creates block deletion transactions for a set of containers, - * add into the log and persist them atomically. An object key - * might be stored in multiple containers and multiple blocks, - * this API ensures that these updates are done in atomic manner - * so if any of them fails, the entire operation fails without - * any updates to the log. Note, this doesn't mean to create only - * one transaction, it creates multiple transactions (depends on the - * number of containers) together (on success) or non (on failure). 
- * - * @param containerBlocksMap a map of containerBlocks. - * @throws IOException - */ - void addTransactions(Map<String, List<String>> containerBlocksMap) - throws IOException; - - /** - * Returns the total number of valid transactions. A transaction is - * considered to be valid as long as its count is in range [0, MAX_RETRY]. - * - * @return number of a valid transactions. - * @throws IOException - */ - int getNumOfValidTransactions() throws IOException; -} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org