http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockmanagerMXBean.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockmanagerMXBean.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockmanagerMXBean.java deleted file mode 100644 index 23c6983..0000000 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockmanagerMXBean.java +++ /dev/null @@ -1,30 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.hadoop.hdds.scm.block; - - -/** - * JMX interface for the block manager. - */ -public interface BlockmanagerMXBean { - - /** - * Number of open containers manager by the block manager. - */ - int getOpenContainersNo(); -}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java deleted file mode 100644 index 8702a42..0000000 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java +++ /dev/null @@ -1,148 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.hadoop.hdds.scm.block; - -import com.google.common.collect.ArrayListMultimap; -import org.apache.hadoop.hdds.scm.container.Mapping; -import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction; - -import java.io.IOException; -import java.util.Collections; -import java.util.List; -import java.util.Set; -import java.util.UUID; -import java.util.stream.Collectors; - -import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; -import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; - -/** - * A wrapper class to hold info about datanode and all deleted block - * transactions that will be sent to this datanode. - */ -public class DatanodeDeletedBlockTransactions { - private int nodeNum; - // The throttle size for each datanode. - private int maximumAllowedTXNum; - // Current counter of inserted TX. - private int currentTXNum; - private Mapping mappingService; - // A list of TXs mapped to a certain datanode ID. - private final ArrayListMultimap<UUID, DeletedBlocksTransaction> - transactions; - - DatanodeDeletedBlockTransactions(Mapping mappingService, - int maximumAllowedTXNum, int nodeNum) { - this.transactions = ArrayListMultimap.create(); - this.mappingService = mappingService; - this.maximumAllowedTXNum = maximumAllowedTXNum; - this.nodeNum = nodeNum; - } - - public boolean addTransaction(DeletedBlocksTransaction tx, - Set<UUID> dnsWithTransactionCommitted) { - Pipeline pipeline = null; - try { - ContainerWithPipeline containerWithPipeline = - mappingService.getContainerWithPipeline(tx.getContainerID()); - if (containerWithPipeline.getContainerInfo().isContainerOpen() - || containerWithPipeline.getPipeline().isEmpty()) { - return false; - } - pipeline = containerWithPipeline.getPipeline(); - } catch (IOException e) { - SCMBlockDeletingService.LOG.warn("Got container info error.", e); - return false; - } - - boolean success = false; - for (DatanodeDetails dd : pipeline.getMachines()) { - UUID dnID = dd.getUuid(); - if (dnsWithTransactionCommitted == null || - !dnsWithTransactionCommitted.contains(dnID)) { - // Transaction need not be sent to dns which have already committed it - success = addTransactionToDN(dnID, tx); - } - } - return success; - } - - private boolean addTransactionToDN(UUID dnID, DeletedBlocksTransaction tx) { - if (transactions.containsKey(dnID)) { - List<DeletedBlocksTransaction> txs = transactions.get(dnID); - if (txs != null && txs.size() < maximumAllowedTXNum) { - boolean hasContained = false; - for (DeletedBlocksTransaction t : txs) { - if (t.getContainerID() == tx.getContainerID()) { - hasContained = true; - break; - } - } - - if (!hasContained) { - txs.add(tx); - currentTXNum++; - return true; - } - } - } else { - currentTXNum++; - transactions.put(dnID, tx); - return true; - } - SCMBlockDeletingService.LOG - .debug("Transaction added: {} <- TX({})", dnID, tx.getTxID()); - return false; - } - - Set<UUID> getDatanodeIDs() { - return transactions.keySet(); - } - - boolean isEmpty() { - return transactions.isEmpty(); - } - - boolean hasTransactions(UUID dnId) { - return transactions.containsKey(dnId) && - !transactions.get(dnId).isEmpty(); - } - - List<DeletedBlocksTransaction> getDatanodeTransactions(UUID dnId) { - return transactions.get(dnId); - } - - List<String> getTransactionIDList(UUID dnId) { - if (hasTransactions(dnId)) { - return transactions.get(dnId).stream() - .map(DeletedBlocksTransaction::getTxID).map(String::valueOf) - .collect(Collectors.toList()); - } else { - return Collections.emptyList(); - } - } - - boolean isFull() { - return currentTXNum >= maximumAllowedTXNum * nodeNum; - } - - int getTXNum() { - return currentTXNum; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java deleted file mode 100644 index db6c1c5..0000000 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java +++ /dev/null @@ -1,118 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdds.scm.block; - -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto - .DeleteBlockTransactionResult; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction; - -import java.io.Closeable; -import java.io.IOException; -import java.util.List; -import java.util.Map; -import java.util.UUID; - -/** - * The DeletedBlockLog is a persisted log in SCM to keep tracking - * container blocks which are under deletion. It maintains info - * about under-deletion container blocks that notified by OM, - * and the state how it is processed. - */ -public interface DeletedBlockLog extends Closeable { - - /** - * Scan entire log once and returns TXs to DatanodeDeletedBlockTransactions. - * Once DatanodeDeletedBlockTransactions is full, the scan behavior will - * stop. - * @param transactions a list of TXs will be set into. - * @return Mapping from containerId to latest transactionId for the container. - * @throws IOException - */ - Map<Long, Long> getTransactions(DatanodeDeletedBlockTransactions transactions) - throws IOException; - - /** - * Return all failed transactions in the log. A transaction is considered - * to be failed if it has been sent more than MAX_RETRY limit and its - * count is reset to -1. - * - * @return a list of failed deleted block transactions. - * @throws IOException - */ - List<DeletedBlocksTransaction> getFailedTransactions() - throws IOException; - - /** - * Increments count for given list of transactions by 1. - * The log maintains a valid range of counts for each transaction - * [0, MAX_RETRY]. If exceed this range, resets it to -1 to indicate - * the transaction is no longer valid. - * - * @param txIDs - transaction ID. - */ - void incrementCount(List<Long> txIDs) - throws IOException; - - /** - * Commits a transaction means to delete all footprints of a transaction - * from the log. This method doesn't guarantee all transactions can be - * successfully deleted, it tolerate failures and tries best efforts to. - * @param transactionResults - delete block transaction results. - * @param dnID - ID of datanode which acknowledges the delete block command. - */ - void commitTransactions(List<DeleteBlockTransactionResult> transactionResults, - UUID dnID); - - /** - * Creates a block deletion transaction and adds that into the log. - * - * @param containerID - container ID. - * @param blocks - blocks that belong to the same container. - * - * @throws IOException - */ - void addTransaction(long containerID, List<Long> blocks) - throws IOException; - - /** - * Creates block deletion transactions for a set of containers, - * add into the log and persist them atomically. An object key - * might be stored in multiple containers and multiple blocks, - * this API ensures that these updates are done in atomic manner - * so if any of them fails, the entire operation fails without - * any updates to the log. Note, this doesn't mean to create only - * one transaction, it creates multiple transactions (depends on the - * number of containers) together (on success) or non (on failure). - * - * @param containerBlocksMap a map of containerBlocks. - * @throws IOException - */ - void addTransactions(Map<Long, List<Long>> containerBlocksMap) - throws IOException; - - /** - * Returns the total number of valid transactions. A transaction is - * considered to be valid as long as its count is in range [0, MAX_RETRY]. - * - * @return number of a valid transactions. - * @throws IOException - */ - int getNumOfValidTransactions() throws IOException; -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java deleted file mode 100644 index 68435d1..0000000 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java +++ /dev/null @@ -1,428 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdds.scm.block; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Lists; -import com.google.common.primitives.Longs; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto - .DeleteBlockTransactionResult; -import org.apache.hadoop.hdds.scm.command - .CommandStatusReportHandler.DeleteBlockStatus; -import org.apache.hadoop.hdds.scm.container.Mapping; -import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; -import org.apache.hadoop.hdds.server.events.EventHandler; -import org.apache.hadoop.hdds.server.events.EventPublisher; -import org.apache.hadoop.hdfs.DFSUtil; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction; -import org.apache.hadoop.ozone.OzoneConsts; -import org.apache.hadoop.utils.BatchOperation; -import org.apache.hadoop.utils.MetadataStore; -import org.apache.hadoop.utils.MetadataStoreBuilder; -import org.eclipse.jetty.util.ConcurrentHashSet; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.IOException; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; -import java.util.stream.Collectors; - -import static java.lang.Math.min; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys - .OZONE_SCM_BLOCK_DELETION_MAX_RETRY; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys - .OZONE_SCM_BLOCK_DELETION_MAX_RETRY_DEFAULT; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys - .OZONE_SCM_DB_CACHE_SIZE_DEFAULT; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys - .OZONE_SCM_DB_CACHE_SIZE_MB; -import static org.apache.hadoop.hdds.server.ServerUtils.getOzoneMetaDirPath; -import static org.apache.hadoop.ozone.OzoneConsts.DELETED_BLOCK_DB; - -/** - * A implement class of {@link DeletedBlockLog}, and it uses - * K/V db to maintain block deletion transactions between scm and datanode. - * This is a very basic implementation, it simply scans the log and - * memorize the position that scanned by last time, and uses this to - * determine where the next scan starts. It has no notion about weight - * of each transaction so as long as transaction is still valid, they get - * equally same chance to be retrieved which only depends on the nature - * order of the transaction ID. - */ -public class DeletedBlockLogImpl - implements DeletedBlockLog, EventHandler<DeleteBlockStatus> { - - public static final Logger LOG = - LoggerFactory.getLogger(DeletedBlockLogImpl.class); - - private static final byte[] LATEST_TXID = - DFSUtil.string2Bytes("#LATEST_TXID#"); - - private final int maxRetry; - private final MetadataStore deletedStore; - private final Mapping containerManager; - private final Lock lock; - // The latest id of deleted blocks in the db. - private long lastTxID; - // Maps txId to set of DNs which are successful in committing the transaction - private Map<Long, Set<UUID>> transactionToDNsCommitMap; - - public DeletedBlockLogImpl(Configuration conf, Mapping containerManager) - throws IOException { - maxRetry = conf.getInt(OZONE_SCM_BLOCK_DELETION_MAX_RETRY, - OZONE_SCM_BLOCK_DELETION_MAX_RETRY_DEFAULT); - - File metaDir = getOzoneMetaDirPath(conf); - String scmMetaDataDir = metaDir.getPath(); - File deletedLogDbPath = new File(scmMetaDataDir, DELETED_BLOCK_DB); - int cacheSize = conf.getInt(OZONE_SCM_DB_CACHE_SIZE_MB, - OZONE_SCM_DB_CACHE_SIZE_DEFAULT); - // Load store of all transactions. - deletedStore = MetadataStoreBuilder.newBuilder() - .setCreateIfMissing(true) - .setConf(conf) - .setDbFile(deletedLogDbPath) - .setCacheSize(cacheSize * OzoneConsts.MB) - .build(); - this.containerManager = containerManager; - - this.lock = new ReentrantLock(); - // start from the head of deleted store. - lastTxID = findLatestTxIDInStore(); - - // transactionToDNsCommitMap is updated only when - // transaction is added to the log and when it is removed. - - // maps transaction to dns which have committed it. - transactionToDNsCommitMap = new ConcurrentHashMap<>(); - } - - @VisibleForTesting - public MetadataStore getDeletedStore() { - return deletedStore; - } - - /** - * There is no need to lock before reading because - * it's only used in construct method. - * - * @return latest txid. - * @throws IOException - */ - private long findLatestTxIDInStore() throws IOException { - long txid = 0; - byte[] value = deletedStore.get(LATEST_TXID); - if (value != null) { - txid = Longs.fromByteArray(value); - } - return txid; - } - - @Override - public List<DeletedBlocksTransaction> getFailedTransactions() - throws IOException { - lock.lock(); - try { - final List<DeletedBlocksTransaction> failedTXs = Lists.newArrayList(); - deletedStore.iterate(null, (key, value) -> { - if (!Arrays.equals(LATEST_TXID, key)) { - DeletedBlocksTransaction delTX = - DeletedBlocksTransaction.parseFrom(value); - if (delTX.getCount() == -1) { - failedTXs.add(delTX); - } - } - return true; - }); - return failedTXs; - } finally { - lock.unlock(); - } - } - - /** - * {@inheritDoc} - * - * @param txIDs - transaction ID. - * @throws IOException - */ - @Override - public void incrementCount(List<Long> txIDs) throws IOException { - BatchOperation batch = new BatchOperation(); - lock.lock(); - try { - for(Long txID : txIDs) { - try { - byte[] deleteBlockBytes = - deletedStore.get(Longs.toByteArray(txID)); - if (deleteBlockBytes == null) { - LOG.warn("Delete txID {} not found", txID); - continue; - } - DeletedBlocksTransaction block = DeletedBlocksTransaction - .parseFrom(deleteBlockBytes); - DeletedBlocksTransaction.Builder builder = block.toBuilder(); - int currentCount = block.getCount(); - if (currentCount > -1) { - builder.setCount(++currentCount); - } - // if the retry time exceeds the maxRetry value - // then set the retry value to -1, stop retrying, admins can - // analyze those blocks and purge them manually by SCMCli. - if (currentCount > maxRetry) { - builder.setCount(-1); - } - deletedStore.put(Longs.toByteArray(txID), - builder.build().toByteArray()); - } catch (IOException ex) { - LOG.warn("Cannot increase count for txID " + txID, ex); - } - } - deletedStore.writeBatch(batch); - } finally { - lock.unlock(); - } - } - - private DeletedBlocksTransaction constructNewTransaction(long txID, - long containerID, List<Long> blocks) { - return DeletedBlocksTransaction.newBuilder() - .setTxID(txID) - .setContainerID(containerID) - .addAllLocalID(blocks) - .setCount(0) - .build(); - } - - /** - * {@inheritDoc} - * - * @param transactionResults - transaction IDs. - * @param dnID - Id of Datanode which has acknowledged a delete block command. - * @throws IOException - */ - @Override - public void commitTransactions( - List<DeleteBlockTransactionResult> transactionResults, UUID dnID) { - lock.lock(); - try { - Set<UUID> dnsWithCommittedTxn; - for (DeleteBlockTransactionResult transactionResult : - transactionResults) { - if (isTransactionFailed(transactionResult)) { - continue; - } - try { - long txID = transactionResult.getTxID(); - // set of dns which have successfully committed transaction txId. - dnsWithCommittedTxn = transactionToDNsCommitMap.get(txID); - Long containerId = transactionResult.getContainerID(); - if (dnsWithCommittedTxn == null) { - LOG.warn("Transaction txId={} commit by dnId={} for containerID={} " - + "failed. Corresponding entry not found.", txID, dnID, - containerId); - return; - } - - dnsWithCommittedTxn.add(dnID); - Pipeline pipeline = - containerManager.getContainerWithPipeline(containerId) - .getPipeline(); - Collection<DatanodeDetails> containerDnsDetails = - pipeline.getDatanodes().values(); - // The delete entry can be safely removed from the log if all the - // corresponding nodes commit the txn. It is required to check that - // the nodes returned in the pipeline match the replication factor. - if (min(containerDnsDetails.size(), dnsWithCommittedTxn.size()) - >= pipeline.getFactor().getNumber()) { - List<UUID> containerDns = containerDnsDetails.stream() - .map(DatanodeDetails::getUuid) - .collect(Collectors.toList()); - if (dnsWithCommittedTxn.containsAll(containerDns)) { - transactionToDNsCommitMap.remove(txID); - LOG.debug("Purging txId={} from block deletion log", txID); - deletedStore.delete(Longs.toByteArray(txID)); - } - } - LOG.debug("Datanode txId={} containerId={} committed by dnId={}", - txID, containerId, dnID); - } catch (IOException e) { - LOG.warn("Could not commit delete block transaction: " + - transactionResult.getTxID(), e); - } - } - } finally { - lock.unlock(); - } - } - - private boolean isTransactionFailed(DeleteBlockTransactionResult result) { - if (LOG.isDebugEnabled()) { - LOG.debug( - "Got block deletion ACK from datanode, TXIDs={}, " + "success={}", - result.getTxID(), result.getSuccess()); - } - if (!result.getSuccess()) { - LOG.warn("Got failed ACK for TXID={}, prepare to resend the " - + "TX in next interval", result.getTxID()); - return true; - } - return false; - } - - /** - * {@inheritDoc} - * - * @param containerID - container ID. - * @param blocks - blocks that belong to the same container. - * @throws IOException - */ - @Override - public void addTransaction(long containerID, List<Long> blocks) - throws IOException { - BatchOperation batch = new BatchOperation(); - lock.lock(); - try { - DeletedBlocksTransaction tx = constructNewTransaction(lastTxID + 1, - containerID, blocks); - byte[] key = Longs.toByteArray(lastTxID + 1); - - batch.put(key, tx.toByteArray()); - batch.put(LATEST_TXID, Longs.toByteArray(lastTxID + 1)); - - deletedStore.writeBatch(batch); - lastTxID += 1; - } finally { - lock.unlock(); - } - } - - @Override - public int getNumOfValidTransactions() throws IOException { - lock.lock(); - try { - final AtomicInteger num = new AtomicInteger(0); - deletedStore.iterate(null, (key, value) -> { - // Exclude latest txid record - if (!Arrays.equals(LATEST_TXID, key)) { - DeletedBlocksTransaction delTX = - DeletedBlocksTransaction.parseFrom(value); - if (delTX.getCount() > -1) { - num.incrementAndGet(); - } - } - return true; - }); - return num.get(); - } finally { - lock.unlock(); - } - } - - /** - * {@inheritDoc} - * - * @param containerBlocksMap a map of containerBlocks. - * @throws IOException - */ - @Override - public void addTransactions( - Map<Long, List<Long>> containerBlocksMap) - throws IOException { - BatchOperation batch = new BatchOperation(); - lock.lock(); - try { - long currentLatestID = lastTxID; - for (Map.Entry<Long, List<Long>> entry : - containerBlocksMap.entrySet()) { - currentLatestID += 1; - byte[] key = Longs.toByteArray(currentLatestID); - DeletedBlocksTransaction tx = constructNewTransaction(currentLatestID, - entry.getKey(), entry.getValue()); - batch.put(key, tx.toByteArray()); - } - lastTxID = currentLatestID; - batch.put(LATEST_TXID, Longs.toByteArray(lastTxID)); - deletedStore.writeBatch(batch); - } finally { - lock.unlock(); - } - } - - @Override - public void close() throws IOException { - if (deletedStore != null) { - deletedStore.close(); - } - } - - @Override - public Map<Long, Long> getTransactions( - DatanodeDeletedBlockTransactions transactions) throws IOException { - lock.lock(); - try { - Map<Long, Long> deleteTransactionMap = new HashMap<>(); - deletedStore.iterate(null, (key, value) -> { - if (!Arrays.equals(LATEST_TXID, key)) { - DeletedBlocksTransaction block = DeletedBlocksTransaction - .parseFrom(value); - - if (block.getCount() > -1 && block.getCount() <= maxRetry) { - if (transactions.addTransaction(block, - transactionToDNsCommitMap.get(block.getTxID()))) { - deleteTransactionMap.put(block.getContainerID(), block.getTxID()); - transactionToDNsCommitMap - .putIfAbsent(block.getTxID(), new ConcurrentHashSet<>()); - } - } - return !transactions.isFull(); - } - return true; - }); - return deleteTransactionMap; - } finally { - lock.unlock(); - } - } - - @Override - public void onMessage(DeleteBlockStatus deleteBlockStatus, - EventPublisher publisher) { - ContainerBlocksDeletionACKProto ackProto = - deleteBlockStatus.getCmdStatus().getBlockDeletionAck(); - commitTransactions(ackProto.getResultsList(), - UUID.fromString(ackProto.getDnId())); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/PendingDeleteHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/PendingDeleteHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/PendingDeleteHandler.java deleted file mode 100644 index 736daac..0000000 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/PendingDeleteHandler.java +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdds.scm.block; - -import org.apache.hadoop.hdds.server.events.EventHandler; -import org.apache.hadoop.hdds.server.events.EventPublisher; - -public class PendingDeleteHandler implements - EventHandler<PendingDeleteStatusList> { - - private SCMBlockDeletingService scmBlockDeletingService; - - public PendingDeleteHandler( - SCMBlockDeletingService scmBlockDeletingService) { - this.scmBlockDeletingService = scmBlockDeletingService; - } - - @Override - public void onMessage(PendingDeleteStatusList pendingDeleteStatusList, - EventPublisher publisher) { - scmBlockDeletingService.handlePendingDeletes(pendingDeleteStatusList); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/PendingDeleteStatusList.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/PendingDeleteStatusList.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/PendingDeleteStatusList.java deleted file mode 100644 index 904762d..0000000 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/PendingDeleteStatusList.java +++ /dev/null @@ -1,79 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdds.scm.block; - -import org.apache.hadoop.hdds.protocol.DatanodeDetails; - -import java.util.LinkedList; -import java.util.List; - -public class PendingDeleteStatusList { - - private List<PendingDeleteStatus> pendingDeleteStatuses; - private DatanodeDetails datanodeDetails; - - public PendingDeleteStatusList(DatanodeDetails datanodeDetails) { - this.datanodeDetails = datanodeDetails; - pendingDeleteStatuses = new LinkedList<>(); - } - - public void addPendingDeleteStatus(long dnDeleteTransactionId, - long scmDeleteTransactionId, long containerId) { - pendingDeleteStatuses.add( - new PendingDeleteStatus(dnDeleteTransactionId, scmDeleteTransactionId, - containerId)); - } - - public static class PendingDeleteStatus { - private long dnDeleteTransactionId; - private long scmDeleteTransactionId; - private long containerId; - - public PendingDeleteStatus(long dnDeleteTransactionId, - long scmDeleteTransactionId, long containerId) { - this.dnDeleteTransactionId = dnDeleteTransactionId; - this.scmDeleteTransactionId = scmDeleteTransactionId; - this.containerId = containerId; - } - - public long getDnDeleteTransactionId() { - return dnDeleteTransactionId; - } - - public long getScmDeleteTransactionId() { - return scmDeleteTransactionId; - } - - public long getContainerId() { - return containerId; - } - - } - - public List<PendingDeleteStatus> getPendingDeleteStatuses() { - return pendingDeleteStatuses; - } - - public int getNumPendingDeletes() { - return pendingDeleteStatuses.size(); - } - - public DatanodeDetails getDatanodeDetails() { - return datanodeDetails; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java deleted file mode 100644 index b85d77f..0000000 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java +++ /dev/null @@ -1,196 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.hadoop.hdds.scm.block; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdds.scm.container.Mapping; -import org.apache.hadoop.hdds.scm.events.SCMEvents; -import org.apache.hadoop.hdds.scm.node.NodeManager; -import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction; -import org.apache.hadoop.hdds.server.events.EventPublisher; -import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode; -import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand; -import org.apache.hadoop.util.Time; -import org.apache.hadoop.utils.BackgroundService; -import org.apache.hadoop.utils.BackgroundTask; -import org.apache.hadoop.utils.BackgroundTaskQueue; -import org.apache.hadoop.utils.BackgroundTaskResult.EmptyTaskResult; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.List; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.TimeUnit; - -import static org.apache.hadoop.ozone.OzoneConfigKeys - .OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL; -import static org.apache.hadoop.ozone.OzoneConfigKeys - .OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL_DEFAULT; - -/** - * A background service running in SCM to delete blocks. This service scans - * block deletion log in certain interval and caches block deletion commands - * in {@link org.apache.hadoop.hdds.scm.node.CommandQueue}, asynchronously - * SCM HB thread polls cached commands and sends them to datanode for physical - * processing. - */ -public class SCMBlockDeletingService extends BackgroundService { - - public static final Logger LOG = - LoggerFactory.getLogger(SCMBlockDeletingService.class); - - // ThreadPoolSize=2, 1 for scheduler and the other for the scanner. - private final static int BLOCK_DELETING_SERVICE_CORE_POOL_SIZE = 2; - private final DeletedBlockLog deletedBlockLog; - private final Mapping mappingService; - private final NodeManager nodeManager; - private final EventPublisher eventPublisher; - - // Block delete limit size is dynamically calculated based on container - // delete limit size (ozone.block.deleting.container.limit.per.interval) - // that configured for datanode. To ensure DN not wait for - // delete commands, we use this value multiply by a factor 2 as the final - // limit TX size for each node. - // Currently we implement a throttle algorithm that throttling delete blocks - // for each datanode. Each node is limited by the calculation size. Firstly - // current node info is fetched from nodemanager, then scan entire delLog - // from the beginning to end. If one node reaches maximum value, its records - // will be skipped. If not, keep scanning until it reaches maximum value. - // Once all node are full, the scan behavior will stop. - private int blockDeleteLimitSize; - - public SCMBlockDeletingService(DeletedBlockLog deletedBlockLog, - Mapping mapper, NodeManager nodeManager, EventPublisher eventPublisher, - long interval, long serviceTimeout, Configuration conf) { - super("SCMBlockDeletingService", interval, TimeUnit.MILLISECONDS, - BLOCK_DELETING_SERVICE_CORE_POOL_SIZE, serviceTimeout); - this.deletedBlockLog = deletedBlockLog; - this.mappingService = mapper; - this.nodeManager = nodeManager; - this.eventPublisher = eventPublisher; - - int containerLimit = conf.getInt( - OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL, - OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL_DEFAULT); - Preconditions.checkArgument(containerLimit > 0, - "Container limit size should be " + "positive."); - // Use container limit value multiply by a factor 2 to ensure DN - // not wait for orders. - this.blockDeleteLimitSize = containerLimit * 2; - } - - @Override - public BackgroundTaskQueue getTasks() { - BackgroundTaskQueue queue = new BackgroundTaskQueue(); - queue.add(new DeletedBlockTransactionScanner()); - return queue; - } - - public void handlePendingDeletes(PendingDeleteStatusList deletionStatusList) { - DatanodeDetails dnDetails = deletionStatusList.getDatanodeDetails(); - for (PendingDeleteStatusList.PendingDeleteStatus deletionStatus : - deletionStatusList.getPendingDeleteStatuses()) { - LOG.info( - "Block deletion txnID mismatch in datanode {} for containerID {}." - + " Datanode delete txnID: {}, SCM txnID: {}", - dnDetails.getUuid(), deletionStatus.getContainerId(), - deletionStatus.getDnDeleteTransactionId(), - deletionStatus.getScmDeleteTransactionId()); - } - } - - private class DeletedBlockTransactionScanner - implements BackgroundTask<EmptyTaskResult> { - - @Override - public int getPriority() { - return 1; - } - - @Override - public EmptyTaskResult call() throws Exception { - int dnTxCount = 0; - long startTime = Time.monotonicNow(); - // Scan SCM DB in HB interval and collect a throttled list of - // to delete blocks. - LOG.debug("Running DeletedBlockTransactionScanner"); - DatanodeDeletedBlockTransactions transactions = null; - List<DatanodeDetails> datanodes = nodeManager.getNodes(NodeState.HEALTHY); - Map<Long, Long> transactionMap = null; - if (datanodes != null) { - transactions = new DatanodeDeletedBlockTransactions(mappingService, - blockDeleteLimitSize, datanodes.size()); - try { - transactionMap = deletedBlockLog.getTransactions(transactions); - } catch (IOException e) { - // We may tolerant a number of failures for sometime - // but if it continues to fail, at some point we need to raise - // an exception and probably fail the SCM ? At present, it simply - // continues to retry the scanning. - LOG.error("Failed to get block deletion transactions from delTX log", - e); - } - LOG.debug("Scanned deleted blocks log and got {} delTX to process.", - transactions.getTXNum()); - } - - if (transactions != null && !transactions.isEmpty()) { - for (UUID dnId : transactions.getDatanodeIDs()) { - List<DeletedBlocksTransaction> dnTXs = transactions - .getDatanodeTransactions(dnId); - if (dnTXs != null && !dnTXs.isEmpty()) { - dnTxCount += dnTXs.size(); - // TODO commandQueue needs a cap. - // We should stop caching new commands if num of un-processed - // command is bigger than a limit, e.g 50. In case datanode goes - // offline for sometime, the cached commands be flooded. - eventPublisher.fireEvent(SCMEvents.RETRIABLE_DATANODE_COMMAND, - new CommandForDatanode<>(dnId, new DeleteBlocksCommand(dnTXs))); - LOG.debug( - "Added delete block command for datanode {} in the queue," - + " number of delete block transactions: {}, TxID list: {}", - dnId, dnTXs.size(), String.join(",", - transactions.getTransactionIDList(dnId))); - } - } - mappingService.updateDeleteTransactionId(transactionMap); - } - - if (dnTxCount > 0) { - LOG.info( - "Totally added {} delete blocks command for" - + " {} datanodes, task elapsed time: {}ms", - dnTxCount, transactions.getDatanodeIDs().size(), - Time.monotonicNow() - startTime); - } - - return EmptyTaskResult.newResult(); - } - } - - @VisibleForTesting - public void setBlockDeleteTXNum(int numTXs) { - blockDeleteLimitSize = numTXs; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/package-info.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/package-info.java deleted file mode 100644 index e1bfdff..0000000 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.hadoop.hdds.scm.block; -/** - * This package contains routines to manage the block location and - * mapping inside SCM - */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java deleted file mode 100644 index c0de382..0000000 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java +++ /dev/null @@ -1,131 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.hadoop.hdds.scm.command; - -import com.google.common.base.Preconditions; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.CommandStatus; -import org.apache.hadoop.hdds.scm.events.SCMEvents; -import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher - .CommandStatusReportFromDatanode; -import org.apache.hadoop.hdds.server.events.EventPublisher; -import org.apache.hadoop.hdds.server.events.EventHandler; -import org.apache.hadoop.hdds.server.events.IdentifiableEventPayload; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.List; - -/** - * Handles CommandStatusReports from datanode. - */ -public class CommandStatusReportHandler implements - EventHandler<CommandStatusReportFromDatanode> { - - private static final Logger LOGGER = LoggerFactory - .getLogger(CommandStatusReportHandler.class); - - @Override - public void onMessage(CommandStatusReportFromDatanode report, - EventPublisher publisher) { - Preconditions.checkNotNull(report); - List<CommandStatus> cmdStatusList = report.getReport().getCmdStatusList(); - Preconditions.checkNotNull(cmdStatusList); - LOGGER.trace("Processing command status report for dn: {}", report - .getDatanodeDetails()); - - // Route command status to its watchers. - cmdStatusList.forEach(cmdStatus -> { - LOGGER.trace("Emitting command status for id:{} type: {}", cmdStatus - .getCmdId(), cmdStatus.getType()); - switch (cmdStatus.getType()) { - case replicateContainerCommand: - publisher.fireEvent(SCMEvents.REPLICATION_STATUS, new - ReplicationStatus(cmdStatus)); - break; - case closeContainerCommand: - publisher.fireEvent(SCMEvents.CLOSE_CONTAINER_STATUS, new - CloseContainerStatus(cmdStatus)); - break; - case deleteBlocksCommand: - if (cmdStatus.getStatus() == CommandStatus.Status.EXECUTED) { - publisher.fireEvent(SCMEvents.DELETE_BLOCK_STATUS, - new DeleteBlockStatus(cmdStatus)); - } - break; - default: - LOGGER.debug("CommandStatus of type:{} not handled in " + - "CommandStatusReportHandler.", cmdStatus.getType()); - break; - } - }); - } - - /** - * Wrapper event for CommandStatus. - */ - public static class CommandStatusEvent implements IdentifiableEventPayload { - private CommandStatus cmdStatus; - - CommandStatusEvent(CommandStatus cmdStatus) { - this.cmdStatus = cmdStatus; - } - - public CommandStatus getCmdStatus() { - return cmdStatus; - } - - @Override - public String toString() { - return "CommandStatusEvent:" + cmdStatus.toString(); - } - - @Override - public long getId() { - return cmdStatus.getCmdId(); - } - } - - /** - * Wrapper event for Replicate Command. - */ - public static class ReplicationStatus extends CommandStatusEvent { - public ReplicationStatus(CommandStatus cmdStatus) { - super(cmdStatus); - } - } - - /** - * Wrapper event for CloseContainer Command. - */ - public static class CloseContainerStatus extends CommandStatusEvent { - public CloseContainerStatus(CommandStatus cmdStatus) { - super(cmdStatus); - } - } - - /** - * Wrapper event for DeleteBlock Command. - */ - public static class DeleteBlockStatus extends CommandStatusEvent { - public DeleteBlockStatus(CommandStatus cmdStatus) { - super(cmdStatus); - } - } - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/package-info.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/package-info.java deleted file mode 100644 index ba17fb9..0000000 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/package-info.java +++ /dev/null @@ -1,26 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - * <p> - * This package contains HDDS protocol related classes. - */ - -/** - * This package contains HDDS protocol related classes. - */ -package org.apache.hadoop.hdds.scm.command; -/* - * Classes related to commands issued from SCM to DataNode. - * */ \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java deleted file mode 100644 index 7baecc4..0000000 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java +++ /dev/null @@ -1,160 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * <p> - * <p>http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * <p>Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.hadoop.hdds.scm.container; - -import java.io.IOException; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos; -import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; -import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; -import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; -import org.apache.hadoop.hdds.server.events.EventHandler; -import org.apache.hadoop.hdds.server.events.EventPublisher; -import org.apache.hadoop.hdds.server.events.IdentifiableEventPayload; -import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand; -import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static org.apache.hadoop.hdds.scm.events.SCMEvents.DATANODE_COMMAND; -import static org.apache.hadoop.hdds.scm.events.SCMEvents.CLOSE_CONTAINER_RETRYABLE_REQ; - -/** - * In case of a node failure, volume failure, volume out of spapce, node - * out of space etc, CLOSE_CONTAINER will be triggered. - * CloseContainerEventHandler is the handler for CLOSE_CONTAINER. - * When a close container event is fired, a close command for the container - * should be sent to all the datanodes in the pipeline and containerStateManager - * needs to update the container state to Closing. - */ -public class CloseContainerEventHandler implements EventHandler<ContainerID> { - - public static final Logger LOG = - LoggerFactory.getLogger(CloseContainerEventHandler.class); - - - private final Mapping containerManager; - - public CloseContainerEventHandler(Mapping containerManager) { - this.containerManager = containerManager; - } - - @Override - public void onMessage(ContainerID containerID, EventPublisher publisher) { - - LOG.info("Close container Event triggered for container : {}", - containerID.getId()); - ContainerWithPipeline containerWithPipeline; - ContainerInfo info; - try { - containerWithPipeline = - containerManager.getContainerWithPipeline(containerID.getId()); - info = containerWithPipeline.getContainerInfo(); - if (info == null) { - LOG.error("Failed to update the container state. Container with id : {}" - + " does not exist", containerID.getId()); - return; - } - } catch (IOException e) { - LOG.error("Failed to update the container state. Container with id : {} " - + "does not exist", containerID.getId(), e); - return; - } - - HddsProtos.LifeCycleState state = info.getState(); - try { - switch (state) { - case ALLOCATED: - // We cannot close a container in ALLOCATED state, moving the - // container to CREATING state, this should eventually - // timeout and the container will be moved to DELETING state. - LOG.debug("Closing container {} in {} state", containerID, state); - containerManager.updateContainerState(containerID.getId(), - HddsProtos.LifeCycleEvent.CREATE); - break; - case CREATING: - // We cannot close a container in CREATING state, it will eventually - // timeout and moved to DELETING state. - LOG.debug("Closing container {} in {} state", containerID, state); - break; - case OPEN: - containerManager.updateContainerState(containerID.getId(), - HddsProtos.LifeCycleEvent.FINALIZE); - fireCloseContainerEvents(containerWithPipeline, info, publisher); - break; - case CLOSING: - fireCloseContainerEvents(containerWithPipeline, info, publisher); - break; - case CLOSED: - case DELETING: - case DELETED: - LOG.info( - "container with id : {} is in {} state and need not be closed.", - containerID.getId(), info.getState()); - break; - default: - throw new IOException( - "Invalid container state for container " + containerID); - } - } catch (IOException ex) { - LOG.error("Failed to update the container state for" + "container : {}" - + containerID, ex); - } - } - - private void fireCloseContainerEvents( - ContainerWithPipeline containerWithPipeline, ContainerInfo info, - EventPublisher publisher) { - ContainerID containerID = info.containerID(); - // fire events. - CloseContainerCommand closeContainerCommand = - new CloseContainerCommand(containerID.getId(), - info.getReplicationType(), info.getPipelineID()); - - Pipeline pipeline = containerWithPipeline.getPipeline(); - pipeline.getMachines().stream().map( - datanode -> new CommandForDatanode<>(datanode.getUuid(), - closeContainerCommand)).forEach((command) -> { - publisher.fireEvent(DATANODE_COMMAND, command); - }); - publisher.fireEvent(CLOSE_CONTAINER_RETRYABLE_REQ, - new CloseContainerRetryableReq(containerID)); - LOG.trace("Issuing {} on Pipeline {} for container", closeContainerCommand, - pipeline, containerID); - } - - /** - * Class to create retryable event. Prevents redundant requests for same - * container Id. - */ - public static class CloseContainerRetryableReq implements - IdentifiableEventPayload { - - private ContainerID containerID; - public CloseContainerRetryableReq(ContainerID containerID) { - this.containerID = containerID; - } - - public ContainerID getContainerID() { - return containerID; - } - - @Override - public long getId() { - return containerID.getId(); - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerWatcher.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerWatcher.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerWatcher.java deleted file mode 100644 index 8e277b9..0000000 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerWatcher.java +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * <p> - * <p>http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * <p>Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.hadoop.hdds.scm.container; - -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.CommandStatus.Status; -import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler - .CloseContainerStatus; - -import org.apache.hadoop.hdds.scm.events.SCMEvents; -import org.apache.hadoop.hdds.server.events.Event; -import org.apache.hadoop.hdds.server.events.EventPublisher; -import org.apache.hadoop.hdds.server.events.EventWatcher; -import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler - .CloseContainerRetryableReq; -import org.apache.hadoop.ozone.lease.LeaseManager; -import org.apache.hadoop.ozone.lease.LeaseNotFoundException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; - -/** - * This watcher will watch for CLOSE_CONTAINER_STATUS events fired from - * CommandStatusReport. If required it will re-trigger CloseContainer command - * for DataNodes to CloseContainerEventHandler. - */ -public class CloseContainerWatcher extends - EventWatcher<CloseContainerRetryableReq, CloseContainerStatus> { - - public static final Logger LOG = - LoggerFactory.getLogger(CloseContainerWatcher.class); - private final Mapping containerManager; - - public CloseContainerWatcher(Event<CloseContainerRetryableReq> startEvent, - Event<CloseContainerStatus> completionEvent, - LeaseManager<Long> leaseManager, Mapping containerManager) { - super(startEvent, completionEvent, leaseManager); - this.containerManager = containerManager; - } - - @Override - protected void onTimeout(EventPublisher publisher, - CloseContainerRetryableReq payload) { - // Let CloseContainerEventHandler handle this message. - this.resendEventToHandler(payload.getId(), publisher); - } - - @Override - protected void onFinished(EventPublisher publisher, - CloseContainerRetryableReq payload) { - LOG.trace("CloseContainerCommand for containerId: {} executed ", payload - .getContainerID().getId()); - } - - @Override - protected synchronized void handleCompletion(CloseContainerStatus status, - EventPublisher publisher) throws LeaseNotFoundException { - // If status is PENDING then return without doing anything. - if(status.getCmdStatus().getStatus().equals(Status.PENDING)){ - return; - } - - CloseContainerRetryableReq closeCont = getTrackedEventbyId(status.getId()); - super.handleCompletion(status, publisher); - // If status is FAILED then send a msg to Handler to resend the command. - if (status.getCmdStatus().getStatus().equals(Status.FAILED) && closeCont - != null) { - this.resendEventToHandler(closeCont.getId(), publisher); - } - } - - private void resendEventToHandler(long containerID, EventPublisher - publisher) { - try { - // Check if container is still open - if (containerManager.getContainer(containerID).isContainerOpen()) { - publisher.fireEvent(SCMEvents.CLOSE_CONTAINER, - ContainerID.valueof(containerID)); - } - } catch (IOException e) { - LOG.warn("Error in CloseContainerWatcher while processing event " + - "for containerId {} ExceptionMsg: ", containerID, e.getMessage()); - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerActionsHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerActionsHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerActionsHandler.java deleted file mode 100644 index ce399eb..0000000 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerActionsHandler.java +++ /dev/null @@ -1,60 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hdds.scm.container; - -import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerAction; -import org.apache.hadoop.hdds.scm.events.SCMEvents; -import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher - .ContainerActionsFromDatanode; -import org.apache.hadoop.hdds.server.events.EventHandler; -import org.apache.hadoop.hdds.server.events.EventPublisher; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Handles container reports from datanode. - */ -public class ContainerActionsHandler implements - EventHandler<ContainerActionsFromDatanode> { - - private static final Logger LOG = LoggerFactory.getLogger( - ContainerActionsHandler.class); - - @Override - public void onMessage( - ContainerActionsFromDatanode containerReportFromDatanode, - EventPublisher publisher) { - DatanodeDetails dd = containerReportFromDatanode.getDatanodeDetails(); - for (ContainerAction action : containerReportFromDatanode.getReport() - .getContainerActionsList()) { - ContainerID containerId = ContainerID.valueof(action.getContainerID()); - switch (action.getAction()) { - case CLOSE: - LOG.debug("Closing container {} in datanode {} because the" + - " container is {}.", containerId, dd, action.getReason()); - publisher.fireEvent(SCMEvents.CLOSE_CONTAINER, containerId); - break; - default: - LOG.warn("Invalid action {} with reason {}, from datanode {}. ", - action.getAction(), action.getReason(), dd); } - } - } -} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org