// http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
// ----------------------------------------------------------------------
// diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
// new file mode 100644
// index 0000000..0f4988a
// --- /dev/null
// +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
// @@ -0,0 +1,356 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdds.scm.block;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.primitives.Longs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdds.protocol.proto
    .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.utils.BatchOperation;
import org.apache.hadoop.utils.MetadataKeyFilters.MetadataKeyFilter;
import org.apache.hadoop.utils.MetadataStore;
import org.apache.hadoop.utils.MetadataStoreBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import static org.apache.hadoop.hdds.scm.ScmConfigKeys
    .OZONE_SCM_BLOCK_DELETION_MAX_RETRY;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys
    .OZONE_SCM_BLOCK_DELETION_MAX_RETRY_DEFAULT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys
    .OZONE_SCM_DB_CACHE_SIZE_DEFAULT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys
    .OZONE_SCM_DB_CACHE_SIZE_MB;
import static org.apache.hadoop.hdds.server.ServerUtils.getOzoneMetaDirPath;
import static org.apache.hadoop.ozone.OzoneConsts.DELETED_BLOCK_DB;

/**
 * An implementation of {@link DeletedBlockLog} that uses a K/V db to
 * maintain block deletion transactions between SCM and datanodes.
 * This is a very basic implementation: it simply scans the log and
 * remembers the position reached by the previous scan, and uses that to
 * determine where the next scan starts. It has no notion of a weight
 * for each transaction, so as long as a transaction is still valid it gets
 * the same chance of being retrieved, which depends only on the natural
 * order of the transaction IDs.
 * <p>
 * Every operation that touches the store, except {@link #close()} and the
 * constructor, runs while holding a single {@link ReentrantLock}.
 */
public class DeletedBlockLogImpl implements DeletedBlockLog {

  private static final Logger LOG =
      LoggerFactory.getLogger(DeletedBlockLogImpl.class);

  // Reserved key under which the most recently allocated transaction ID is
  // persisted; it lives in the same store as the transactions themselves.
  private static final byte[] LATEST_TXID =
      DFSUtil.string2Bytes("#LATEST_TXID#");

  private final int maxRetry;
  private final MetadataStore deletedStore;
  private final Lock lock;
  // The latest id of deleted blocks in the db.
  private long lastTxID;
  // The txID of the last transaction returned by getTransactions(int);
  // 0 means the next scan starts from the head of the store.
  private long lastReadTxID;

  /**
   * Opens (creating if missing) the deleted-block store under the Ozone
   * metadata directory and loads the latest persisted transaction ID.
   *
   * @param conf configuration supplying the max retry count, the metadata
   *             directory and the db cache size (in MB).
   * @throws IOException if the store cannot be built or read.
   */
  public DeletedBlockLogImpl(Configuration conf) throws IOException {
    maxRetry = conf.getInt(OZONE_SCM_BLOCK_DELETION_MAX_RETRY,
        OZONE_SCM_BLOCK_DELETION_MAX_RETRY_DEFAULT);

    File metaDir = getOzoneMetaDirPath(conf);
    String scmMetaDataDir = metaDir.getPath();
    File deletedLogDbPath = new File(scmMetaDataDir, DELETED_BLOCK_DB);
    int cacheSize = conf.getInt(OZONE_SCM_DB_CACHE_SIZE_MB,
        OZONE_SCM_DB_CACHE_SIZE_DEFAULT);
    // Load store of all transactions.
    deletedStore = MetadataStoreBuilder.newBuilder()
        .setCreateIfMissing(true)
        .setConf(conf)
        .setDbFile(deletedLogDbPath)
        .setCacheSize(cacheSize * OzoneConsts.MB)
        .build();

    this.lock = new ReentrantLock();
    // start from the head of deleted store.
    lastReadTxID = 0;
    lastTxID = findLatestTxIDInStore();
  }

  @VisibleForTesting
  MetadataStore getDeletedStore() {
    return deletedStore;
  }

  /**
   * Reads the latest transaction ID persisted under {@code LATEST_TXID}.
   * There is no need to lock before reading because
   * it's only used in the constructor.
   *
   * @return latest txid, or 0 when none has been persisted yet.
   * @throws IOException if the store cannot be read.
   */
  private long findLatestTxIDInStore() throws IOException {
    long txid = 0;
    byte[] value = deletedStore.get(LATEST_TXID);
    if (value != null) {
      txid = Longs.fromByteArray(value);
    }
    return txid;
  }

  /**
   * Returns up to {@code count} transactions whose txID is greater than the
   * position reached by the previous call and whose retry count is within
   * {@code [0, maxRetry]}. When fewer than {@code count} transactions are
   * found, the read position is reset so the next call scans from the head.
   *
   * @param count maximum number of transactions to return.
   * @return the transactions found, possibly empty.
   * @throws IOException if the store cannot be iterated or a value cannot
   *                     be parsed.
   */
  @Override
  public List<DeletedBlocksTransaction> getTransactions(
      int count) throws IOException {
    List<DeletedBlocksTransaction> result = new ArrayList<>();
    MetadataKeyFilter getNextTxID = (preKey, currentKey, nextKey)
        -> Longs.fromByteArray(currentKey) > lastReadTxID;
    MetadataKeyFilter avoidInvalidTxid = (preKey, currentKey, nextKey)
        -> !Arrays.equals(LATEST_TXID, currentKey);
    lock.lock();
    try {
      deletedStore.iterate(null, (key, value) -> {
        // Exclude the LATEST_TXID record first so that the sentinel key is
        // never decoded as if it were a transaction ID.
        if (avoidInvalidTxid.filterKey(null, key, null) &&
            getNextTxID.filterKey(null, key, null)) {
          DeletedBlocksTransaction block = DeletedBlocksTransaction
              .parseFrom(value);
          if (block.getCount() > -1 && block.getCount() <= maxRetry) {
            result.add(block);
          }
        }
        // Keep iterating only while more transactions are wanted.
        return result.size() < count;
      });
      // Scan the metadata from the beginning.
      if (result.size() < count || result.size() < 1) {
        lastReadTxID = 0;
      } else {
        lastReadTxID = result.get(result.size() - 1).getTxID();
      }
    } finally {
      lock.unlock();
    }
    return result;
  }

  /**
   * Returns every transaction whose retry count is -1, i.e. the value
   * {@link #incrementCount(List)} stores once {@code maxRetry} is exceeded.
   *
   * @return the failed transactions, possibly empty.
   * @throws IOException if the store cannot be iterated or a value cannot
   *                     be parsed.
   */
  @Override
  public List<DeletedBlocksTransaction> getFailedTransactions()
      throws IOException {
    lock.lock();
    try {
      final List<DeletedBlocksTransaction> failedTXs = Lists.newArrayList();
      deletedStore.iterate(null, (key, value) -> {
        if (!Arrays.equals(LATEST_TXID, key)) {
          DeletedBlocksTransaction delTX =
              DeletedBlocksTransaction.parseFrom(value);
          if (delTX.getCount() == -1) {
            failedTXs.add(delTX);
          }
        }
        return true;
      });
      return failedTXs;
    } finally {
      lock.unlock();
    }
  }

  /**
   * {@inheritDoc}
   * <p>
   * A transaction whose count is already -1 stays at -1. A txID that is not
   * present in the store is logged and skipped so that the remaining txIDs
   * are still processed.
   *
   * @param txIDs - transaction ID.
   * @throws IOException
   */
  @Override
  public void incrementCount(List<Long> txIDs) throws IOException {
    // NOTE(review): nothing is ever added to this batch; each update below
    // is written directly with put(). Kept as-is to preserve behaviour --
    // confirm whether the updates were meant to be batched.
    BatchOperation batch = new BatchOperation();
    lock.lock();
    try {
      for (Long txID : txIDs) {
        try {
          byte[] key = Longs.toByteArray(txID);
          byte[] stored = deletedStore.get(key);
          if (stored == null) {
            // Parsing a null value would raise a NullPointerException that
            // the IOException handler below does not catch, aborting the
            // update of every remaining txID.
            LOG.warn("Cannot increase count for txID " + txID
                + ", transaction not found in the log");
            continue;
          }
          DeletedBlocksTransaction block =
              DeletedBlocksTransaction.parseFrom(stored);
          DeletedBlocksTransaction.Builder builder = block.toBuilder();
          int currentCount = block.getCount();
          if (currentCount > -1) {
            builder.setCount(++currentCount);
          }
          // if the retry time exceeds the maxRetry value
          // then set the retry value to -1, stop retrying, admins can
          // analyze those blocks and purge them manually by SCMCli.
          if (currentCount > maxRetry) {
            builder.setCount(-1);
          }
          deletedStore.put(key, builder.build().toByteArray());
        } catch (IOException ex) {
          LOG.warn("Cannot increase count for txID " + txID, ex);
        }
      }
      deletedStore.writeBatch(batch);
    } finally {
      lock.unlock();
    }
  }

  /**
   * Builds a new transaction with a retry count of 0.
   *
   * @param txID          transaction ID to assign.
   * @param containerName container the blocks belong to.
   * @param blocks        IDs of the blocks to delete.
   * @return the new transaction.
   */
  private DeletedBlocksTransaction constructNewTransaction(long txID,
      String containerName, List<String> blocks) {
    return DeletedBlocksTransaction.newBuilder()
        .setTxID(txID)
        .setContainerName(containerName)
        .addAllBlockID(blocks)
        .setCount(0)
        .build();
  }

  /**
   * {@inheritDoc}
   * <p>
   * Each txID is deleted from the store individually; a failure for one
   * txID is logged and does not stop the others.
   *
   * @param txIDs - transaction IDs.
   * @throws IOException
   */
  @Override
  public void commitTransactions(List<Long> txIDs) throws IOException {
    lock.lock();
    try {
      for (Long txID : txIDs) {
        try {
          deletedStore.delete(Longs.toByteArray(txID));
        } catch (IOException ex) {
          LOG.warn("Cannot commit txID " + txID, ex);
        }
      }
    } finally {
      lock.unlock();
    }
  }

  /**
   * {@inheritDoc}
   * <p>
   * The transaction and the updated {@code LATEST_TXID} record are written
   * in one batch; the in-memory latest ID advances only after that write
   * succeeds.
   *
   * @param containerName - container name.
   * @param blocks - blocks that belong to the same container.
   * @throws IOException
   */
  @Override
  public void addTransaction(String containerName, List<String> blocks)
      throws IOException {
    BatchOperation batch = new BatchOperation();
    lock.lock();
    try {
      long newTxID = lastTxID + 1;
      DeletedBlocksTransaction tx = constructNewTransaction(newTxID,
          containerName, blocks);

      batch.put(Longs.toByteArray(newTxID), tx.toByteArray());
      batch.put(LATEST_TXID, Longs.toByteArray(newTxID));

      deletedStore.writeBatch(batch);
      lastTxID = newTxID;
    } finally {
      lock.unlock();
    }
  }

  /**
   * Counts the transactions whose retry count is not -1.
   *
   * @return number of transactions that have not been marked as failed.
   * @throws IOException if the store cannot be iterated or a value cannot
   *                     be parsed.
   */
  @Override
  public int getNumOfValidTransactions() throws IOException {
    lock.lock();
    try {
      final AtomicInteger num = new AtomicInteger(0);
      deletedStore.iterate(null, (key, value) -> {
        // Exclude latest txid record
        if (!Arrays.equals(LATEST_TXID, key)) {
          DeletedBlocksTransaction delTX =
              DeletedBlocksTransaction.parseFrom(value);
          if (delTX.getCount() > -1) {
            num.incrementAndGet();
          }
        }
        return true;
      });
      return num.get();
    } finally {
      lock.unlock();
    }
  }

  /**
   * {@inheritDoc}
   * <p>
   * One transaction is created per map entry, with consecutive IDs, and all
   * of them are written together with the updated {@code LATEST_TXID}
   * record in a single batch.
   *
   * @param containerBlocksMap a map of containerBlocks.
   * @throws IOException
   */
  @Override
  public void addTransactions(Map<String, List<String>> containerBlocksMap)
      throws IOException {
    BatchOperation batch = new BatchOperation();
    lock.lock();
    try {
      long currentLatestID = lastTxID;
      for (Map.Entry<String, List<String>> entry :
          containerBlocksMap.entrySet()) {
        currentLatestID += 1;
        byte[] key = Longs.toByteArray(currentLatestID);
        DeletedBlocksTransaction tx = constructNewTransaction(currentLatestID,
            entry.getKey(), entry.getValue());
        batch.put(key, tx.toByteArray());
      }
      batch.put(LATEST_TXID, Longs.toByteArray(currentLatestID));
      deletedStore.writeBatch(batch);
      // Advance the in-memory ID only once the batch has been written, as
      // addTransaction does, so a failed write leaves it matching the store.
      lastTxID = currentLatestID;
    } finally {
      lock.unlock();
    }
  }

  /**
   * Closes the underlying store. The lock is not taken here.
   *
   * @throws IOException if the store fails to close.
   */
  @Override
  public void close() throws IOException {
    if (deletedStore != null) {
      deletedStore.close();
    }
  }

  /**
   * Scans the log from the head and offers every transaction whose retry
   * count is within {@code [0, maxRetry]} to {@code transactions}, stopping
   * as soon as {@code transactions} reports that it is full.
   *
   * @param transactions collector that receives the transactions.
   * @throws IOException if the store cannot be iterated or a value cannot
   *                     be parsed.
   */
  @Override
  public void getTransactions(DatanodeDeletedBlockTransactions transactions)
      throws IOException {
    lock.lock();
    try {
      deletedStore.iterate(null, (key, value) -> {
        if (!Arrays.equals(LATEST_TXID, key)) {
          DeletedBlocksTransaction block = DeletedBlocksTransaction
              .parseFrom(value);

          if (block.getCount() > -1 && block.getCount() <= maxRetry) {
            transactions.addTransaction(block);
          }
          return !transactions.isFull();
        }
        return true;
      });
    } finally {
      lock.unlock();
    }
  }
}
// http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
// ----------------------------------------------------------------------
// diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
// new file mode 100644
// index 0000000..2c555e0
// --- /dev/null
// +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
// @@ -0,0 +1,175 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership.  The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.hadoop.hdds.scm.block;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.scm.container.Mapping;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
import org.apache.hadoop.hdds.protocol.proto
    .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.utils.BackgroundService;
import org.apache.hadoop.utils.BackgroundTask;
import org.apache.hadoop.utils.BackgroundTaskQueue;
import org.apache.hadoop.utils.BackgroundTaskResult.EmptyTaskResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import static org.apache.hadoop.ozone.OzoneConfigKeys
    .OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL;
import static org.apache.hadoop.ozone.OzoneConfigKeys
    .OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL_DEFAULT;

/**
 * A background service running in SCM to delete blocks. This service scans
 * the block deletion log at a fixed interval and caches block deletion
 * commands in {@link org.apache.hadoop.hdds.scm.node.CommandQueue};
 * asynchronously, the SCM HB thread polls the cached commands and sends them
 * to datanodes for physical processing.
 */
public class SCMBlockDeletingService extends BackgroundService {

  static final Logger LOG =
      LoggerFactory.getLogger(SCMBlockDeletingService.class);

  // ThreadPoolSize=2, 1 for scheduler and the other for the scanner.
  private final static int BLOCK_DELETING_SERVICE_CORE_POOL_SIZE = 2;
  private final DeletedBlockLog deletedBlockLog;
  private final Mapping mappingService;
  private final NodeManager nodeManager;

  // Block delete limit size is dynamically calculated based on container
  // delete limit size (ozone.block.deleting.container.limit.per.interval)
  // that configured for datanode. To ensure DN not wait for
  // delete commands, we use this value multiply by a factor 2 as the final
  // limit TX size for each node.
  // Currently we implement a throttle algorithm that throttling delete blocks
  // for each datanode. Each node is limited by the calculation size. Firstly
  // current node info is fetched from nodemanager, then scan entire delLog
  // from the beginning to end. If one node reaches maximum value, its records
  // will be skipped. If not, keep scanning until it reaches maximum value.
  // Once all node are full, the scan behavior will stop.
  private int blockDeleteLimitSize;

  /**
   * Creates the service.
   *
   * @param deletedBlockLog log that is scanned for deletion transactions.
   * @param mapper          container mapping handed to the per-datanode
   *                        transaction collector.
   * @param nodeManager     source of healthy datanodes and sink for the
   *                        generated delete commands.
   * @param interval        scan interval, in milliseconds.
   * @param serviceTimeout  timeout passed through to the base service.
   * @param conf            configuration supplying the per-interval container
   *                        delete limit, which must be positive.
   * @throws IllegalArgumentException if the configured limit is not positive.
   */
  public SCMBlockDeletingService(DeletedBlockLog deletedBlockLog,
      Mapping mapper, NodeManager nodeManager,
      long interval, long serviceTimeout, Configuration conf) {
    super("SCMBlockDeletingService", interval, TimeUnit.MILLISECONDS,
        BLOCK_DELETING_SERVICE_CORE_POOL_SIZE, serviceTimeout);
    this.deletedBlockLog = deletedBlockLog;
    this.mappingService = mapper;
    this.nodeManager = nodeManager;

    int containerLimit = conf.getInt(
        OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL,
        OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL_DEFAULT);
    Preconditions.checkArgument(containerLimit > 0,
        "Container limit size should be positive.");
    // Use container limit value multiply by a factor 2 to ensure DN
    // not wait for orders.
    this.blockDeleteLimitSize = containerLimit * 2;
  }

  /**
   * Returns a queue holding a single scanner task.
   */
  @Override
  public BackgroundTaskQueue getTasks() {
    BackgroundTaskQueue queue = new BackgroundTaskQueue();
    queue.add(new DeletedBlockTransactionScanner());
    return queue;
  }

  /**
   * Task that scans the deleted-block log and queues one
   * {@link DeleteBlocksCommand} per datanode that has transactions.
   */
  private class DeletedBlockTransactionScanner
      implements BackgroundTask<EmptyTaskResult> {

    @Override
    public int getPriority() {
      return 1;
    }

    /**
     * Collects a throttled set of deletion transactions for the currently
     * healthy datanodes and adds a delete command for each datanode that
     * received at least one transaction. A failure to read the log is
     * logged and the task continues with whatever was collected.
     *
     * @return an empty task result.
     */
    @Override
    public EmptyTaskResult call() throws Exception {
      int dnTxCount = 0;
      long startTime = Time.monotonicNow();
      // Scan SCM DB in HB interval and collect a throttled list of
      // to delete blocks.
      LOG.debug("Running DeletedBlockTransactionScanner");
      DatanodeDeletedBlockTransactions transactions = null;
      List<DatanodeDetails> datanodes = nodeManager.getNodes(NodeState.HEALTHY);
      if (datanodes != null) {
        transactions = new DatanodeDeletedBlockTransactions(mappingService,
            blockDeleteLimitSize, datanodes.size());
        try {
          deletedBlockLog.getTransactions(transactions);
        } catch (IOException e) {
          // We may tolerant a number of failures for sometime
          // but if it continues to fail, at some point we need to raise
          // an exception and probably fail the SCM ? At present, it simply
          // continues to retry the scanning.
          LOG.error("Failed to get block deletion transactions from delTX log",
              e);
        }
        LOG.debug("Scanned deleted blocks log and got {} delTX to process.",
            transactions.getTXNum());
      }

      if (transactions != null && !transactions.isEmpty()) {
        for (UUID dnId : transactions.getDatanodeIDs()) {
          List<DeletedBlocksTransaction> dnTXs = transactions
              .getDatanodeTransactions(dnId);
          if (dnTXs != null && !dnTXs.isEmpty()) {
            dnTxCount += dnTXs.size();
            // TODO commandQueue needs a cap.
            // We should stop caching new commands if num of un-processed
            // command is bigger than a limit, e.g 50. In case datanode goes
            // offline for sometime, the cached commands be flooded.
            nodeManager.addDatanodeCommand(dnId,
                new DeleteBlocksCommand(dnTXs));
            // Joining the TxID list is wasted work unless debug logging is
            // on, so build the message arguments only in that case.
            // NOTE(review): assumes getTransactionIDList has no side
            // effects -- confirm.
            if (LOG.isDebugEnabled()) {
              LOG.debug(
                  "Added delete block command for datanode {} in the queue,"
                      + " number of delete block transactions: {}, TxID list: {}",
                  dnId, dnTXs.size(), String.join(",",
                      transactions.getTransactionIDList(dnId)));
            }
          }
        }
      }

      // dnTxCount can only be positive when transactions is non-null.
      if (dnTxCount > 0) {
        LOG.info(
            "Totally added {} delete blocks command for"
                + " {} datanodes, task elapsed time: {}ms",
            dnTxCount, transactions.getDatanodeIDs().size(),
            Time.monotonicNow() - startTime);
      }

      return EmptyTaskResult.newResult();
    }
  }

  /**
   * Overrides the per-datanode transaction limit; intended for tests.
   *
   * @param numTXs new limit.
   */
  @VisibleForTesting
  public void setBlockDeleteTXNum(int numTXs) {
    blockDeleteLimitSize = numTXs;
  }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/package-info.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/package-info.java new file mode 100644 index 0000000..e1bfdff --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/package-info.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.hdds.scm.block; +/** + * This package contains routines to manage the block location and + * mapping inside SCM + */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java new file mode 100644 index 0000000..63cb3a3 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java @@ -0,0 +1,607 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * <p>http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * <p>Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.hadoop.hdds.scm.container; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.scm.container.closer.ContainerCloser; +import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.replication.ContainerSupervisor; +import org.apache.hadoop.hdds.scm.exceptions.SCMException; +import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector; +import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.lease.Lease; +import org.apache.hadoop.ozone.lease.LeaseException; +import org.apache.hadoop.ozone.lease.LeaseManager; +import org.apache.hadoop.utils.MetadataKeyFilters.KeyPrefixFilter; +import org.apache.hadoop.utils.MetadataKeyFilters.MetadataKeyFilter; +import org.apache.hadoop.utils.MetadataStore; +import org.apache.hadoop.utils.MetadataStoreBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import static org.apache.hadoop.hdds.scm.ScmConfigKeys + .OZONE_SCM_CONTAINER_SIZE_DEFAULT; +import 
static org.apache.hadoop.hdds.scm.ScmConfigKeys + .OZONE_SCM_CONTAINER_SIZE_GB; +import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes + .FAILED_TO_CHANGE_CONTAINER_STATE; +import static org.apache.hadoop.hdds.server.ServerUtils.getOzoneMetaDirPath; +import static org.apache.hadoop.ozone.OzoneConsts.SCM_CONTAINER_DB; + +/** + * Mapping class contains the mapping from a name to a pipeline mapping. This + * is used by SCM when + * allocating new locations and when looking up a key. + */ +public class ContainerMapping implements Mapping { + private static final Logger LOG = LoggerFactory.getLogger(ContainerMapping + .class); + + private final NodeManager nodeManager; + private final long cacheSize; + private final Lock lock; + private final Charset encoding = Charset.forName("UTF-8"); + private final MetadataStore containerStore; + private final PipelineSelector pipelineSelector; + private final ContainerStateManager containerStateManager; + private final LeaseManager<ContainerInfo> containerLeaseManager; + private final ContainerSupervisor containerSupervisor; + private final float containerCloseThreshold; + private final ContainerCloser closer; + private final long size; + + /** + * Constructs a mapping class that creates mapping between container names + * and pipelines. + * + * @param nodeManager - NodeManager so that we can get the nodes that are + * healthy to place new + * containers. + * @param cacheSizeMB - Amount of memory reserved for the LSM tree to cache + * its nodes. This is + * passed to LevelDB and this memory is allocated in Native code space. + * CacheSize is specified + * in MB. + * @throws IOException on Failure. 
+ */ + @SuppressWarnings("unchecked") + public ContainerMapping( + final Configuration conf, final NodeManager nodeManager, final int + cacheSizeMB) throws IOException { + this.nodeManager = nodeManager; + this.cacheSize = cacheSizeMB; + this.closer = new ContainerCloser(nodeManager, conf); + + File metaDir = getOzoneMetaDirPath(conf); + + // Write the container name to pipeline mapping. + File containerDBPath = new File(metaDir, SCM_CONTAINER_DB); + containerStore = + MetadataStoreBuilder.newBuilder() + .setConf(conf) + .setDbFile(containerDBPath) + .setCacheSize(this.cacheSize * OzoneConsts.MB) + .build(); + + this.lock = new ReentrantLock(); + + this.pipelineSelector = new PipelineSelector(nodeManager, conf); + + // To be replaced with code getStorageSize once it is committed. + size = conf.getLong(OZONE_SCM_CONTAINER_SIZE_GB, + OZONE_SCM_CONTAINER_SIZE_DEFAULT) * 1024 * 1024 * 1024; + this.containerStateManager = + new ContainerStateManager(conf, this); + this.containerSupervisor = + new ContainerSupervisor(conf, nodeManager, + nodeManager.getNodePoolManager()); + this.containerCloseThreshold = conf.getFloat( + ScmConfigKeys.OZONE_SCM_CONTAINER_CLOSE_THRESHOLD, + ScmConfigKeys.OZONE_SCM_CONTAINER_CLOSE_THRESHOLD_DEFAULT); + LOG.trace("Container State Manager created."); + + long containerCreationLeaseTimeout = conf.getTimeDuration( + ScmConfigKeys.OZONE_SCM_CONTAINER_CREATION_LEASE_TIMEOUT, + ScmConfigKeys.OZONE_SCM_CONTAINER_CREATION_LEASE_TIMEOUT_DEFAULT, + TimeUnit.MILLISECONDS); + LOG.trace("Starting Container Lease Manager."); + containerLeaseManager = new LeaseManager<>(containerCreationLeaseTimeout); + containerLeaseManager.start(); + } + + /** + * {@inheritDoc} + */ + @Override + public ContainerInfo getContainer(final String containerName) throws + IOException { + ContainerInfo containerInfo; + lock.lock(); + try { + byte[] containerBytes = containerStore.get(containerName.getBytes( + encoding)); + if (containerBytes == null) { + throw new 
SCMException( + "Specified key does not exist. key : " + containerName, + SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER); + } + + HddsProtos.SCMContainerInfo temp = HddsProtos.SCMContainerInfo.PARSER + .parseFrom(containerBytes); + containerInfo = ContainerInfo.fromProtobuf(temp); + return containerInfo; + } finally { + lock.unlock(); + } + } + + /** + * {@inheritDoc} + */ + @Override + public List<ContainerInfo> listContainer(String startName, + String prefixName, int count) throws IOException { + List<ContainerInfo> containerList = new ArrayList<>(); + lock.lock(); + try { + if (containerStore.isEmpty()) { + throw new IOException("No container exists in current db"); + } + MetadataKeyFilter prefixFilter = new KeyPrefixFilter(prefixName); + byte[] startKey = startName == null ? null : DFSUtil.string2Bytes( + startName); + List<Map.Entry<byte[], byte[]>> range = + containerStore.getSequentialRangeKVs(startKey, count, prefixFilter); + + // Transform the values into the pipelines. + // TODO: filter by container state + for (Map.Entry<byte[], byte[]> entry : range) { + ContainerInfo containerInfo = + ContainerInfo.fromProtobuf( + HddsProtos.SCMContainerInfo.PARSER.parseFrom( + entry.getValue())); + Preconditions.checkNotNull(containerInfo); + containerList.add(containerInfo); + } + } finally { + lock.unlock(); + } + return containerList; + } + + /** + * Allocates a new container. + * + * @param replicationFactor - replication factor of the container. + * @param containerName - Name of the container. + * @param owner - The string name of the Service that owns this container. + * @return - Pipeline that makes up this container. 
+ * @throws IOException - Exception + */ + @Override + public ContainerInfo allocateContainer( + ReplicationType type, + ReplicationFactor replicationFactor, + final String containerName, + String owner) + throws IOException { + Preconditions.checkNotNull(containerName); + Preconditions.checkState(!containerName.isEmpty()); + + ContainerInfo containerInfo; + if (!nodeManager.isOutOfChillMode()) { + throw new SCMException( + "Unable to create container while in chill mode", + SCMException.ResultCodes.CHILL_MODE_EXCEPTION); + } + + lock.lock(); + try { + byte[] containerBytes = containerStore.get(containerName.getBytes( + encoding)); + if (containerBytes != null) { + throw new SCMException( + "Specified container already exists. key : " + containerName, + SCMException.ResultCodes.CONTAINER_EXISTS); + } + containerInfo = + containerStateManager.allocateContainer( + pipelineSelector, type, replicationFactor, containerName, + owner); + containerStore.put( + containerName.getBytes(encoding), containerInfo.getProtobuf() + .toByteArray()); + } finally { + lock.unlock(); + } + return containerInfo; + } + + /** + * Deletes a container from SCM. + * + * @param containerName - Container name + * @throws IOException if container doesn't exist or container store failed + * to delete the + * specified key. + */ + @Override + public void deleteContainer(String containerName) throws IOException { + lock.lock(); + try { + byte[] dbKey = containerName.getBytes(encoding); + byte[] containerBytes = containerStore.get(dbKey); + if (containerBytes == null) { + throw new SCMException( + "Failed to delete container " + containerName + ", reason : " + + "container doesn't exist.", + SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER); + } + containerStore.delete(dbKey); + } finally { + lock.unlock(); + } + } + + /** + * {@inheritDoc} Used by client to update container state on SCM. 
+ */ + @Override + public HddsProtos.LifeCycleState updateContainerState( + String containerName, HddsProtos.LifeCycleEvent event) throws + IOException { + ContainerInfo containerInfo; + lock.lock(); + try { + byte[] dbKey = containerName.getBytes(encoding); + byte[] containerBytes = containerStore.get(dbKey); + if (containerBytes == null) { + throw new SCMException( + "Failed to update container state" + + containerName + + ", reason : container doesn't exist.", + SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER); + } + containerInfo = + ContainerInfo.fromProtobuf(HddsProtos.SCMContainerInfo.PARSER + .parseFrom(containerBytes)); + + Preconditions.checkNotNull(containerInfo); + switch (event) { + case CREATE: + // Acquire lease on container + Lease<ContainerInfo> containerLease = + containerLeaseManager.acquire(containerInfo); + // Register callback to be executed in case of timeout + containerLease.registerCallBack(() -> { + updateContainerState(containerName, + HddsProtos.LifeCycleEvent.TIMEOUT); + return null; + }); + break; + case CREATED: + // Release the lease on container + containerLeaseManager.release(containerInfo); + break; + case FINALIZE: + // TODO: we don't need a lease manager here for closing as the + // container report will include the container state after HDFS-13008 + // If a client failed to update the container close state, DN container + // report from 3 DNs will be used to close the container eventually. + break; + case CLOSE: + break; + case UPDATE: + break; + case DELETE: + break; + case TIMEOUT: + break; + case CLEANUP: + break; + default: + throw new SCMException("Unsupported container LifeCycleEvent.", + FAILED_TO_CHANGE_CONTAINER_STATE); + } + // If the below updateContainerState call fails, we should revert the + // changes made in switch case. + // Like releasing the lease in case of BEGIN_CREATE. 
+ ContainerInfo updatedContainer = containerStateManager + .updateContainerState(containerInfo, event); + containerStore.put(dbKey, updatedContainer.getProtobuf().toByteArray()); + return updatedContainer.getState(); + } catch (LeaseException e) { + throw new IOException("Lease Exception.", e); + } finally { + lock.unlock(); + } + } + + /** + * Returns the container State Manager. + * + * @return ContainerStateManager + */ + @Override + public ContainerStateManager getStateManager() { + return containerStateManager; + } + + /** + * Process container report from Datanode. + * <p> + * Processing follows a very simple logic for time being. + * <p> + * 1. Datanodes report the current State -- denoted by the datanodeState + * <p> + * 2. We are the older SCM state from the Database -- denoted by + * the knownState. + * <p> + * 3. We copy the usage etc. from currentState to newState and log that + * newState to the DB. This allows us SCM to bootup again and read the + * state of the world from the DB, and then reconcile the state from + * container reports, when they arrive. + * + * @param reports Container report + */ + @Override + public void processContainerReports(ContainerReportsRequestProto reports) + throws IOException { + List<StorageContainerDatanodeProtocolProtos.ContainerInfo> + containerInfos = reports.getReportsList(); + containerSupervisor.handleContainerReport(reports); + for (StorageContainerDatanodeProtocolProtos.ContainerInfo datanodeState : + containerInfos) { + byte[] dbKey = datanodeState.getContainerNameBytes().toByteArray(); + lock.lock(); + try { + byte[] containerBytes = containerStore.get(dbKey); + if (containerBytes != null) { + HddsProtos.SCMContainerInfo knownState = + HddsProtos.SCMContainerInfo.PARSER.parseFrom(containerBytes); + + HddsProtos.SCMContainerInfo newState = + reconcileState(datanodeState, knownState); + + // FIX ME: This can be optimized, we write twice to memory, where a + // single write would work well. 
+ // + // We need to write this to DB again since the closed only write + // the updated State. + containerStore.put(dbKey, newState.toByteArray()); + + // If the container is closed, then state is already written to SCM + // DB.TODO: So can we can write only once to DB. + if (closeContainerIfNeeded(newState)) { + LOG.info("Closing the Container: {}", newState.getContainerName()); + } + } else { + // Container not found in our container db. + LOG.error("Error while processing container report from datanode :" + + " {}, for container: {}, reason: container doesn't exist in" + + "container database.", reports.getDatanodeDetails(), + datanodeState.getContainerName()); + } + } finally { + lock.unlock(); + } + } + } + + /** + * Reconciles the state from Datanode with the state in SCM. + * + * @param datanodeState - State from the Datanode. + * @param knownState - State inside SCM. + * @return new SCM State for this container. + */ + private HddsProtos.SCMContainerInfo reconcileState( + StorageContainerDatanodeProtocolProtos.ContainerInfo datanodeState, + HddsProtos.SCMContainerInfo knownState) { + HddsProtos.SCMContainerInfo.Builder builder = + HddsProtos.SCMContainerInfo.newBuilder(); + builder.setContainerName(knownState.getContainerName()); + builder.setPipeline(knownState.getPipeline()); + // If used size is greater than allocated size, we will be updating + // allocated size with used size. This update is done as a fallback + // mechanism in case SCM crashes without properly updating allocated + // size. Correct allocated value will be updated by + // ContainerStateManager during SCM shutdown. + long usedSize = datanodeState.getUsed(); + long allocated = knownState.getAllocatedBytes() > usedSize ? 
+ knownState.getAllocatedBytes() : usedSize; + builder.setAllocatedBytes(allocated); + builder.setUsedBytes(usedSize); + builder.setNumberOfKeys(datanodeState.getKeyCount()); + builder.setState(knownState.getState()); + builder.setStateEnterTime(knownState.getStateEnterTime()); + builder.setContainerID(knownState.getContainerID()); + if (knownState.getOwner() != null) { + builder.setOwner(knownState.getOwner()); + } + return builder.build(); + } + + /** + * Queues the close container command, to datanode and writes the new state + * to container DB. + * <p> + * TODO : Remove this 2 ContainerInfo definitions. It is brain dead to have + * one protobuf in one file and another definition in another file. + * + * @param newState - This is the state we maintain in SCM. + * @throws IOException + */ + private boolean closeContainerIfNeeded(HddsProtos.SCMContainerInfo newState) + throws IOException { + float containerUsedPercentage = 1.0f * + newState.getUsedBytes() / this.size; + + ContainerInfo scmInfo = getContainer(newState.getContainerName()); + if (containerUsedPercentage >= containerCloseThreshold + && !isClosed(scmInfo)) { + // We will call closer till get to the closed state. + // That is SCM will make this call repeatedly until we reach the closed + // state. + closer.close(newState); + + if (shouldClose(scmInfo)) { + // This event moves the Container from Open to Closing State, this is + // a state inside SCM. This is the desired state that SCM wants this + // container to reach. We will know that a container has reached the + // closed state from container reports. This state change should be + // invoked once and only once. 
+ HddsProtos.LifeCycleState state = updateContainerState( + scmInfo.getContainerName(), + HddsProtos.LifeCycleEvent.FINALIZE); + if (state != HddsProtos.LifeCycleState.CLOSING) { + LOG.error("Failed to close container {}, reason : Not able " + + "to " + + "update container state, current container state: {}.", + newState.getContainerName(), state); + return false; + } + return true; + } + } + return false; + } + + /** + * In Container is in closed state, if it is in closed, Deleting or Deleted + * State. + * + * @param info - ContainerInfo. + * @return true if is in open state, false otherwise + */ + private boolean shouldClose(ContainerInfo info) { + return info.getState() == HddsProtos.LifeCycleState.OPEN; + } + + private boolean isClosed(ContainerInfo info) { + return info.getState() == HddsProtos.LifeCycleState.CLOSED; + } + + @VisibleForTesting + public ContainerCloser getCloser() { + return closer; + } + + /** + * Closes this stream and releases any system resources associated with it. + * If the stream is + * already closed then invoking this method has no effect. + * <p> + * <p>As noted in {@link AutoCloseable#close()}, cases where the close may + * fail require careful + * attention. It is strongly advised to relinquish the underlying resources + * and to internally + * <em>mark</em> the {@code Closeable} as closed, prior to throwing the + * {@code IOException}. + * + * @throws IOException if an I/O error occurs + */ + @Override + public void close() throws IOException { + if (containerLeaseManager != null) { + containerLeaseManager.shutdown(); + } + if (containerStateManager != null) { + flushContainerInfo(); + containerStateManager.close(); + } + if (containerStore != null) { + containerStore.close(); + } + } + + /** + * Since allocatedBytes of a container is only in memory, stored in + * containerStateManager, when closing ContainerMapping, we need to update + * this in the container store. + * + * @throws IOException on failure. 
+ */ + @VisibleForTesting + public void flushContainerInfo() throws IOException { + List<ContainerInfo> containers = containerStateManager.getAllContainers(); + List<String> failedContainers = new ArrayList<>(); + for (ContainerInfo info : containers) { + // even if some container updated failed, others can still proceed + try { + byte[] dbKey = info.getContainerName().getBytes(encoding); + byte[] containerBytes = containerStore.get(dbKey); + // TODO : looks like when a container is deleted, the container is + // removed from containerStore but not containerStateManager, so it can + // return info of a deleted container. may revisit this in the future, + // for now, just skip a not-found container + if (containerBytes != null) { + HddsProtos.SCMContainerInfo oldInfoProto = + HddsProtos.SCMContainerInfo.PARSER.parseFrom(containerBytes); + ContainerInfo oldInfo = ContainerInfo.fromProtobuf(oldInfoProto); + ContainerInfo newInfo = new ContainerInfo.Builder() + .setAllocatedBytes(info.getAllocatedBytes()) + .setContainerName(oldInfo.getContainerName()) + .setNumberOfKeys(oldInfo.getNumberOfKeys()) + .setOwner(oldInfo.getOwner()) + .setPipeline(oldInfo.getPipeline()) + .setState(oldInfo.getState()) + .setUsedBytes(oldInfo.getUsedBytes()) + .build(); + containerStore.put(dbKey, newInfo.getProtobuf().toByteArray()); + } else { + LOG.debug("Container state manager has container {} but not found " + + "in container store, a deleted container?", + info.getContainerName()); + } + } catch (IOException ioe) { + failedContainers.add(info.getContainerName()); + } + } + if (!failedContainers.isEmpty()) { + throw new IOException("Error in flushing container info from container " + + "state manager: " + failedContainers); + } + } + + @VisibleForTesting + public MetadataStore getContainerStore() { + return containerStore; + } +} 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java new file mode 100644 index 0000000..227eca0 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java @@ -0,0 +1,456 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */

package org.apache.hadoop.hdds.scm.container;

import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.container.states.ContainerState;
import org.apache.hadoop.hdds.scm.container.states.ContainerStateMap;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.common.statemachine
    .InvalidStateTransitionException;
import org.apache.hadoop.ozone.common.statemachine.StateMachine;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.NavigableSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
    .FAILED_TO_CHANGE_CONTAINER_STATE;

/**
 * A container state manager keeps track of container states and returns
 * containers that match various queries.
 * <p>
 * This state machine is driven by a combination of server and client actions.
 * <p>
 * This is how a create container happens:
 * <p>
 * 1. When a container is created, the Server (or SCM) marks that Container as
 * ALLOCATED state. In this state, SCM has chosen a pipeline for container to
 * live on. However, the container is not created yet. This container along
 * with the pipeline is returned to the client.
 * <p>
 * 2. The client, when it sees the Container state as ALLOCATED, understands
 * that the container needs to be created on the specified pipeline. The client
 * lets the SCM know that it saw this state and is initiating the creation on
 * the data nodes.
 * <p>
 * This is done by sending the CREATE event for the container. When SCM gets
 * this event, SCM puts the container state into CREATING. All this state
 * means is that SCM told Client to create a container and client saw that
 * request.
 * <p>
 * 3. Then client makes calls to datanodes directly, asking the datanodes to
 * create the container. This is done with the help of pipeline that supports
 * this container.
 * <p>
 * 4. Once the creation of the container is complete, the client will make
 * another call to the SCM, this time specifying the containerName and the
 * CREATED event.
 * <p>
 * 5. With the CREATED event, the container moves to an Open State. This is
 * the state when clients can write to a container.
 * <p>
 * 6. If the client does not respond with the CREATED event within a certain
 * time, the TIMEOUT event moves the container to DELETING so that it can be
 * cleaned up.
 * <p>
 * Please see the function initializeStateMachine below to see how this looks in
 * code.
 * <p>
 * Reusing existing container :
 * <p>
 * The create container call is not made all the time, the system tries to use
 * open containers as much as possible. So in those cases, it looks through the
 * list of open containers and will return containers that match the specific
 * signature.
 * <p>
 * Please note : Logically there are 3 separate state machines in the case of
 * containers.
 * <p>
 * The Create State Machine -- Commented extensively above.
 * <p>
 * Open/Close State Machine - Once the container is in the Open State,
 * eventually it will be closed, once sufficient data has been written to it.
 * <p>
 * TimeOut Delete Container State Machine - if the container creating times out,
 * then Container State manager decides to delete the container.
 */
public class ContainerStateManager implements Closeable {
  private static final Logger LOG =
      LoggerFactory.getLogger(ContainerStateManager.class);

  private final StateMachine<HddsProtos.LifeCycleState,
      HddsProtos.LifeCycleEvent> stateMachine;

  // Capacity of one container in bytes, derived from the configured size in
  // GB.
  private final long containerSize;
  // Last container handed out per (owner, type, factor) combination.
  private final ConcurrentHashMap<ContainerState, ContainerID> lastUsedMap;
  private final ContainerStateMap containers;
  // Source of container IDs; seeded with the highest ID loaded from the DB.
  private final AtomicLong containerCount;

  /**
   * Constructs a Container State Manager that tracks all containers owned by
   * SCM for the purpose of allocation of blocks.
   * <p>
   * TODO : Add Container Tags so we know which containers are owned by SCM.
   *
   * @param configuration - configuration used to read the container size.
   * @param containerMapping - mapping from which existing containers are
   *                         loaded.
   */
  public ContainerStateManager(Configuration configuration,
      Mapping containerMapping) {

    // Initialize the container state machine.
    Set<HddsProtos.LifeCycleState> finalStates = new HashSet<>();

    // These are the steady states of a container.
    finalStates.add(LifeCycleState.OPEN);
    finalStates.add(LifeCycleState.CLOSED);
    finalStates.add(LifeCycleState.DELETED);

    this.stateMachine = new StateMachine<>(LifeCycleState.ALLOCATED,
        finalStates);
    initializeStateMachine();

    this.containerSize = OzoneConsts.GB * configuration.getInt(
        ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_GB,
        ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT);

    lastUsedMap = new ConcurrentHashMap<>();
    containerCount = new AtomicLong(0);
    containers = new ContainerStateMap();
    loadExistingContainers(containerMapping);
  }

  /**
   * Loads every container known to the mapping into the in-memory state map
   * and seeds the container ID counter with the highest ID seen.
   * <p>
   * Failures are logged and swallowed; the manager then starts with whatever
   * was loaded up to that point.
   *
   * @param containerMapping - mapping to list the containers from.
   */
  private void loadExistingContainers(Mapping containerMapping) {

    List<ContainerInfo> containerList;
    try {
      containerList = containerMapping.listContainer(null,
          null, Integer.MAX_VALUE);

      // if there are no container to load, let us return.
      if (containerList == null || containerList.isEmpty()) {
        LOG.info("No containers to load for this cluster.");
        return;
      }
    } catch (IOException e) {
      // An empty DB is reported through this specific message and is not an
      // error. The constant goes first because getMessage() may be null.
      if (!"No container exists in current db".equals(e.getMessage())) {
        LOG.error("Could not list the containers", e);
      }
      return;
    }

    try {
      long maxID = 0;
      for (ContainerInfo container : containerList) {
        containers.addContainer(container);

        if (maxID < container.getContainerID()) {
          maxID = container.getContainerID();
        }

        // Updated on every iteration so that a failure part-way through
        // still leaves the counter at the highest ID loaded so far.
        containerCount.set(maxID);
      }
    } catch (SCMException ex) {
      LOG.error("Unable to create a container information. ", ex);
      // Fix me, what is the proper shutdown procedure for SCM ??
      // System.exit(1) // Should we exit here?
    }
  }

  /**
   * Return the info of all the containers kept by the in-memory mapping.
   *
   * @return the list of all container info.
   */
  public List<ContainerInfo> getAllContainers() {
    List<ContainerInfo> list = new ArrayList<>();

    //No Locking needed since the return value is an immutable map.
    containers.getContainerMap().forEach((key, value) -> list.add(value));
    return list;
  }

  /*
   *
   * Event and State Transition Mapping:
   *
   * State: ALLOCATED ---------------> CREATING
   * Event:                CREATE
   *
   * State: CREATING  ---------------> OPEN
   * Event:               CREATED
   *
   * State: OPEN      ---------------> CLOSING
   * Event:               FINALIZE
   *
   * State: CLOSING   ---------------> CLOSED
   * Event:                CLOSE
   *
   * State: CLOSED   ----------------> DELETING
   * Event:                DELETE
   *
   * State: DELETING ----------------> DELETED
   * Event:               CLEANUP
   *
   * State: CREATING  ---------------> DELETING
   * Event:               TIMEOUT
   *
   *
   * Container State Flow:
   *
   * [ALLOCATED]------->[CREATING]--------->[OPEN]---------->[CLOSING]------->[CLOSED]
   *            (CREATE)     |    (CREATED)       (FINALIZE)          (CLOSE)    |
   *                         |                                                   |
   *                         |                                                   |
   *                         |(TIMEOUT)                                  (DELETE)|
   *                         |                                                   |
   *                         +------------------> [DELETING] <-------------------+
   *                                                   |
   *                                                   |
   *                                          (CLEANUP)|
   *                                                   |
   *                                               [DELETED]
   */
  private void initializeStateMachine() {
    stateMachine.addTransition(LifeCycleState.ALLOCATED,
        LifeCycleState.CREATING,
        LifeCycleEvent.CREATE);

    stateMachine.addTransition(LifeCycleState.CREATING,
        LifeCycleState.OPEN,
        LifeCycleEvent.CREATED);

    stateMachine.addTransition(LifeCycleState.OPEN,
        LifeCycleState.CLOSING,
        LifeCycleEvent.FINALIZE);

    stateMachine.addTransition(LifeCycleState.CLOSING,
        LifeCycleState.CLOSED,
        LifeCycleEvent.CLOSE);

    stateMachine.addTransition(LifeCycleState.CLOSED,
        LifeCycleState.DELETING,
        LifeCycleEvent.DELETE);

    stateMachine.addTransition(LifeCycleState.CREATING,
        LifeCycleState.DELETING,
        LifeCycleEvent.TIMEOUT);

    stateMachine.addTransition(LifeCycleState.DELETING,
        LifeCycleState.DELETED,
        LifeCycleEvent.CLEANUP);
  }

  /**
   * Allocates a new container based on the type, replication etc. The new
   * container starts in the ALLOCATED state with a freshly generated
   * container ID and is registered in the in-memory state map.
   *
   * @param selector -- Pipeline selector class.
   * @param type -- Replication type.
   * @param replicationFactor - Replication factor.
   * @param containerName - Container Name.
   * @param owner - Owner of the container.
   * @return Container Info.
   * @throws IOException on Failure.
   */
  public ContainerInfo allocateContainer(PipelineSelector selector, HddsProtos
      .ReplicationType type, HddsProtos.ReplicationFactor replicationFactor,
      final String containerName, String owner) throws
      IOException {

    Pipeline pipeline = selector.getReplicationPipeline(type,
        replicationFactor, containerName);

    Preconditions.checkNotNull(pipeline, "Pipeline type=%s/"
        + "replication=%s couldn't be found for the new container. "
        + "Do you have enough nodes?", type, replicationFactor);

    ContainerInfo containerInfo = new ContainerInfo.Builder()
        .setContainerName(containerName)
        .setState(HddsProtos.LifeCycleState.ALLOCATED)
        .setPipeline(pipeline)
        // This is bytes allocated for blocks inside container, not the
        // container size
        .setAllocatedBytes(0)
        .setUsedBytes(0)
        .setNumberOfKeys(0)
        .setStateEnterTime(Time.monotonicNow())
        .setOwner(owner)
        .setContainerID(containerCount.incrementAndGet())
        .build();
    Preconditions.checkNotNull(containerInfo);
    containers.addContainer(containerInfo);
    LOG.trace("New container allocated: {}", containerInfo);
    return containerInfo;
  }

  /**
   * Update the Container State to the next state.
   *
   * @param info - ContainerInfo
   * @param event - LifeCycle Event
   * @return Updated ContainerInfo.
   * @throws SCMException if the event is not a valid transition from the
   *                      container's current state, or the update fails.
   */
  public ContainerInfo updateContainerState(ContainerInfo
      info, HddsProtos.LifeCycleEvent event) throws SCMException {
    LifeCycleState newState;
    try {
      newState = this.stateMachine.getNextState(info.getState(), event);
    } catch (InvalidStateTransitionException ex) {
      String error = String.format("Failed to update container state %s, "
              + "reason: invalid state transition from state: %s upon "
              + "event: %s.",
          info.getPipeline().getContainerName(), info.getState(), event);
      LOG.error(error);
      throw new SCMException(error, FAILED_TO_CHANGE_CONTAINER_STATE);
    }

    // This is a post condition after executing getNextState.
    Preconditions.checkNotNull(newState);
    containers.updateState(info, info.getState(), newState);
    return containers.getContainerInfo(info);
  }

  /**
   * Replaces the stored info of a container with the given one.
   *
   * @param info - Container Info
   * @return the ContainerInfo held by the state map after the update.
   * @throws SCMException - on Error.
   */
  public ContainerInfo updateContainerInfo(ContainerInfo info)
      throws SCMException {
    containers.updateContainerInfo(info);
    return containers.getContainerInfo(info);
  }


  /**
   * Return a container matching the attributes specified. Candidates after
   * the last container used for this owner/type/factor are tried first, then
   * the ones up to and including it. The allocated bytes of the selected
   * container are increased by {@code size}.
   *
   * @param size - Space needed in the Container.
   * @param owner - Owner of the container - A specific nameservice.
   * @param type - Replication Type {StandAlone, Ratis}
   * @param factor - Replication Factor {ONE, THREE}
   * @param state - State of the Container-- {Open, Allocated etc.}
   * @return ContainerInfo, null if there is no match found.
   */
  public ContainerInfo getMatchingContainer(final long size,
      String owner, ReplicationType type, ReplicationFactor factor,
      LifeCycleState state) {

    // Find containers that match the query spec, if no match return null.
    NavigableSet<ContainerID> matchingSet =
        containers.getMatchingContainerIDs(state, owner, factor, type);
    if (matchingSet == null || matchingSet.isEmpty()) {
      return null;
    }

    // Get the last used container and find container above the last used
    // container ID.
    ContainerState key = new ContainerState(owner, type, factor);
    ContainerID lastID = lastUsedMap.get(key);
    if (lastID == null) {
      lastID = matchingSet.first();
    }

    // There is a small issue here. The first time, we will skip the first
    // container. But in most cases it will not matter.
    NavigableSet<ContainerID> resultSet = matchingSet.tailSet(lastID, false);
    if (resultSet.isEmpty()) {
      resultSet = matchingSet;
    }

    ContainerInfo selectedContainer =
        findContainerWithSpace(size, resultSet, owner);
    if (selectedContainer == null) {

      // If we did not find any space in the tailSet, we need to look for
      // space in the headset, we need to pass true to deal with the
      // situation that we have a lone container that has space. That is we
      // ignored the last used container under the assumption we can find
      // other containers with space, but if have a single container that is
      // not true. Hence we need to include the last used container as the
      // last element in the sorted set.

      resultSet = matchingSet.headSet(lastID, true);
      selectedContainer = findContainerWithSpace(size, resultSet, owner);
    }
    // Update the allocated Bytes on this container.
    if (selectedContainer != null) {
      selectedContainer.updateAllocatedBytes(size);
    }
    return selectedContainer;

  }

  /**
   * Returns the first container in the search set whose allocated bytes plus
   * the requested size still fit in a container, and records it as the last
   * used container for its owner/type/factor.
   *
   * @param size - Space needed in the Container.
   * @param searchSet - Candidate container IDs, visited in set order.
   * @param owner - Owner of the container.
   * @return the selected ContainerInfo, or null if none has enough space.
   */
  private ContainerInfo findContainerWithSpace(long size,
      NavigableSet<ContainerID> searchSet, String owner) {
    // Get the container with space to meet our request.
    for (ContainerID id : searchSet) {
      ContainerInfo containerInfo = containers.getContainerInfo(id.getId());
      if (containerInfo.getAllocatedBytes() + size <= this.containerSize) {
        containerInfo.updateLastUsedTime();

        ContainerState key = new ContainerState(owner,
            containerInfo.getPipeline().getType(),
            containerInfo.getPipeline().getFactor());
        lastUsedMap.put(key, containerInfo.containerID());
        return containerInfo;
      }
    }
    return null;
  }

  /**
   * Returns a set of ContainerIDs that match the Container.
   *
   * @param owner  Owner of the Containers.
   * @param type - Replication Type of the containers
   * @param factor - Replication factor of the containers.
   * @param state - Current State, like Open, Close etc.
   * @return Set of containers that match the specific query parameters.
   */
  public NavigableSet<ContainerID> getMatchingContainerIDs(
      String owner, ReplicationType type, ReplicationFactor factor,
      LifeCycleState state) {
    return containers.getMatchingContainerIDs(state, owner,
        factor, type);
  }

  /**
   * Nothing to release; present to satisfy {@link Closeable}.
   */
  @Override
  public void close() throws IOException {
  }

}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java new file mode 100644 index 0000000..c949c6c --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership.
The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */
package org.apache.hadoop.hdds.scm.container;

import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto
    .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;

import java.io.Closeable;
import java.io.IOException;
import java.util.List;

/**
 * Mapping from a container name to its container information (including the
 * pipeline). SCM uses it when allocating new locations and when looking up a
 * key.
 */
public interface Mapping extends Closeable {

  /**
   * Looks up a container by name.
   *
   * @param containerName - name of the container.
   * @return the ContainerInfo, such as creation state and the pipeline.
   * @throws IOException
   */
  ContainerInfo getContainer(String containerName) throws IOException;

  /**
   * Lists containers. The search starts after {@code startName} (exclusive),
   * results are filtered by {@code prefixName}, and the size of the searched
   * range cannot exceed {@code count}.
   *
   * @param startName start name; if null, searching starts at the head.
   * @param prefixName prefix name; if null, the filter is disabled.
   * @param count maximum size; if count < 0 the size is unlimited. (Usually
   *              a very big value is passed instead of an unlimited one, in
   *              case the db is very big.)
   * @return a list of containers.
   * @throws IOException
   */
  List<ContainerInfo> listContainer(String startName, String prefixName,
      int count) throws IOException;

  /**
   * Allocates a new container with the given name, replication type and
   * replication factor.
   *
   * @param type - replication type of the container.
   * @param replicationFactor - replication factor of the container.
   * @param containerName - name of the container.
   * @param owner - owner of the container.
   * @return the info of the newly allocated container.
   * @throws IOException
   */
  ContainerInfo allocateContainer(HddsProtos.ReplicationType type,
      HddsProtos.ReplicationFactor replicationFactor,
      String containerName, String owner) throws IOException;

  /**
   * Deletes a container from SCM.
   *
   * @param containerName - name of the container.
   * @throws IOException
   */
  void deleteContainer(String containerName) throws IOException;

  /**
   * Applies a life cycle event to a container.
   *
   * @param containerName - name of the container.
   * @param event - container life cycle event.
   * @return the new container state.
   * @throws IOException
   */
  HddsProtos.LifeCycleState updateContainerState(String containerName,
      HddsProtos.LifeCycleEvent event) throws IOException;

  /**
   * Returns the container State Manager.
   *
   * @return ContainerStateManager
   */
  ContainerStateManager getStateManager();

  /**
   * Processes a container report sent by a Datanode.
   *
   * @param reports Container report
   * @throws IOException
   */
  void processContainerReports(ContainerReportsRequestProto reports)
      throws IOException;

}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/closer/ContainerCloser.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/closer/ContainerCloser.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/closer/ContainerCloser.java new file mode 100644 index 0000000..b5d4da9 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/closer/ContainerCloser.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License.
+ * + */ + +package org.apache.hadoop.hdds.scm.container.closer; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand; +import org.apache.hadoop.util.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.hadoop.ozone.OzoneConfigKeys + .OZONE_CONTAINER_REPORT_INTERVAL; +import static org.apache.hadoop.ozone.OzoneConfigKeys + .OZONE_CONTAINER_REPORT_INTERVAL_DEFAULT; + +/** + * A class that manages closing of containers. This allows transition from a + * open but full container to a closed container, to which no data is written. + */ +public class ContainerCloser { + private static final Logger LOG = + LoggerFactory.getLogger(ContainerCloser.class); + private static final long MULTIPLIER = 3L; + private static final int CLEANUP_WATER_MARK = 1000; + private final NodeManager nodeManager; + private final Map<String, Long> commandIssued; + private final Configuration configuration; + private final AtomicInteger mapCount; + private final long reportInterval; + private final AtomicInteger threadRunCount; + private final AtomicBoolean isRunning; + + /** + * Constructs the ContainerCloser class. 
+ * + * @param nodeManager - NodeManager + * @param conf - Configuration + */ + public ContainerCloser(NodeManager nodeManager, Configuration conf) { + Preconditions.checkNotNull(nodeManager); + Preconditions.checkNotNull(conf); + this.nodeManager = nodeManager; + this.configuration = conf; + this.commandIssued = new ConcurrentHashMap<>(); + this.mapCount = new AtomicInteger(0); + this.threadRunCount = new AtomicInteger(0); + this.isRunning = new AtomicBoolean(false); + this.reportInterval = this.configuration.getTimeDuration( + OZONE_CONTAINER_REPORT_INTERVAL, + OZONE_CONTAINER_REPORT_INTERVAL_DEFAULT, TimeUnit.SECONDS); + Preconditions.checkState(this.reportInterval > 0, + "report interval has to be greater than 0"); + } + + @VisibleForTesting + public static int getCleanupWaterMark() { + return CLEANUP_WATER_MARK; + } + + /** + * Sends a Container Close command to the data nodes where this container + * lives. + * + * @param info - ContainerInfo. + */ + public void close(HddsProtos.SCMContainerInfo info) { + + if (commandIssued.containsKey(info.getContainerName())) { + // We check if we issued a close command in last 3 * reportInterval secs. + long commandQueueTime = commandIssued.get(info.getContainerName()); + long currentTime = TimeUnit.MILLISECONDS.toSeconds(Time.monotonicNow()); + if (currentTime > commandQueueTime + (MULTIPLIER * reportInterval)) { + commandIssued.remove(info.getContainerName()); + mapCount.decrementAndGet(); + } else { + // Ignore this request, since we just issued a close command. We + // should wait instead of sending a command to datanode again. + return; + } + } + + // if we reached here, it means that we have not issued a command to the + // data node in last (3 times report interval). We are presuming that is + // enough time to close the container. Let us go ahead and queue a close + // to all the datanodes that participate in the container. + // + // Three important things to note here: + // + // 1. 
It is ok to send this command multiple times to a datanode. Close + // container is an idempotent command, if the container is already closed + // then we have no issues. + // + // 2. The container close command is issued to all datanodes. But + // depending on the pipeline type, some of the datanodes might ignore it. + // + // 3. SCM will see that datanode is closed from container reports, but it + // is possible that datanodes might get close commands since + // this queue can be emptied by a datanode after a close report is send + // to SCM. In that case also, data node will ignore this command. + + HddsProtos.Pipeline pipeline = info.getPipeline(); + for (HddsProtos.DatanodeDetailsProto datanodeDetails : + pipeline.getPipelineChannel().getMembersList()) { + nodeManager.addDatanodeCommand( + DatanodeDetails.getFromProtoBuf(datanodeDetails).getUuid(), + new CloseContainerCommand(info.getContainerName())); + } + if (!commandIssued.containsKey(info.getContainerName())) { + commandIssued.put(info.getContainerName(), + TimeUnit.MILLISECONDS.toSeconds(Time.monotonicNow())); + mapCount.incrementAndGet(); + } + // run the hash map cleaner thread if needed, non-blocking call. + runCleanerThreadIfNeeded(); + } + + private void runCleanerThreadIfNeeded() { + // Let us check if we should run a cleaner thread, not using map.size + // since it runs a loop in the case of the concurrentMap. + if (mapCount.get() > CLEANUP_WATER_MARK && + isRunning.compareAndSet(false, true)) { + Runnable entryCleaner = () -> { + LOG.debug("Starting close container Hash map cleaner."); + try { + for (Map.Entry<String, Long> entry : commandIssued.entrySet()) { + long commandQueueTime = entry.getValue(); + if (commandQueueTime + (MULTIPLIER * reportInterval) > + TimeUnit.MILLISECONDS.toSeconds(Time.monotonicNow())) { + + // It is possible for this remove to fail due to race conditions. + // No big deal we will cleanup next time. 
+ commandIssued.remove(entry.getKey()); + mapCount.decrementAndGet(); + } + } + isRunning.compareAndSet(true, false); + LOG.debug("Finished running, close container Hash map cleaner."); + } catch (Exception ex) { + LOG.error("Unable to finish cleaning the closed containers map.", ex); + } + }; + + // Launch the cleaner thread when we need instead of having a daemon + // thread that is sleeping all the time. We need to set the Daemon to + // true to avoid blocking clean exits. + Thread cleanerThread = new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("Closed Container Cleaner Thread - %d") + .build().newThread(entryCleaner); + threadRunCount.incrementAndGet(); + cleanerThread.start(); + } + } + + @VisibleForTesting + public int getThreadRunCount() { + return threadRunCount.get(); + } + + @VisibleForTesting + public int getCloseCount() { + return mapCount.get(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/closer/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/closer/package-info.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/closer/package-info.java new file mode 100644 index 0000000..ee02bbd --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/closer/package-info.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + * + */ + +/** + * This package contains the class that closes a container, that is, moves a + * container from the open state to the closed state. + */ +package org.apache.hadoop.hdds.scm.container.closer; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/package-info.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/package-info.java new file mode 100644 index 0000000..3f8d056 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/package-info.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.hdds.scm.container; +/** + * This package contains routines to manage the container location and + * mapping inside SCM + */ \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicy.java new file mode 100644 index 0000000..5d91ac5 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicy.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */
+
+package org.apache.hadoop.hdds.scm.container.placement.algorithms;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * A ContainerPlacementPolicy supports choosing datanodes to build a
+ * replication pipeline under the specified constraints.
+ */
+public interface ContainerPlacementPolicy {
+
+  /**
+   * Given the number of datanodes and the size required, returns the
+   * datanodes that satisfy both requirements.
+   *
+   * @param nodesRequired - number of datanodes required.
+   * @param sizeRequired - size required for the container or block
+   *                     (presumably in bytes - TODO confirm with callers).
+   * @return list of datanodes chosen.
+   * @throws IOException declared for implementations; presumably raised when
+   *                     the requirements cannot be met - confirm against the
+   *                     implementing classes.
+   */
+  List<DatanodeDetails> chooseDatanodes(int nodesRequired, long sizeRequired)
+      throws IOException;
+}
--------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org