http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/StorageContainerManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/StorageContainerManager.java new file mode 100644 index 0000000..1a78dee --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/StorageContainerManager.java @@ -0,0 +1,1290 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.hdds.scm; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; +import com.google.protobuf.BlockingService; +import com.google.protobuf.InvalidProtocolBufferException; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdds.HddsUtils; +import org.apache.hadoop.hdds.scm.block.BlockManager; +import org.apache.hadoop.hdds.scm.block.BlockManagerImpl; +import org.apache.hadoop.hdds.scm.container.ContainerMapping; +import org.apache.hadoop.hdds.scm.container.Mapping; +import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock; +import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.common.helpers.DeleteBlockResult; +import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.hdds.scm.container.placement.metrics.ContainerStat; +import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMMetrics; +import org.apache.hadoop.hdds.scm.exceptions.SCMException; +import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes; +import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.scm.node.SCMNodeManager; +import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol; +import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol; +import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB; +import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB; +import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; +import 
org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto + .DeleteBlockTransactionResult; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos + .ContainerBlocksDeletionACKResponseProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerReportsResponseProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ReportState; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMCmdType; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMNodeAddressList; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMNodeReport; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMReregisterCmdResponseProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SendContainerReportProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerLocationProtocolProtos; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerLocationProtocolProtos.ObjectStageChangeRequestProto; +import org.apache.hadoop.hdds.server.ServiceRuntimeInfoImpl; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.ipc.ProtobufRpcEngine; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.util.MBeans; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.common.BlockGroup; +import org.apache.hadoop.ozone.common.DeleteBlockGroupResult; +import org.apache.hadoop.ozone.common.Storage.StorageState; +import org.apache.hadoop.ozone.common.StorageInfo; +import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol; +import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand; +import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand; +import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand; +import org.apache.hadoop.ozone.protocol.commands.SCMCommand; +import org.apache.hadoop.ozone.protocolPB + .ScmBlockLocationProtocolServerSideTranslatorPB; +import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolPB; +import org.apache.hadoop.ozone.protocolPB + .StorageContainerDatanodeProtocolServerSideTranslatorPB; +import org.apache.hadoop.ozone.protocolPB + .StorageContainerLocationProtocolServerSideTranslatorPB; +import 
org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.GenericOptionsParser; +import org.apache.hadoop.util.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.management.ObjectName; +import java.io.IOException; +import java.io.PrintStream; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static org.apache.hadoop.hdds.scm.ScmConfigKeys + .OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys + .OZONE_SCM_CLIENT_ADDRESS_KEY; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys + .OZONE_SCM_DATANODE_ADDRESS_KEY; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys + .OZONE_SCM_DB_CACHE_SIZE_DEFAULT; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys + .OZONE_SCM_DB_CACHE_SIZE_MB; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys + .OZONE_SCM_HANDLER_COUNT_DEFAULT; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys + .OZONE_SCM_HANDLER_COUNT_KEY; +import static org.apache.hadoop.hdds.protocol.proto + .ScmBlockLocationProtocolProtos.DeleteScmBlockResult.Result; +import static org.apache.hadoop.hdds.server.ServerUtils.updateRPCListenAddress; +import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ENABLED; +import static org.apache.hadoop.util.ExitUtil.terminate; + +/** + * StorageContainerManager is the main entry point for the service that provides + * information about which SCM nodes host containers. + * + * DataNodes report to StorageContainerManager using heartbeat + * messages. SCM allocates containers and returns a pipeline. + * + * A client once it gets a pipeline (a list of datanodes) will connect to the + * datanodes and create a container, which then can be used to store data. + */ +@InterfaceAudience.LimitedPrivate({"HDFS", "CBLOCK", "OZONE", "HBASE"}) +public class StorageContainerManager extends ServiceRuntimeInfoImpl + implements StorageContainerDatanodeProtocol, + StorageContainerLocationProtocol, ScmBlockLocationProtocol, SCMMXBean { + + private static final Logger LOG = + LoggerFactory.getLogger(StorageContainerManager.class); + + /** + * Startup options. + */ + public enum StartupOption { + INIT("-init"), + CLUSTERID("-clusterid"), + GENCLUSTERID("-genclusterid"), + REGULAR("-regular"), + HELP("-help"); + + private final String name; + private String clusterId = null; + + public void setClusterId(String cid) { + if(cid != null && !cid.isEmpty()) { + clusterId = cid; + } + } + + public String getClusterId() { + return clusterId; + } + + StartupOption(String arg) { + this.name = arg; + } + + public String getName() { + return name; + } + } + + /** + * NodeManager and container Managers for SCM. + */ + private final NodeManager scmNodeManager; + private final Mapping scmContainerManager; + private final BlockManager scmBlockManager; + private final SCMStorage scmStorage; + + /** The RPC server that listens to requests from DataNodes. */ + private final RPC.Server datanodeRpcServer; + private final InetSocketAddress datanodeRpcAddress; + + /** The RPC server that listens to requests from clients. 
*/ + private final RPC.Server clientRpcServer; + private final InetSocketAddress clientRpcAddress; + + /** The RPC server that listens to requests from block service clients. */ + private final RPC.Server blockRpcServer; + private final InetSocketAddress blockRpcAddress; + + private final StorageContainerManagerHttpServer httpServer; + + /** SCM mxbean. */ + private ObjectName scmInfoBeanName; + + /** SCM super user. */ + private final String scmUsername; + private final Collection<String> scmAdminUsernames; + + /** SCM metrics. */ + private static SCMMetrics metrics; + /** Key = DatanodeUuid, value = ContainerStat. */ + private Cache<String, ContainerStat> containerReportCache; + + + private static final String USAGE = + "Usage: \n oz scm [genericOptions] " + + "[ " + StartupOption.INIT.getName() + " [ " + + StartupOption.CLUSTERID.getName() + " <cid> ] ]\n " + + "oz scm [genericOptions] [ " + + StartupOption.GENCLUSTERID.getName() + " ]\n " + + "oz scm [ " + + StartupOption.HELP.getName() + " ]\n"; + /** + * Creates a new StorageContainerManager. Configuration will be updated with + * information on the actual listening addresses used for RPC servers. + * + * @param conf configuration + */ + private StorageContainerManager(OzoneConfiguration conf) + throws IOException { + + final int handlerCount = conf.getInt( + OZONE_SCM_HANDLER_COUNT_KEY, OZONE_SCM_HANDLER_COUNT_DEFAULT); + final int cacheSize = conf.getInt(OZONE_SCM_DB_CACHE_SIZE_MB, + OZONE_SCM_DB_CACHE_SIZE_DEFAULT); + + StorageContainerManager.initMetrics(); + initContainerReportCache(conf); + + scmStorage = new SCMStorage(conf); + if (scmStorage.getState() != StorageState.INITIALIZED) { + throw new SCMException("SCM not initialized.", + ResultCodes.SCM_NOT_INITIALIZED); + } + scmNodeManager = new SCMNodeManager(conf, scmStorage.getClusterID(), this); + scmContainerManager = new ContainerMapping(conf, scmNodeManager, cacheSize); + scmBlockManager = new BlockManagerImpl(conf, scmNodeManager, + scmContainerManager, cacheSize); + + scmAdminUsernames = conf.getTrimmedStringCollection( + OzoneConfigKeys.OZONE_ADMINISTRATORS); + scmUsername = UserGroupInformation.getCurrentUser().getUserName(); + if (!scmAdminUsernames.contains(scmUsername)) { + scmAdminUsernames.add(scmUsername); + } + + RPC.setProtocolEngine(conf, StorageContainerDatanodeProtocolPB.class, + ProtobufRpcEngine.class); + RPC.setProtocolEngine(conf, StorageContainerLocationProtocolPB.class, + ProtobufRpcEngine.class); + RPC.setProtocolEngine(conf, ScmBlockLocationProtocolPB.class, + ProtobufRpcEngine.class); + + BlockingService dnProtoPbService = StorageContainerDatanodeProtocolProtos. 
+ StorageContainerDatanodeProtocolService.newReflectiveBlockingService( + new StorageContainerDatanodeProtocolServerSideTranslatorPB(this)); + + final InetSocketAddress datanodeRpcAddr = + HddsServerUtil.getScmDataNodeBindAddress(conf); + datanodeRpcServer = startRpcServer(conf, datanodeRpcAddr, + StorageContainerDatanodeProtocolPB.class, dnProtoPbService, + handlerCount); + datanodeRpcAddress = updateRPCListenAddress(conf, + OZONE_SCM_DATANODE_ADDRESS_KEY, datanodeRpcAddr, datanodeRpcServer); + + // SCM Container Service RPC + BlockingService storageProtoPbService = + StorageContainerLocationProtocolProtos + .StorageContainerLocationProtocolService + .newReflectiveBlockingService( + new StorageContainerLocationProtocolServerSideTranslatorPB(this)); + + final InetSocketAddress scmAddress = + HddsServerUtil.getScmClientBindAddress(conf); + clientRpcServer = startRpcServer(conf, scmAddress, + StorageContainerLocationProtocolPB.class, storageProtoPbService, + handlerCount); + clientRpcAddress = updateRPCListenAddress(conf, + OZONE_SCM_CLIENT_ADDRESS_KEY, scmAddress, clientRpcServer); + + // SCM Block Service RPC + BlockingService blockProtoPbService = + ScmBlockLocationProtocolProtos + .ScmBlockLocationProtocolService + .newReflectiveBlockingService( + new ScmBlockLocationProtocolServerSideTranslatorPB(this)); + + final InetSocketAddress scmBlockAddress = + HddsServerUtil.getScmBlockClientBindAddress(conf); + blockRpcServer = startRpcServer(conf, scmBlockAddress, + ScmBlockLocationProtocolPB.class, blockProtoPbService, + handlerCount); + blockRpcAddress = updateRPCListenAddress(conf, + OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY, scmBlockAddress, blockRpcServer); + + httpServer = new StorageContainerManagerHttpServer(conf); + + registerMXBean(); + } + + /** + * Initialize container reports cache that sent from datanodes. + * + * @param conf + */ + private void initContainerReportCache(OzoneConfiguration conf) { + containerReportCache = CacheBuilder.newBuilder() + .expireAfterAccess(Long.MAX_VALUE, TimeUnit.MILLISECONDS) + .maximumSize(Integer.MAX_VALUE) + .removalListener(new RemovalListener<String, ContainerStat>() { + @Override + public void onRemoval( + RemovalNotification<String, ContainerStat> removalNotification) { + synchronized (containerReportCache) { + ContainerStat stat = removalNotification.getValue(); + // remove invalid container report + metrics.decrContainerStat(stat); + LOG.debug( + "Remove expired container stat entry for datanode: {}.", + removalNotification.getKey()); + } + } + }).build(); + } + + /** + * Builds a message for logging startup information about an RPC server. + * + * @param description RPC server description + * @param addr RPC server listening address + * @return server startup message + */ + private static String buildRpcServerStartMessage(String description, + InetSocketAddress addr) { + return addr != null ? String.format("%s is listening at %s", + description, addr.toString()) : + String.format("%s not started", description); + } + + /** + * Starts an RPC server, if configured. 
+ * + * @param conf configuration + * @param addr configured address of RPC server + * @param protocol RPC protocol provided by RPC server + * @param instance RPC protocol implementation instance + * @param handlerCount RPC server handler count + * + * @return RPC server + * @throws IOException if there is an I/O error while creating RPC server + */ + private static RPC.Server startRpcServer(OzoneConfiguration conf, + InetSocketAddress addr, Class<?> protocol, BlockingService instance, + int handlerCount) + throws IOException { + RPC.Server rpcServer = new RPC.Builder(conf) + .setProtocol(protocol) + .setInstance(instance) + .setBindAddress(addr.getHostString()) + .setPort(addr.getPort()) + .setNumHandlers(handlerCount) + .setVerbose(false) + .setSecretManager(null) + .build(); + + DFSUtil.addPBProtocol(conf, protocol, instance, rpcServer); + return rpcServer; + } + + private void registerMXBean() { + Map<String, String> jmxProperties = new HashMap<>(); + jmxProperties.put("component", "ServerRuntime"); + this.scmInfoBeanName = + MBeans.register("StorageContainerManager", + "StorageContainerManagerInfo", + jmxProperties, + this); + } + + private void unregisterMXBean() { + if(this.scmInfoBeanName != null) { + MBeans.unregister(this.scmInfoBeanName); + this.scmInfoBeanName = null; + } + } + + /** + * Main entry point for starting StorageContainerManager. + * + * @param argv arguments + * @throws IOException if startup fails due to I/O error + */ + public static void main(String[] argv) throws IOException { + if (DFSUtil.parseHelpArgument(argv, USAGE, + System.out, true)) { + System.exit(0); + } + try { + OzoneConfiguration conf = new OzoneConfiguration(); + GenericOptionsParser hParser = new GenericOptionsParser(conf, argv); + if (!hParser.isParseSuccessful()) { + System.err.println("USAGE: " + USAGE + "\n"); + hParser.printGenericCommandUsage(System.err); + System.exit(1); + } + StringUtils.startupShutdownMessage(StorageContainerManager.class, + argv, LOG); + StorageContainerManager scm = createSCM(hParser.getRemainingArgs(), conf); + if (scm != null) { + scm.start(); + scm.join(); + } + } catch (Throwable t) { + LOG.error("Failed to start the StorageContainerManager.", t); + terminate(1, t); + } + } + + private static void printUsage(PrintStream out) { + out.println(USAGE + "\n"); + } + + public static StorageContainerManager createSCM(String[] argv, + OzoneConfiguration conf) throws IOException { + if (!HddsUtils.isHddsEnabled(conf)) { + System.err.println("SCM cannot be started in secure mode or when " + + OZONE_ENABLED + " is set to false"); + System.exit(1); + } + StartupOption startOpt = parseArguments(argv); + if (startOpt == null) { + printUsage(System.err); + terminate(1); + return null; + } + switch (startOpt) { + case INIT: + terminate(scmInit(conf) ? 0 : 1); + return null; + case GENCLUSTERID: + System.out.println("Generating new cluster id:"); + System.out.println(StorageInfo.newClusterID()); + terminate(0); + return null; + case HELP: + printUsage(System.err); + terminate(0); + return null; + default: + return new StorageContainerManager(conf); + } + } + + /** + * Routine to set up the Version info for StorageContainerManager. + * + * @param conf OzoneConfiguration + * @return true if SCM initialization is successful, false otherwise. 
+ * @throws IOException if init fails due to I/O error + */ + public static boolean scmInit(OzoneConfiguration conf) throws IOException { + SCMStorage scmStorage = new SCMStorage(conf); + StorageState state = scmStorage.getState(); + if (state != StorageState.INITIALIZED) { + try { + String clusterId = StartupOption.INIT.getClusterId(); + if (clusterId != null && !clusterId.isEmpty()) { + scmStorage.setClusterId(clusterId); + } + scmStorage.initialize(); + System.out.println("SCM initialization succeeded." + + "Current cluster id for sd=" + scmStorage.getStorageDir() + ";cid=" + + scmStorage.getClusterID()); + return true; + } catch (IOException ioe) { + LOG.error("Could not initialize SCM version file", ioe); + return false; + } + } else { + System.out.println("SCM already initialized. Reusing existing" + + " cluster id for sd=" + scmStorage.getStorageDir() + ";cid=" + + scmStorage.getClusterID()); + return true; + } + } + + private static StartupOption parseArguments(String[] args) { + int argsLen = (args == null) ? 0 : args.length; + StartupOption startOpt = StartupOption.HELP; + if (argsLen == 0) { + startOpt = StartupOption.REGULAR; + } + for (int i = 0; i < argsLen; i++) { + String cmd = args[i]; + if (StartupOption.INIT.getName().equalsIgnoreCase(cmd)) { + startOpt = StartupOption.INIT; + if (argsLen > 3) { + return null; + } + for (i = i + 1; i < argsLen; i++) { + if (args[i].equalsIgnoreCase(StartupOption.CLUSTERID.getName())) { + i++; + if (i < argsLen && !args[i].isEmpty()) { + startOpt.setClusterId(args[i]); + } else { + // if no cluster id specified or is empty string, return null + LOG.error("Must specify a valid cluster ID after the " + + StartupOption.CLUSTERID.getName() + " flag"); + return null; + } + } else { + return null; + } + } + } else if (StartupOption.GENCLUSTERID.getName().equalsIgnoreCase(cmd)) { + if (argsLen > 1) { + return null; + } + startOpt = StartupOption.GENCLUSTERID; + } + } + return startOpt; + } + + /** + * Returns a SCMCommandRepose from the SCM Command. + * @param cmd - Cmd + * @return SCMCommandResponseProto + * @throws InvalidProtocolBufferException + */ + @VisibleForTesting + public SCMCommandResponseProto getCommandResponse(SCMCommand cmd, + final String datanodID) + throws IOException { + SCMCmdType type = cmd.getType(); + SCMCommandResponseProto.Builder builder = + SCMCommandResponseProto.newBuilder() + .setDatanodeUUID(datanodID); + switch (type) { + case registeredCommand: + return builder.setCmdType(SCMCmdType.registeredCommand) + .setRegisteredProto( + SCMRegisteredCmdResponseProto.getDefaultInstance()) + .build(); + case versionCommand: + return builder.setCmdType(SCMCmdType.versionCommand) + .setVersionProto(SCMVersionResponseProto.getDefaultInstance()) + .build(); + case sendContainerReport: + return builder.setCmdType(SCMCmdType.sendContainerReport) + .setSendReport(SendContainerReportProto.getDefaultInstance()) + .build(); + case reregisterCommand: + return builder.setCmdType(SCMCmdType.reregisterCommand) + .setReregisterProto(SCMReregisterCmdResponseProto + .getDefaultInstance()) + .build(); + case deleteBlocksCommand: + // Once SCM sends out the deletion message, increment the count. + // this is done here instead of when SCM receives the ACK, because + // DN might not be able to response the ACK for sometime. In case + // it times out, SCM needs to re-send the message some more times. 
+ List<Long> txs = ((DeleteBlocksCommand) cmd).blocksTobeDeleted() + .stream().map(tx -> tx.getTxID()).collect(Collectors.toList()); + this.getScmBlockManager().getDeletedBlockLog().incrementCount(txs); + return builder.setCmdType(SCMCmdType.deleteBlocksCommand) + .setDeleteBlocksProto(((DeleteBlocksCommand) cmd).getProto()) + .build(); + case closeContainerCommand: + return builder.setCmdType(SCMCmdType.closeContainerCommand) + .setCloseContainerProto(((CloseContainerCommand)cmd).getProto()) + .build(); + default: + throw new IllegalArgumentException("Not implemented"); + } + } + + @VisibleForTesting + public static SCMRegisteredCmdResponseProto getRegisteredResponse( + SCMCommand cmd, SCMNodeAddressList addressList) { + Preconditions.checkState(cmd.getClass() == RegisteredCommand.class); + RegisteredCommand rCmd = (RegisteredCommand) cmd; + SCMCmdType type = cmd.getType(); + if (type != SCMCmdType.registeredCommand) { + throw new IllegalArgumentException("Registered command is not well " + + "formed. Internal Error."); + } + return SCMRegisteredCmdResponseProto.newBuilder() + //TODO : Fix this later when we have multiple SCM support. + //.setAddressList(addressList) + .setErrorCode(rCmd.getError()) + .setClusterID(rCmd.getClusterID()) + .setDatanodeUUID(rCmd.getDatanodeUUID()).build(); + } + + /** + * {@inheritDoc} + */ + @Override + public Pipeline getContainer(String containerName) throws IOException { + checkAdminAccess(); + return scmContainerManager.getContainer(containerName).getPipeline(); + } + + @VisibleForTesting + public ContainerInfo getContainerInfo(String containerName) + throws IOException { + return scmContainerManager.getContainer(containerName); + } + + /** + * {@inheritDoc} + */ + @Override + public List<ContainerInfo> listContainer(String startName, + String prefixName, int count) throws IOException { + return scmContainerManager.listContainer(startName, prefixName, count); + } + + /** + * {@inheritDoc} + */ + @Override + public void deleteContainer(String containerName) throws IOException { + checkAdminAccess(); + scmContainerManager.deleteContainer(containerName); + } + + /** + * Queries a list of Node Statuses. + * + * @param nodeStatuses + * @param queryScope + * @param poolName @return List of Datanodes. + */ + @Override + public HddsProtos.NodePool queryNode(EnumSet<NodeState> nodeStatuses, + HddsProtos.QueryScope queryScope, String poolName) throws IOException { + + if (queryScope == HddsProtos.QueryScope.POOL) { + throw new IllegalArgumentException("Not Supported yet"); + } + + List<DatanodeDetails> datanodes = queryNode(nodeStatuses); + HddsProtos.NodePool.Builder poolBuilder = + HddsProtos.NodePool.newBuilder(); + + for (DatanodeDetails datanode : datanodes) { + HddsProtos.Node node = HddsProtos.Node.newBuilder() + .setNodeID(datanode.getProtoBufMessage()) + .addAllNodeStates(nodeStatuses) + .build(); + poolBuilder.addNodes(node); + } + + return poolBuilder.build(); + } + + /** + * Notify from client when begin/finish operation for container/pipeline + * objects on datanodes. 
+ * @param type + * @param name + * @param op + * @param stage + */ + @Override + public void notifyObjectStageChange( + ObjectStageChangeRequestProto.Type type, String name, + ObjectStageChangeRequestProto.Op op, + ObjectStageChangeRequestProto.Stage stage) throws IOException { + + LOG.info("Object type {} name {} op {} new stage {}", + type, name, op, stage); + if (type == ObjectStageChangeRequestProto.Type.container) { + if (op == ObjectStageChangeRequestProto.Op.create) { + if (stage == ObjectStageChangeRequestProto.Stage.begin) { + scmContainerManager.updateContainerState(name, + HddsProtos.LifeCycleEvent.CREATE); + } else { + scmContainerManager.updateContainerState(name, + HddsProtos.LifeCycleEvent.CREATED); + } + } else if (op == ObjectStageChangeRequestProto.Op.close) { + if (stage == ObjectStageChangeRequestProto.Stage.begin) { + scmContainerManager.updateContainerState(name, + HddsProtos.LifeCycleEvent.FINALIZE); + } else { + scmContainerManager.updateContainerState(name, + HddsProtos.LifeCycleEvent.CLOSE); + } + } + } //else if (type == ObjectStageChangeRequestProto.Type.pipeline) { + // TODO: pipeline state update will be addressed in future patch. + //} + } + + /** + * Creates a replication pipeline of a specified type. + */ + @Override + public Pipeline createReplicationPipeline( + HddsProtos.ReplicationType replicationType, + HddsProtos.ReplicationFactor factor, + HddsProtos.NodePool nodePool) + throws IOException { + // TODO: will be addressed in future patch. + return null; + } + + /** + * Queries a list of Node that match a set of statuses. + * <p> + * For example, if the nodeStatuses is HEALTHY and RAFT_MEMBER, + * then this call will return all healthy nodes which members in + * Raft pipeline. + * <p> + * Right now we don't support operations, so we assume it is an AND operation + * between the operators. + * + * @param nodeStatuses - A set of NodeStates. + * @return List of Datanodes. + */ + + public List<DatanodeDetails> queryNode(EnumSet<NodeState> nodeStatuses) { + Preconditions.checkNotNull(nodeStatuses, "Node Query set cannot be null"); + Preconditions.checkState(nodeStatuses.size() > 0, "No valid arguments " + + "in the query set"); + List<DatanodeDetails> resultList = new LinkedList<>(); + Set<DatanodeDetails> currentSet = new TreeSet<>(); + + for (NodeState nodeState : nodeStatuses) { + Set<DatanodeDetails> nextSet = queryNodeState(nodeState); + if ((nextSet == null) || (nextSet.size() == 0)) { + // Right now we only support AND operation. So intersect with + // any empty set is null. + return resultList; + } + // First time we have to add all the elements, next time we have to + // do an intersection operation on the set. + if (currentSet.size() == 0) { + currentSet.addAll(nextSet); + } else { + currentSet.retainAll(nextSet); + } + } + + resultList.addAll(currentSet); + return resultList; + } + + /** + * Query the System for Nodes. + * + * @param nodeState - NodeState that we are interested in matching. + * @return Set of Datanodes that match the NodeState. + */ + private Set<DatanodeDetails> queryNodeState(NodeState nodeState) { + if (nodeState == NodeState.RAFT_MEMBER || + nodeState == NodeState.FREE_NODE) { + throw new IllegalStateException("Not implemented yet"); + } + Set<DatanodeDetails> returnSet = new TreeSet<>(); + List<DatanodeDetails> tmp = getScmNodeManager().getNodes(nodeState); + if ((tmp != null) && (tmp.size() > 0)) { + returnSet.addAll(tmp); + } + return returnSet; + } + + /** + * Asks SCM where a container should be allocated. 
SCM responds with the set + * of datanodes that should be used creating this container. + * + * @param containerName - Name of the container. + * @param replicationFactor - replication factor. + * @return pipeline + * @throws IOException + */ + @Override + public Pipeline allocateContainer(HddsProtos.ReplicationType replicationType, + HddsProtos.ReplicationFactor replicationFactor, String containerName, + String owner) throws IOException { + + checkAdminAccess(); + return scmContainerManager + .allocateContainer(replicationType, replicationFactor, containerName, + owner).getPipeline(); + } + + /** + * Returns listening address of StorageLocation Protocol RPC server. + * + * @return listen address of StorageLocation RPC server + */ + @VisibleForTesting + public InetSocketAddress getClientRpcAddress() { + return clientRpcAddress; + } + + @Override + public String getClientRpcPort() { + InetSocketAddress addr = getClientRpcAddress(); + return addr == null ? "0" : Integer.toString(addr.getPort()); + } + + /** + * Returns listening address of StorageDatanode Protocol RPC server. + * + * @return Address where datanode are communicating. + */ + public InetSocketAddress getDatanodeRpcAddress() { + return datanodeRpcAddress; + } + + @Override + public String getDatanodeRpcPort() { + InetSocketAddress addr = getDatanodeRpcAddress(); + return addr == null ? "0" : Integer.toString(addr.getPort()); + } + + /** + * Start service. + */ + public void start() throws IOException { + LOG.info(buildRpcServerStartMessage( + "StorageContainerLocationProtocol RPC server", clientRpcAddress)); + DefaultMetricsSystem.initialize("StorageContainerManager"); + clientRpcServer.start(); + LOG.info(buildRpcServerStartMessage( + "ScmBlockLocationProtocol RPC server", blockRpcAddress)); + blockRpcServer.start(); + LOG.info(buildRpcServerStartMessage("RPC server for DataNodes", + datanodeRpcAddress)); + datanodeRpcServer.start(); + httpServer.start(); + scmBlockManager.start(); + + setStartTime(); + + } + + /** + * Stop service. + */ + public void stop() { + try { + LOG.info("Stopping block service RPC server"); + blockRpcServer.stop(); + } catch (Exception ex) { + LOG.error("Storage Container Manager blockRpcServer stop failed.", ex); + } + + try { + LOG.info("Stopping the StorageContainerLocationProtocol RPC server"); + clientRpcServer.stop(); + } catch (Exception ex) { + LOG.error("Storage Container Manager clientRpcServer stop failed.", ex); + } + + try { + LOG.info("Stopping the RPC server for DataNodes"); + datanodeRpcServer.stop(); + } catch (Exception ex) { + LOG.error("Storage Container Manager datanodeRpcServer stop failed.", ex); + } + + try { + LOG.info("Stopping Storage Container Manager HTTP server."); + httpServer.stop(); + } catch (Exception ex) { + LOG.error("Storage Container Manager HTTP server stop failed.", ex); + } + + try { + LOG.info("Stopping Block Manager Service."); + scmBlockManager.stop(); + } catch (Exception ex) { + LOG.error("SCM block manager service stop failed.", ex); + } + + if (containerReportCache != null) { + containerReportCache.invalidateAll(); + containerReportCache.cleanUp(); + } + + if (metrics != null) { + metrics.unRegister(); + } + + unregisterMXBean(); + IOUtils.cleanupWithLogger(LOG, scmContainerManager); + IOUtils.cleanupWithLogger(LOG, scmNodeManager); + } + + /** + * Wait until service has completed shutdown. 
+ */ + public void join() { + try { + blockRpcServer.join(); + clientRpcServer.join(); + datanodeRpcServer.join(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.info("Interrupted during StorageContainerManager join."); + } + } + + /** + * Returns SCM version. + * + * @return Version info. + */ + @Override + public SCMVersionResponseProto getVersion( + SCMVersionRequestProto versionRequest) throws IOException { + return getScmNodeManager().getVersion(versionRequest).getProtobufMessage(); + } + + /** + * Used by data node to send a Heartbeat. + * + * @param datanodeDetails - Datanode Details. + * @param nodeReport - Node Report + * @param reportState - Container report ready info. + * @return - SCMHeartbeatResponseProto + * @throws IOException + */ + @Override + public SCMHeartbeatResponseProto sendHeartbeat( + DatanodeDetailsProto datanodeDetails, SCMNodeReport nodeReport, + ReportState reportState) throws IOException { + List<SCMCommand> commands = + getScmNodeManager().sendHeartbeat(datanodeDetails, nodeReport, + reportState); + List<SCMCommandResponseProto> cmdResponses = new LinkedList<>(); + for (SCMCommand cmd : commands) { + cmdResponses.add(getCommandResponse(cmd, datanodeDetails.getUuid() + .toString())); + } + return SCMHeartbeatResponseProto.newBuilder().addAllCommands(cmdResponses) + .build(); + } + + /** + * Register Datanode. + * + * @param datanodeDetails - DatanodID. + * @param scmAddresses - List of SCMs this datanode is configured to + * communicate. + * @return SCM Command. + */ + @Override + public StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto + register(DatanodeDetailsProto datanodeDetails, String[] scmAddresses) { + // TODO : Return the list of Nodes that forms the SCM HA. + return getRegisteredResponse( + scmNodeManager.register(datanodeDetails), null); + } + + /** + * Send a container report. + * + * @param reports -- Container report + * @return HeartbeatRespose.nullcommand. + * @throws IOException + */ + @Override + public ContainerReportsResponseProto sendContainerReport( + ContainerReportsRequestProto reports) throws IOException { + updateContainerReportMetrics(reports); + + // should we process container reports async? + scmContainerManager.processContainerReports(reports); + return ContainerReportsResponseProto.newBuilder().build(); + } + + private void updateContainerReportMetrics( + ContainerReportsRequestProto reports) { + ContainerStat newStat = null; + // TODO: We should update the logic once incremental container report + // type is supported. + if (reports + .getType() == ContainerReportsRequestProto.reportType.fullReport) { + newStat = new ContainerStat(); + for (StorageContainerDatanodeProtocolProtos.ContainerInfo info : reports + .getReportsList()) { + newStat.add(new ContainerStat(info.getSize(), info.getUsed(), + info.getKeyCount(), info.getReadBytes(), info.getWriteBytes(), + info.getReadCount(), info.getWriteCount())); + } + + // update container metrics + metrics.setLastContainerStat(newStat); + } + + // Update container stat entry, this will trigger a removal operation if it + // exists in cache. + synchronized (containerReportCache) { + String datanodeUuid = reports.getDatanodeDetails().getUuid(); + if (datanodeUuid != null && newStat != null) { + containerReportCache.put(datanodeUuid, newStat); + // update global view container metrics + metrics.incrContainerStat(newStat); + } + } + } + + /** + * Handles the block deletion ACKs sent by datanodes. 
Once ACKs recieved, + * SCM considers the blocks are deleted and update the metadata in SCM DB. + * + * @param acks + * @return + * @throws IOException + */ + @Override + public ContainerBlocksDeletionACKResponseProto sendContainerBlocksDeletionACK( + ContainerBlocksDeletionACKProto acks) throws IOException { + if (acks.getResultsCount() > 0) { + List<DeleteBlockTransactionResult> resultList = acks.getResultsList(); + for (DeleteBlockTransactionResult result : resultList) { + if (LOG.isDebugEnabled()) { + LOG.debug("Got block deletion ACK from datanode, TXIDs={}, " + + "success={}", result.getTxID(), result.getSuccess()); + } + if (result.getSuccess()) { + LOG.debug("Purging TXID={} from block deletion log", + result.getTxID()); + this.getScmBlockManager().getDeletedBlockLog() + .commitTransactions(Collections.singletonList(result.getTxID())); + } else { + LOG.warn("Got failed ACK for TXID={}, prepare to resend the " + + "TX in next interval", result.getTxID()); + } + } + } + return ContainerBlocksDeletionACKResponseProto.newBuilder() + .getDefaultInstanceForType(); + } + + /** + * Returns the Number of Datanodes that are communicating with SCM. + * + * @param nodestate Healthy, Dead etc. + * @return int -- count + */ + public int getNodeCount(NodeState nodestate) { + return scmNodeManager.getNodeCount(nodestate); + } + + /** + * Returns SCM container manager. + */ + @VisibleForTesting + public Mapping getScmContainerManager() { + return scmContainerManager; + } + + /** + * Returns node manager. + * @return - Node Manager + */ + @VisibleForTesting + public NodeManager getScmNodeManager() { + return scmNodeManager; + } + + @VisibleForTesting + public BlockManager getScmBlockManager() { + return scmBlockManager; + } + + /** + * Get block locations. + * @param keys batch of block keys to retrieve. + * @return set of allocated blocks. + * @throws IOException + */ + @Override + public Set<AllocatedBlock> getBlockLocations(final Set<String> keys) + throws IOException { + Set<AllocatedBlock> locatedBlocks = new HashSet<>(); + for (String key: keys) { + Pipeline pipeline = scmBlockManager.getBlock(key); + AllocatedBlock block = new AllocatedBlock.Builder() + .setKey(key) + .setPipeline(pipeline).build(); + locatedBlocks.add(block); + } + return locatedBlocks; + } + + /** + * Asks SCM where a block should be allocated. SCM responds with the set of + * datanodes that should be used creating this block. + * + * @param size - size of the block. + * @param type - Replication type. + * @param factor + * @return allocated block accessing info (key, pipeline). + * @throws IOException + */ + @Override + public AllocatedBlock allocateBlock(long size, + HddsProtos.ReplicationType type, HddsProtos.ReplicationFactor factor, + String owner) throws IOException { + return scmBlockManager.allocateBlock(size, type, factor, owner); + } + + /** + * Get the clusterId and SCM Id from the version file in SCM. + */ + @Override + public ScmInfo getScmInfo() throws IOException { + ScmInfo.Builder builder = new ScmInfo.Builder() + .setClusterId(scmStorage.getClusterID()) + .setScmId(scmStorage.getScmId()); + return builder.build(); + } + /** + * Delete blocks for a set of object keys. + * + * @param keyBlocksInfoList list of block keys with object keys to delete. + * @return deletion results. 
+ */ + public List<DeleteBlockGroupResult> deleteKeyBlocks( + List<BlockGroup> keyBlocksInfoList) throws IOException { + LOG.info("SCM is informed by KSM to delete {} blocks", + keyBlocksInfoList.size()); + List<DeleteBlockGroupResult> results = new ArrayList<>(); + for (BlockGroup keyBlocks : keyBlocksInfoList) { + Result resultCode; + try { + // We delete blocks in an atomic operation to prevent getting + // into state like only a partial of blocks are deleted, + // which will leave key in an inconsistent state. + scmBlockManager.deleteBlocks(keyBlocks.getBlockIDList()); + resultCode = Result.success; + } catch (SCMException scmEx) { + LOG.warn("Fail to delete block: {}", keyBlocks.getGroupID(), scmEx); + switch (scmEx.getResult()) { + case CHILL_MODE_EXCEPTION: + resultCode = Result.chillMode; + break; + case FAILED_TO_FIND_BLOCK: + resultCode = Result.errorNotFound; + break; + default: + resultCode = Result.unknownFailure; + } + } catch (IOException ex) { + LOG.warn("Fail to delete blocks for object key: {}", + keyBlocks.getGroupID(), ex); + resultCode = Result.unknownFailure; + } + List<DeleteBlockResult> blockResultList = new ArrayList<>(); + for (String blockKey : keyBlocks.getBlockIDList()) { + blockResultList.add(new DeleteBlockResult(blockKey, resultCode)); + } + results.add(new DeleteBlockGroupResult(keyBlocks.getGroupID(), + blockResultList)); + } + return results; + } + + @VisibleForTesting + public String getPpcRemoteUsername() { + UserGroupInformation user = ProtobufRpcEngine.Server.getRemoteUser(); + return user == null ? null : user.getUserName(); + } + + private void checkAdminAccess() throws IOException { + String remoteUser = getPpcRemoteUsername(); + if(remoteUser != null) { + if (!scmAdminUsernames.contains(remoteUser)) { + throw new IOException( + "Access denied for user " + remoteUser + + ". Superuser privilege is required."); + } + } + } + + /** + * Initialize SCM metrics. + */ + public static void initMetrics() { + metrics = SCMMetrics.create(); + } + + /** + * Return SCM metrics instance. + */ + public static SCMMetrics getMetrics() { + return metrics == null ? SCMMetrics.create() : metrics; + } + + /** + * Invalidate container stat entry for given datanode. + * + * @param datanodeUuid + */ + public void removeContainerReport(String datanodeUuid) { + synchronized (containerReportCache) { + containerReportCache.invalidate(datanodeUuid); + } + } + + /** + * Get container stat of specified datanode. + * + * @param datanodeUuid + * @return + */ + public ContainerStat getContainerReport(String datanodeUuid) { + ContainerStat stat = null; + synchronized (containerReportCache) { + stat = containerReportCache.getIfPresent(datanodeUuid); + } + + return stat; + } + + /** + * Returns a view of the container stat entries. Modifications made to the + * map will directly affect the cache. + * + * @return + */ + public ConcurrentMap<String, ContainerStat> getContainerReportCache() { + return containerReportCache.asMap(); + } + + @Override + public Map<String, String> getContainerReport() { + Map<String, String> id2StatMap = new HashMap<>(); + synchronized (containerReportCache) { + ConcurrentMap<String, ContainerStat> map = containerReportCache.asMap(); + for (Map.Entry<String, ContainerStat> entry : map.entrySet()) { + id2StatMap.put(entry.getKey(), entry.getValue().toJsonString()); + } + } + + return id2StatMap; + } +}
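A note on the pattern used above: initContainerReportCache() pairs a Guava Cache with a RemovalListener so that each datanode's latest ContainerStat is counted into the aggregate SCM metrics when it is put into the cache and subtracted again when the entry expires or is overwritten, keeping metrics.incrContainerStat()/decrContainerStat() balanced. The following standalone sketch illustrates just that pairing; ReportCacheSketch, Stat and totalUsed are illustrative stand-ins rather than SCM classes, and the only assumption is Guava on the classpath.

// Illustrative sketch of the removal-listener cache pattern used by
// initContainerReportCache(); Stat and totalUsed stand in for the real
// ContainerStat/SCMMetrics types.
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class ReportCacheSketch {

  static class Stat {
    final long used;
    Stat(long used) {
      this.used = used;
    }
  }

  // Aggregate view across all datanodes, kept in sync by the listener.
  private final AtomicLong totalUsed = new AtomicLong();

  // Key = datanode UUID, value = latest report. An entry that expires or is
  // overwritten is subtracted from the aggregate before it disappears.
  private final Cache<String, Stat> cache = CacheBuilder.newBuilder()
      .expireAfterAccess(5, TimeUnit.MINUTES)
      .maximumSize(10_000)
      .removalListener(new RemovalListener<String, Stat>() {
        @Override
        public void onRemoval(RemovalNotification<String, Stat> n) {
          totalUsed.addAndGet(-n.getValue().used);
        }
      })
      .build();

  void onReport(String datanodeUuid, Stat stat) {
    cache.put(datanodeUuid, stat);   // replacing an old entry fires onRemoval
    totalUsed.addAndGet(stat.used);  // then the new report is counted in
  }

  long aggregateUsed() {
    return totalUsed.get();
  }
}

The SCM code additionally synchronizes on the cache and uses effectively unbounded expiry and size limits; the tight bounds above are only to keep the sketch small.
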
http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/StorageContainerManagerHttpServer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/StorageContainerManagerHttpServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/StorageContainerManagerHttpServer.java new file mode 100644 index 0000000..1ca059c --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/StorageContainerManagerHttpServer.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.hdds.scm; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.server.BaseHttpServer; +import org.apache.hadoop.ozone.OzoneConfigKeys; + +import java.io.IOException; + +/** + * HttpServer2 wrapper for the Ozone Storage Container Manager. + */ +public class StorageContainerManagerHttpServer extends BaseHttpServer { + + public StorageContainerManagerHttpServer(Configuration conf) + throws IOException { + super(conf, "scm"); + } + + @Override protected String getHttpAddressKey() { + return ScmConfigKeys.OZONE_SCM_HTTP_ADDRESS_KEY; + } + + @Override protected String getHttpBindHostKey() { + return ScmConfigKeys.OZONE_SCM_HTTP_BIND_HOST_KEY; + } + + @Override protected String getHttpsAddressKey() { + return ScmConfigKeys.OZONE_SCM_HTTPS_ADDRESS_KEY; + } + + @Override protected String getHttpsBindHostKey() { + return ScmConfigKeys.OZONE_SCM_HTTPS_BIND_HOST_KEY; + } + + @Override protected String getBindHostDefault() { + return ScmConfigKeys.OZONE_SCM_HTTP_BIND_HOST_DEFAULT; + } + + @Override protected int getHttpBindPortDefault() { + return ScmConfigKeys.OZONE_SCM_HTTP_BIND_PORT_DEFAULT; + } + + @Override protected int getHttpsBindPortDefault() { + return ScmConfigKeys.OZONE_SCM_HTTPS_BIND_PORT_DEFAULT; + } + + @Override protected String getKeytabFile() { + return ScmConfigKeys.OZONE_SCM_KEYTAB_FILE; + } + + @Override protected String getSpnegoPrincipal() { + return OzoneConfigKeys.OZONE_SCM_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL; + } + + @Override protected String getEnabledKey() { + return ScmConfigKeys.OZONE_SCM_HTTP_ENABLED_KEY; + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManager.java new file mode 100644 index 0000000..4ab2516 --- /dev/null +++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManager.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.hdds.scm.block; + +import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock; +import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; + +import java.io.Closeable; +import java.io.IOException; +import java.util.List; + +/** + * + * Block APIs. + * Container is transparent to these APIs. + */ +public interface BlockManager extends Closeable { + /** + * Allocates a new block for a given size. + * @param size - Block Size + * @param type Replication Type + * @param factor - Replication Factor + * @return AllocatedBlock + * @throws IOException + */ + AllocatedBlock allocateBlock(long size, HddsProtos.ReplicationType type, + HddsProtos.ReplicationFactor factor, String owner) throws IOException; + + /** + * Give the key to the block, get the pipeline info. + * @param key - key to the block. + * @return - Pipeline that used to access the block. + * @throws IOException + */ + Pipeline getBlock(String key) throws IOException; + + /** + * Deletes a list of blocks in an atomic operation. Internally, SCM + * writes these blocks into a {@link DeletedBlockLog} and deletes them + * from SCM DB. If this is successful, given blocks are entering pending + * deletion state and becomes invisible from SCM namespace. + * + * @param blockIDs block IDs. This is often the list of blocks of + * a particular object key. + * @throws IOException if exception happens, non of the blocks is deleted. + */ + void deleteBlocks(List<String> blockIDs) throws IOException; + + /** + * @return the block deletion transaction log maintained by SCM. + */ + DeletedBlockLog getDeletedBlockLog(); + + /** + * Start block manager background services. + * @throws IOException + */ + void start() throws IOException; + + /** + * Shutdown block manager background services. + * @throws IOException + */ + void stop() throws IOException; + + /** + * @return the block deleting service executed in SCM. 
+ */ + SCMBlockDeletingService getSCMBlockDeletingService(); +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java new file mode 100644 index 0000000..d966112 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java @@ -0,0 +1,530 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.hdds.scm.block; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.scm.container.Mapping; +import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock; +import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.hdds.scm.exceptions.SCMException; +import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; +import org.apache.hadoop.metrics2.util.MBeans; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.utils.BatchOperation; +import org.apache.hadoop.utils.MetadataStore; +import org.apache.hadoop.utils.MetadataStoreBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.management.ObjectName; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes + .CHILL_MODE_EXCEPTION; +import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes + .FAILED_TO_FIND_BLOCK; +import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes + .INVALID_BLOCK_SIZE; +import static org.apache.hadoop.hdds.server.ServerUtils.getOzoneMetaDirPath; +import static org.apache.hadoop.ozone.OzoneConfigKeys + .OZONE_BLOCK_DELETING_SERVICE_INTERVAL; +import static 
org.apache.hadoop.ozone.OzoneConfigKeys + .OZONE_BLOCK_DELETING_SERVICE_INTERVAL_DEFAULT; +import static org.apache.hadoop.ozone.OzoneConfigKeys + .OZONE_BLOCK_DELETING_SERVICE_TIMEOUT; +import static org.apache.hadoop.ozone.OzoneConfigKeys + .OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT; +import static org.apache.hadoop.ozone.OzoneConsts.BLOCK_DB; + +/** Block Manager manages the block access for SCM. */ +public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean { + private static final Logger LOG = + LoggerFactory.getLogger(BlockManagerImpl.class); + // TODO : FIX ME : Hard coding the owner. + // Currently only user of the block service is Ozone, CBlock manages blocks + // by itself and does not rely on the Block service offered by SCM. + + private final NodeManager nodeManager; + private final Mapping containerManager; + private final MetadataStore blockStore; + + private final Lock lock; + private final long containerSize; + private final long cacheSize; + + private final DeletedBlockLog deletedBlockLog; + private final SCMBlockDeletingService blockDeletingService; + + private final int containerProvisionBatchSize; + private final Random rand; + private ObjectName mxBean; + + /** + * Constructor. + * + * @param conf - configuration. + * @param nodeManager - node manager. + * @param containerManager - container manager. + * @param cacheSizeMB - cache size for level db store. + * @throws IOException + */ + public BlockManagerImpl(final Configuration conf, + final NodeManager nodeManager, final Mapping containerManager, + final int cacheSizeMB) throws IOException { + this.nodeManager = nodeManager; + this.containerManager = containerManager; + this.cacheSize = cacheSizeMB; + + this.containerSize = OzoneConsts.GB * conf.getInt( + ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_GB, + ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT); + File metaDir = getOzoneMetaDirPath(conf); + String scmMetaDataDir = metaDir.getPath(); + + // Write the block key to container name mapping. + File blockContainerDbPath = new File(scmMetaDataDir, BLOCK_DB); + blockStore = + MetadataStoreBuilder.newBuilder() + .setConf(conf) + .setDbFile(blockContainerDbPath) + .setCacheSize(this.cacheSize * OzoneConsts.MB) + .build(); + + this.containerProvisionBatchSize = + conf.getInt( + ScmConfigKeys.OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE, + ScmConfigKeys.OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE_DEFAULT); + rand = new Random(); + this.lock = new ReentrantLock(); + + mxBean = MBeans.register("BlockManager", "BlockManagerImpl", this); + + // SCM block deleting transaction log and deleting service. + deletedBlockLog = new DeletedBlockLogImpl(conf); + long svcInterval = + conf.getTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL, + OZONE_BLOCK_DELETING_SERVICE_INTERVAL_DEFAULT, + TimeUnit.MILLISECONDS); + long serviceTimeout = + conf.getTimeDuration( + OZONE_BLOCK_DELETING_SERVICE_TIMEOUT, + OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT, + TimeUnit.MILLISECONDS); + blockDeletingService = + new SCMBlockDeletingService( + deletedBlockLog, containerManager, nodeManager, svcInterval, + serviceTimeout, conf); + } + + /** + * Start block manager services. + * + * @throws IOException + */ + public void start() throws IOException { + this.blockDeletingService.start(); + } + + /** + * Shutdown block manager services. + * + * @throws IOException + */ + public void stop() throws IOException { + this.blockDeletingService.shutdown(); + this.close(); + } + + /** + * Pre allocate specified count of containers for block creation. 
+ * + * @param count - Number of containers to allocate. + * @param type - Type of containers + * @param factor - how many copies needed for this container. + * @throws IOException + */ + private void preAllocateContainers(int count, ReplicationType type, + ReplicationFactor factor, String owner) + throws IOException { + lock.lock(); + try { + for (int i = 0; i < count; i++) { + String containerName = UUID.randomUUID().toString(); + ContainerInfo containerInfo = null; + try { + // TODO: Fix this later when Ratis is made the Default. + containerInfo = containerManager.allocateContainer(type, factor, + containerName, owner); + + if (containerInfo == null) { + LOG.warn("Unable to allocate container."); + continue; + } + } catch (IOException ex) { + LOG.warn("Unable to allocate container: {}", ex); + continue; + } + } + } finally { + lock.unlock(); + } + } + + /** + * Allocates a block in a container and returns that info. + * + * @param size - Block Size + * @param type Replication Type + * @param factor - Replication Factor + * @return Allocated block + * @throws IOException on failure. + */ + @Override + public AllocatedBlock allocateBlock(final long size, + ReplicationType type, ReplicationFactor factor, String owner) + throws IOException { + LOG.trace("Size;{} , type : {}, factor : {} ", size, type, factor); + + if (size < 0 || size > containerSize) { + LOG.warn("Invalid block size requested : {}", size); + throw new SCMException("Unsupported block size: " + size, + INVALID_BLOCK_SIZE); + } + + if (!nodeManager.isOutOfChillMode()) { + LOG.warn("Not out of Chill mode."); + throw new SCMException("Unable to create block while in chill mode", + CHILL_MODE_EXCEPTION); + } + + lock.lock(); + try { + /* + Here is the high level logic. + + 1. First we check if there are containers in ALLOCATED state, + that is + SCM has allocated them in the SCM namespace but the + corresponding + container has not been created in the Datanode yet. If we + have any + in that state, we will return that to the client, which allows + client to finish creating those containers. This is a sort of + greedy + algorithm, our primary purpose is to get as many containers as + possible. + + 2. If there are no allocated containers -- Then we find a Open + container that matches that pattern. + + 3. If both of them fail, the we will pre-allocate a bunch of + conatainers in SCM and try again. + + TODO : Support random picking of two containers from the list. + So we + can use different kind of policies. + */ + + ContainerInfo containerInfo; + + // Look for ALLOCATED container that matches all other parameters. + containerInfo = + containerManager + .getStateManager() + .getMatchingContainer( + size, owner, type, factor, HddsProtos.LifeCycleState + .ALLOCATED); + if (containerInfo != null) { + containerManager.updateContainerState(containerInfo.getContainerName(), + HddsProtos.LifeCycleEvent.CREATE); + return newBlock(containerInfo, HddsProtos.LifeCycleState.ALLOCATED); + } + + // Since we found no allocated containers that match our criteria, let us + // look for OPEN containers that match the criteria. + containerInfo = + containerManager + .getStateManager() + .getMatchingContainer(size, owner, type, factor, HddsProtos + .LifeCycleState.OPEN); + if (containerInfo != null) { + return newBlock(containerInfo, HddsProtos.LifeCycleState.OPEN); + } + + // We found neither ALLOCATED or OPEN Containers. 
This generally means + // that most of our containers are full or we have not allocated + // containers of the requested type and replication factor. So let us go and + // allocate some. + preAllocateContainers(containerProvisionBatchSize, type, factor, owner); + + // Since we just allocated a set of containers, this should work. + containerInfo = + containerManager + .getStateManager() + .getMatchingContainer( + size, owner, type, factor, HddsProtos.LifeCycleState + .ALLOCATED); + if (containerInfo != null) { + containerManager.updateContainerState(containerInfo.getContainerName(), + HddsProtos.LifeCycleEvent.CREATE); + return newBlock(containerInfo, HddsProtos.LifeCycleState.ALLOCATED); + } + + // We have tried all the strategies we know, but somehow we are not able + // to get a container for this block. Log that info and return null. + LOG.error( + "Unable to allocate a block for the size: {}, type: {}, " + + "factor: {}", + size, + type, + factor); + return null; + } finally { + lock.unlock(); + } + }
+ + /** + * newBlock - returns a new block assigned to a container. + * + * @param containerInfo - Container Info. + * @param state - Current state of the container. + * @return AllocatedBlock + */ + private AllocatedBlock newBlock( + ContainerInfo containerInfo, HddsProtos.LifeCycleState state) + throws IOException { + + // TODO : Replace this with Block ID. + String blockKey = UUID.randomUUID().toString(); + boolean createContainer = (state == HddsProtos.LifeCycleState.ALLOCATED); + + AllocatedBlock.Builder abb = + new AllocatedBlock.Builder() + .setKey(blockKey) + // TODO : Use containerInfo instead of pipeline. + .setPipeline(containerInfo.getPipeline()) + .setShouldCreateContainer(createContainer); + LOG.trace("New block allocated : {} Container ID: {}", blockKey, + containerInfo.toString()); + + if (containerInfo.getPipeline().getMachines().size() == 0) { + LOG.error("Pipeline Machine count is zero."); + return null; + } + + // Persist this block info to the blockStore DB, so getBlock(key) can + // find which container the block lives in. + // TODO : Remove this DB in future + // and make this a KSM operation. Category: SCALABILITY. + if (containerInfo.getPipeline().getMachines().size() > 0) { + blockStore.put( + DFSUtil.string2Bytes(blockKey), + DFSUtil.string2Bytes(containerInfo.getPipeline().getContainerName())); + } + return abb.build(); + }
+ + /** + * Given a block key, return the Pipeline information. + * + * @param key - block key assigned by SCM. + * @return Pipeline (list of DNs and leader) to access the block. + * @throws IOException + */ + @Override + public Pipeline getBlock(final String key) throws IOException { + lock.lock(); + try { + byte[] containerBytes = blockStore.get(DFSUtil.string2Bytes(key)); + if (containerBytes == null) { + throw new SCMException( + "Specified block key does not exist. key : " + key, + FAILED_TO_FIND_BLOCK); + } + + String containerName = DFSUtil.bytes2String(containerBytes); + ContainerInfo containerInfo = containerManager.getContainer( + containerName); + if (containerInfo == null) { + LOG.debug("Container {} allocated by block service" + + " can't be found in SCM", containerName); + throw new SCMException( + "Unable to find container for the block", + SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER); + } + return containerInfo.getPipeline(); + } finally { + lock.unlock(); + } + }
+ + /** + * Deletes a list of blocks in an atomic operation. Internally, SCM writes + * these blocks into a + * {@link DeletedBlockLog} and deletes them from the SCM DB.
If this is + * successful, the given blocks enter the + * pending deletion state and become invisible from the SCM namespace. + * + * @param blockIDs block IDs. This is often the list of blocks of a + * particular object key. + * @throws IOException if an exception happens; none of the blocks is deleted. + */ + @Override + public void deleteBlocks(List<String> blockIDs) throws IOException { + if (!nodeManager.isOutOfChillMode()) { + throw new SCMException("Unable to delete block while in chill mode", + CHILL_MODE_EXCEPTION); + } + + lock.lock(); + LOG.info("Deleting blocks {}", String.join(",", blockIDs)); + Map<String, List<String>> containerBlocks = new HashMap<>(); + BatchOperation batch = new BatchOperation(); + BatchOperation rollbackBatch = new BatchOperation(); + // TODO: track the block size info so that we can reclaim the container + // TODO: used space when the block is deleted. + try { + for (String blockKey : blockIDs) { + byte[] blockKeyBytes = DFSUtil.string2Bytes(blockKey); + byte[] containerBytes = blockStore.get(blockKeyBytes); + if (containerBytes == null) { + throw new SCMException( + "Specified block key does not exist. key : " + blockKey, + FAILED_TO_FIND_BLOCK); + } + batch.delete(blockKeyBytes); + rollbackBatch.put(blockKeyBytes, containerBytes); + + // Merge the blocks into a container-to-blocks mapping, + // to prepare to persist this info to the deletedBlocksLog. + String containerName = DFSUtil.bytes2String(containerBytes); + if (containerBlocks.containsKey(containerName)) { + containerBlocks.get(containerName).add(blockKey); + } else { + List<String> item = new ArrayList<>(); + item.add(blockKey); + containerBlocks.put(containerName, item); + } + }
+ + // We update the SCM DB first, so if this step fails nothing gets into + // the delLog and no blocks will be accidentally removed. If we wrote the + // log first, then as soon as the log is written the async deleting + // service could start scanning and pick up some blocks for real + // deletion, which might cause data loss. + blockStore.writeBatch(batch); + try { + deletedBlockLog.addTransactions(containerBlocks); + } catch (IOException e) { + try { + // If the delLog update fails, we need to roll back the changes. + blockStore.writeBatch(rollbackBatch); + } catch (IOException rollbackException) { + // This is a corner case: addTransactions fails and the rollback also + // fails, which leaves these blocks in an inconsistent state. They + // were moved to the pending deletion state in the SCM DB but were not + // written into the delLog, so the real deletions will not be done. + // The blocks become invisible in the namespace but the actual data is + // not removed. We log an error here so an admin can manually check + // and fix such errors. + LOG.error( + "Blocks might be in an inconsistent state because" + + " they were moved to pending deletion state in SCM DB but" + + " not written into delLog. Admin can manually add them" + + " into delLog for deletions. Inconsistent block list: {}", + String.join(",", blockIDs), + e); + throw rollbackException; + } + throw new IOException( + "Skip writing the deleted blocks info to" + + " the delLog because addTransactions failed. Batch skipped: " + + String.join(",", blockIDs), + e); + } + // TODO: Container report handling of the deleted blocks: + // Remove tombstone and update open container usage. + // We will revisit this when the closed container replication is done.
+ } finally { + lock.unlock(); + } + } + + @Override + public DeletedBlockLog getDeletedBlockLog() { + return this.deletedBlockLog; + } + + @VisibleForTesting + public String getDeletedKeyName(String key) { + return StringUtils.format(".Deleted/%s", key); + }
+ + /** + * Close the resources for BlockManager. + * + * @throws IOException + */ + @Override + public void close() throws IOException { + if (blockStore != null) { + blockStore.close(); + } + if (deletedBlockLog != null) { + deletedBlockLog.close(); + } + blockDeletingService.shutdown(); + if (mxBean != null) { + MBeans.unregister(mxBean); + mxBean = null; + } + }
+ + @Override + public int getOpenContainersNo() { + return 0; + // TODO : FIX ME : The open container being a single number does not make + // sense. + // We have to get open containers by Replication Type and Replication + // factor. Hence returning 0 for now. + // containers.get(HddsProtos.LifeCycleState.OPEN).size(); + } + + @Override + public SCMBlockDeletingService getSCMBlockDeletingService() { + return this.blockDeletingService; + } +}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockmanagerMXBean.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockmanagerMXBean.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockmanagerMXBean.java new file mode 100644 index 0000000..23c6983 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockmanagerMXBean.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.hdds.scm.block; + + +/** + * JMX interface for the block manager. + */ +public interface BlockmanagerMXBean { + + /** + * Number of open containers managed by the block manager. + */ + int getOpenContainersNo(); +}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java new file mode 100644 index 0000000..47074d2 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.hdds.scm.block; + +import com.google.common.collect.ArrayListMultimap; +import org.apache.hadoop.hdds.scm.container.Mapping; +import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; + +/** + * A wrapper class to hold info about datanode and all deleted block + * transactions that will be sent to this datanode. + */ +public class DatanodeDeletedBlockTransactions { + private int nodeNum; + // The throttle size for each datanode. + private int maximumAllowedTXNum; + // Current counter of inserted TX. + private int currentTXNum; + private Mapping mappingService; + // A list of TXs mapped to a certain datanode ID. + private final ArrayListMultimap<UUID, DeletedBlocksTransaction> + transactions; + + DatanodeDeletedBlockTransactions(Mapping mappingService, + int maximumAllowedTXNum, int nodeNum) { + this.transactions = ArrayListMultimap.create(); + this.mappingService = mappingService; + this.maximumAllowedTXNum = maximumAllowedTXNum; + this.nodeNum = nodeNum; + } + + public void addTransaction(DeletedBlocksTransaction tx) throws IOException { + ContainerInfo info = null; + try { + info = mappingService.getContainer(tx.getContainerName()); + } catch (IOException e) { + SCMBlockDeletingService.LOG.warn("Got container info error.", e); + } + + if (info == null) { + SCMBlockDeletingService.LOG.warn( + "Container {} not found, continue to process next", + tx.getContainerName()); + return; + } + + for (DatanodeDetails dd : info.getPipeline().getMachines()) { + UUID dnID = dd.getUuid(); + if (transactions.containsKey(dnID)) { + List<DeletedBlocksTransaction> txs = transactions.get(dnID); + if (txs != null && txs.size() < maximumAllowedTXNum) { + boolean hasContained = false; + for (DeletedBlocksTransaction t : txs) { + if (t.getContainerName().equals(tx.getContainerName())) { + hasContained = true; + break; + } + } + + if (!hasContained) { + txs.add(tx); + currentTXNum++; + } + } + } else { + currentTXNum++; + transactions.put(dnID, tx); + } + SCMBlockDeletingService.LOG.debug("Transaction added: {} <- TX({})", dnID, + tx.getTxID()); + } + } + + Set<UUID> getDatanodeIDs() { + return transactions.keySet(); + } + + boolean isEmpty() { + return transactions.isEmpty(); + } + + boolean hasTransactions(UUID dnId) { + return transactions.containsKey(dnId) && + !transactions.get(dnId).isEmpty(); + } + + List<DeletedBlocksTransaction> getDatanodeTransactions(UUID dnId) { + return transactions.get(dnId); + } + + List<String> 
getTransactionIDList(UUID dnId) { + if (hasTransactions(dnId)) { + return transactions.get(dnId).stream() + .map(DeletedBlocksTransaction::getTxID).map(String::valueOf) + .collect(Collectors.toList()); + } else { + return Collections.emptyList(); + } + } + + boolean isFull() { + return currentTXNum >= maximumAllowedTXNum * nodeNum; + } + + int getTXNum() { + return currentTXNum; + } +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java new file mode 100644 index 0000000..f7b770e --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java @@ -0,0 +1,124 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.scm.block; + +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction; + +import java.io.Closeable; +import java.io.IOException; +import java.util.List; +import java.util.Map;
+ +/** + * The DeletedBlockLog is a persisted log in SCM that keeps track of + * container blocks which are under deletion. It maintains info about the + * under-deletion container blocks that KSM has notified SCM about, and the + * state of how each of them is processed. + */ +public interface DeletedBlockLog extends Closeable {
+ + /** + * Returns a limited-size list of transactions. Note that count is the max + * number of TXs to return; we might not always be able to return this + * number. The count of the returned transactions should be in + * [0, MAX_RETRY). + * + * @param count - number of transactions. + * @return a list of BlockDeletionTransaction. + */ + List<DeletedBlocksTransaction> getTransactions(int count) + throws IOException;
+ + /** + * Scans the entire log once and feeds TXs into the given + * DatanodeDeletedBlockTransactions. Once the + * DatanodeDeletedBlockTransactions is full, the scan stops. + * @param transactions the object the TXs will be set into. + * @throws IOException + */ + void getTransactions(DatanodeDeletedBlockTransactions transactions) + throws IOException;
+ + /** + * Returns all failed transactions in the log. A transaction is considered + * failed if it has been sent more than the MAX_RETRY limit and its + * count has been reset to -1. + * + * @return a list of failed deleted block transactions.
+ * @throws IOException + */ + List<DeletedBlocksTransaction> getFailedTransactions() + throws IOException;
+ + /** + * Increments the count for the given list of transactions by 1. + * The log maintains a valid range of counts for each transaction, + * [0, MAX_RETRY]. If a count exceeds this range, it is reset to -1 to + * indicate that the transaction is no longer valid. + * + * @param txIDs - transaction IDs. + */ + void incrementCount(List<Long> txIDs) + throws IOException;
+ + /** + * Committing a transaction means deleting all footprints of the + * transaction from the log. This method doesn't guarantee that all + * transactions can be successfully deleted; it tolerates failures and + * makes a best effort. + * + * @param txIDs - transaction IDs. + */ + void commitTransactions(List<Long> txIDs) throws IOException;
+ + /** + * Creates a block deletion transaction and adds it to the log. + * + * @param containerName - container name. + * @param blocks - blocks that belong to the same container. + * + * @throws IOException + */ + void addTransaction(String containerName, List<String> blocks) + throws IOException;
+ + /** + * Creates block deletion transactions for a set of containers, adds them + * to the log and persists them atomically. An object key might be stored + * in multiple containers and multiple blocks; this API ensures that these + * updates are done in an atomic manner, so if any of them fails, the + * entire operation fails without any updates to the log. Note that this + * doesn't mean only one transaction is created; it creates multiple + * transactions (depending on the number of containers) together (on + * success) or none (on failure). + * + * @param containerBlocksMap a map from container name to the blocks to + * delete in that container. + * @throws IOException + */ + void addTransactions(Map<String, List<String>> containerBlocksMap) + throws IOException;
+ + /** + * Returns the total number of valid transactions. A transaction is + * considered to be valid as long as its count is in the range + * [0, MAX_RETRY]. + * + * @return the number of valid transactions. + * @throws IOException + */ + int getNumOfValidTransactions() throws IOException; +}
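As a closing illustration of how the pieces in this patch fit together, below is a minimal usage sketch of the new BlockManagerImpl API: allocate a block, resolve its pipeline, then mark it for deletion. This sketch is editorial and is not part of the commit. It assumes the caller already has concrete NodeManager and Mapping instances wired up, that the Ozone/SCM metadata directory is configured (the constructor creates block.db under it), that HddsProtos.ReplicationType/ReplicationFactor are the enums accepted by allocateBlock (STAND_ALONE/ONE are only example constants), that AllocatedBlock exposes a getKey() matching its builder's setKey(), and that the cluster is out of chill mode, since allocateBlock and deleteBlocks otherwise throw SCMException.

    import java.util.Collections;
    import org.apache.hadoop.hdds.conf.OzoneConfiguration;
    import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
    import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
    import org.apache.hadoop.hdds.scm.block.BlockManagerImpl;
    import org.apache.hadoop.hdds.scm.container.Mapping;
    import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
    import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
    import org.apache.hadoop.hdds.scm.node.NodeManager;

    public final class BlockManagerFlowSketch {

      // nodeManager and containerMapping are assumed to be already-initialized
      // SCM components; building them is outside the scope of this sketch.
      public static void run(NodeManager nodeManager, Mapping containerMapping)
          throws Exception {
        // conf must point at a writable Ozone/SCM metadata directory,
        // because the constructor creates block.db there.
        OzoneConfiguration conf = new OzoneConfiguration();

        // 128 MB cache for the block.db metadata store (constructor argument).
        BlockManagerImpl blockManager =
            new BlockManagerImpl(conf, nodeManager, containerMapping, 128);
        blockManager.start(); // starts the SCMBlockDeletingService

        // Allocate a 4 MB block. Internally this tries an ALLOCATED container
        // first, then an OPEN one, and pre-allocates a batch as a last resort.
        AllocatedBlock block = blockManager.allocateBlock(
            4L * 1024 * 1024, ReplicationType.STAND_ALONE,
            ReplicationFactor.ONE, "ozone");

        if (block != null) {
          // The block key -> container mapping was persisted to block.db,
          // so the pipeline can be resolved again from just the key.
          Pipeline pipeline = blockManager.getBlock(block.getKey());
          System.out.println("Block " + block.getKey()
              + " lives on pipeline " + pipeline);

          // Deleting removes the key from block.db and records a transaction
          // in the DeletedBlockLog; the background deleting service later
          // pushes the actual delete commands to the datanodes.
          blockManager.deleteBlocks(
              Collections.singletonList(block.getKey()));
        }

        blockManager.stop();
      }

      private BlockManagerFlowSketch() {
      }
    }

Note that deleteBlocks only marks blocks for deletion: the physical deletes are driven asynchronously by SCMBlockDeletingService through the DeletedBlockLog, which is why the delete path updates the SCM DB before writing the log, as explained in the ordering comment inside deleteBlocks.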