http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockmanagerMXBean.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockmanagerMXBean.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockmanagerMXBean.java
deleted file mode 100644
index 23c6983..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockmanagerMXBean.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.hadoop.hdds.scm.block;
-
-
-/**
- * JMX interface for the block manager.
- */
-public interface BlockmanagerMXBean {
-
-  /**
-   * Number of open containers manager by the block manager.
-   */
-  int getOpenContainersNo();
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java
deleted file mode 100644
index 8702a42..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hadoop.hdds.scm.block;
-
-import com.google.common.collect.ArrayListMultimap;
-import org.apache.hadoop.hdds.scm.container.Mapping;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-import java.util.Set;
-import java.util.UUID;
-import java.util.stream.Collectors;
-
-import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
-import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
-
-/**
- * A wrapper class to hold info about datanode and all deleted block
- * transactions that will be sent to this datanode.
- */
-public class DatanodeDeletedBlockTransactions {
-  private int nodeNum;
-  // The throttle size for each datanode.
-  private int maximumAllowedTXNum;
-  // Current counter of inserted TX.
-  private int currentTXNum;
-  private Mapping mappingService;
-  // A list of TXs mapped to a certain datanode ID.
-  private final ArrayListMultimap<UUID, DeletedBlocksTransaction>
-      transactions;
-
-  DatanodeDeletedBlockTransactions(Mapping mappingService,
-      int maximumAllowedTXNum, int nodeNum) {
-    this.transactions = ArrayListMultimap.create();
-    this.mappingService = mappingService;
-    this.maximumAllowedTXNum = maximumAllowedTXNum;
-    this.nodeNum = nodeNum;
-  }
-
-  public boolean addTransaction(DeletedBlocksTransaction tx,
-      Set<UUID> dnsWithTransactionCommitted) {
-    Pipeline pipeline = null;
-    try {
-      ContainerWithPipeline containerWithPipeline =
-          mappingService.getContainerWithPipeline(tx.getContainerID());
-      if (containerWithPipeline.getContainerInfo().isContainerOpen()
-          || containerWithPipeline.getPipeline().isEmpty()) {
-        return false;
-      }
-      pipeline = containerWithPipeline.getPipeline();
-    } catch (IOException e) {
-      SCMBlockDeletingService.LOG.warn("Got container info error.", e);
-      return false;
-    }
-
-    boolean success = false;
-    for (DatanodeDetails dd : pipeline.getMachines()) {
-      UUID dnID = dd.getUuid();
-      if (dnsWithTransactionCommitted == null ||
-          !dnsWithTransactionCommitted.contains(dnID)) {
-        // Transaction need not be sent to dns which have already committed it
-        success = addTransactionToDN(dnID, tx);
-      }
-    }
-    return success;
-  }
-
-  private boolean addTransactionToDN(UUID dnID, DeletedBlocksTransaction tx) {
-    if (transactions.containsKey(dnID)) {
-      List<DeletedBlocksTransaction> txs = transactions.get(dnID);
-      if (txs != null && txs.size() < maximumAllowedTXNum) {
-        boolean hasContained = false;
-        for (DeletedBlocksTransaction t : txs) {
-          if (t.getContainerID() == tx.getContainerID()) {
-            hasContained = true;
-            break;
-          }
-        }
-
-        if (!hasContained) {
-          txs.add(tx);
-          currentTXNum++;
-          return true;
-        }
-      }
-    } else {
-      currentTXNum++;
-      transactions.put(dnID, tx);
-      return true;
-    }
-    SCMBlockDeletingService.LOG
-        .debug("Transaction added: {} <- TX({})", dnID, tx.getTxID());
-    return false;
-  }
-
-  Set<UUID> getDatanodeIDs() {
-    return transactions.keySet();
-  }
-
-  boolean isEmpty() {
-    return transactions.isEmpty();
-  }
-
-  boolean hasTransactions(UUID dnId) {
-    return transactions.containsKey(dnId) &&
-        !transactions.get(dnId).isEmpty();
-  }
-
-  List<DeletedBlocksTransaction> getDatanodeTransactions(UUID dnId) {
-    return transactions.get(dnId);
-  }
-
-  List<String> getTransactionIDList(UUID dnId) {
-    if (hasTransactions(dnId)) {
-      return transactions.get(dnId).stream()
-          .map(DeletedBlocksTransaction::getTxID).map(String::valueOf)
-          .collect(Collectors.toList());
-    } else {
-      return Collections.emptyList();
-    }
-  }
-
-  boolean isFull() {
-    return currentTXNum >= maximumAllowedTXNum * nodeNum;
-  }
-
-  int getTXNum() {
-    return currentTXNum;
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java
deleted file mode 100644
index db6c1c5..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdds.scm.block;
-
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto
-    .DeleteBlockTransactionResult;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
-/**
- * The DeletedBlockLog is a persisted log in SCM to keep tracking
- * container blocks which are under deletion. It maintains info
- * about under-deletion container blocks that notified by OM,
- * and the state how it is processed.
- */
-public interface DeletedBlockLog extends Closeable {
-
-  /**
-   * Scan entire log once and returns TXs to DatanodeDeletedBlockTransactions.
-   * Once DatanodeDeletedBlockTransactions is full, the scan behavior will
-   * stop.
-   * @param transactions a list of TXs will be set into.
-   * @return Mapping from containerId to latest transactionId for the container.
-   * @throws IOException
-   */
-  Map<Long, Long> getTransactions(DatanodeDeletedBlockTransactions transactions)
-      throws IOException;
-
-  /**
-   * Return all failed transactions in the log. A transaction is considered
-   * to be failed if it has been sent more than MAX_RETRY limit and its
-   * count is reset to -1.
-   *
-   * @return a list of failed deleted block transactions.
-   * @throws IOException
-   */
-  List<DeletedBlocksTransaction> getFailedTransactions()
-      throws IOException;
-
-  /**
-   * Increments count for given list of transactions by 1.
-   * The log maintains a valid range of counts for each transaction
-   * [0, MAX_RETRY]. If exceed this range, resets it to -1 to indicate
-   * the transaction is no longer valid.
-   *
-   * @param txIDs - transaction ID.
-   */
-  void incrementCount(List<Long> txIDs)
-      throws IOException;
-
-  /**
-   * Commits a transaction means to delete all footprints of a transaction
-   * from the log. This method doesn't guarantee all transactions can be
-   * successfully deleted, it tolerate failures and tries best efforts to.
-   *  @param transactionResults - delete block transaction results.
-   * @param dnID - ID of datanode which acknowledges the delete block command.
-   */
-  void commitTransactions(List<DeleteBlockTransactionResult> transactionResults,
-      UUID dnID);
-
-  /**
-   * Creates a block deletion transaction and adds that into the log.
-   *
-   * @param containerID - container ID.
-   * @param blocks - blocks that belong to the same container.
-   *
-   * @throws IOException
-   */
-  void addTransaction(long containerID, List<Long> blocks)
-      throws IOException;
-
-  /**
-   * Creates block deletion transactions for a set of containers,
-   * add into the log and persist them atomically. An object key
-   * might be stored in multiple containers and multiple blocks,
-   * this API ensures that these updates are done in atomic manner
-   * so if any of them fails, the entire operation fails without
-   * any updates to the log. Note, this doesn't mean to create only
-   * one transaction, it creates multiple transactions (depends on the
-   * number of containers) together (on success) or non (on failure).
-   *
-   * @param containerBlocksMap a map of containerBlocks.
-   * @throws IOException
-   */
-  void addTransactions(Map<Long, List<Long>> containerBlocksMap)
-      throws IOException;
-
-  /**
-   * Returns the total number of valid transactions. A transaction is
-   * considered to be valid as long as its count is in range [0, MAX_RETRY].
-   *
-   * @return number of a valid transactions.
-   * @throws IOException
-   */
-  int getNumOfValidTransactions() throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
deleted file mode 100644
index 68435d1..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
+++ /dev/null
@@ -1,428 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdds.scm.block;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Lists;
-import com.google.common.primitives.Longs;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto
-    .DeleteBlockTransactionResult;
-import org.apache.hadoop.hdds.scm.command
-    .CommandStatusReportHandler.DeleteBlockStatus;
-import org.apache.hadoop.hdds.scm.container.Mapping;
-import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
-import org.apache.hadoop.hdds.server.events.EventHandler;
-import org.apache.hadoop.hdds.server.events.EventPublisher;
-import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
-import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.utils.BatchOperation;
-import org.apache.hadoop.utils.MetadataStore;
-import org.apache.hadoop.utils.MetadataStoreBuilder;
-import org.eclipse.jetty.util.ConcurrentHashSet;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.stream.Collectors;
-
-import static java.lang.Math.min;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys
-    .OZONE_SCM_BLOCK_DELETION_MAX_RETRY;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys
-    .OZONE_SCM_BLOCK_DELETION_MAX_RETRY_DEFAULT;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys
-    .OZONE_SCM_DB_CACHE_SIZE_DEFAULT;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys
-    .OZONE_SCM_DB_CACHE_SIZE_MB;
-import static org.apache.hadoop.hdds.server.ServerUtils.getOzoneMetaDirPath;
-import static org.apache.hadoop.ozone.OzoneConsts.DELETED_BLOCK_DB;
-
-/**
- * A implement class of {@link DeletedBlockLog}, and it uses
- * K/V db to maintain block deletion transactions between scm and datanode.
- * This is a very basic implementation, it simply scans the log and
- * memorize the position that scanned by last time, and uses this to
- * determine where the next scan starts. It has no notion about weight
- * of each transaction so as long as transaction is still valid, they get
- * equally same chance to be retrieved which only depends on the nature
- * order of the transaction ID.
- */
-public class DeletedBlockLogImpl
-    implements DeletedBlockLog, EventHandler<DeleteBlockStatus> {
-
-  public static final Logger LOG =
-      LoggerFactory.getLogger(DeletedBlockLogImpl.class);
-
-  private static final byte[] LATEST_TXID =
-      DFSUtil.string2Bytes("#LATEST_TXID#");
-
-  private final int maxRetry;
-  private final MetadataStore deletedStore;
-  private final Mapping containerManager;
-  private final Lock lock;
-  // The latest id of deleted blocks in the db.
-  private long lastTxID;
-  // Maps txId to set of DNs which are successful in committing the transaction
-  private Map<Long, Set<UUID>> transactionToDNsCommitMap;
-
-  public DeletedBlockLogImpl(Configuration conf, Mapping containerManager)
-      throws IOException {
-    maxRetry = conf.getInt(OZONE_SCM_BLOCK_DELETION_MAX_RETRY,
-        OZONE_SCM_BLOCK_DELETION_MAX_RETRY_DEFAULT);
-
-    File metaDir = getOzoneMetaDirPath(conf);
-    String scmMetaDataDir = metaDir.getPath();
-    File deletedLogDbPath = new File(scmMetaDataDir, DELETED_BLOCK_DB);
-    int cacheSize = conf.getInt(OZONE_SCM_DB_CACHE_SIZE_MB,
-        OZONE_SCM_DB_CACHE_SIZE_DEFAULT);
-    // Load store of all transactions.
-    deletedStore = MetadataStoreBuilder.newBuilder()
-        .setCreateIfMissing(true)
-        .setConf(conf)
-        .setDbFile(deletedLogDbPath)
-        .setCacheSize(cacheSize * OzoneConsts.MB)
-        .build();
-    this.containerManager = containerManager;
-
-    this.lock = new ReentrantLock();
-    // start from the head of deleted store.
-    lastTxID = findLatestTxIDInStore();
-
-    // transactionToDNsCommitMap is updated only when
-    // transaction is added to the log and when it is removed.
-
-    // maps transaction to dns which have committed it.
-    transactionToDNsCommitMap = new ConcurrentHashMap<>();
-  }
-
-  @VisibleForTesting
-  public MetadataStore getDeletedStore() {
-    return deletedStore;
-  }
-
-  /**
-   * There is no need to lock before reading because
-   * it's only used in construct method.
-   *
-   * @return latest txid.
-   * @throws IOException
-   */
-  private long findLatestTxIDInStore() throws IOException {
-    long txid = 0;
-    byte[] value = deletedStore.get(LATEST_TXID);
-    if (value != null) {
-      txid = Longs.fromByteArray(value);
-    }
-    return txid;
-  }
-
-  @Override
-  public List<DeletedBlocksTransaction> getFailedTransactions()
-      throws IOException {
-    lock.lock();
-    try {
-      final List<DeletedBlocksTransaction> failedTXs = Lists.newArrayList();
-      deletedStore.iterate(null, (key, value) -> {
-        if (!Arrays.equals(LATEST_TXID, key)) {
-          DeletedBlocksTransaction delTX =
-              DeletedBlocksTransaction.parseFrom(value);
-          if (delTX.getCount() == -1) {
-            failedTXs.add(delTX);
-          }
-        }
-        return true;
-      });
-      return failedTXs;
-    } finally {
-      lock.unlock();
-    }
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @param txIDs - transaction ID.
-   * @throws IOException
-   */
-  @Override
-  public void incrementCount(List<Long> txIDs) throws IOException {
-    BatchOperation batch = new BatchOperation();
-    lock.lock();
-    try {
-      for(Long txID : txIDs) {
-        try {
-          byte[] deleteBlockBytes =
-              deletedStore.get(Longs.toByteArray(txID));
-          if (deleteBlockBytes == null) {
-            LOG.warn("Delete txID {} not found", txID);
-            continue;
-          }
-          DeletedBlocksTransaction block = DeletedBlocksTransaction
-              .parseFrom(deleteBlockBytes);
-          DeletedBlocksTransaction.Builder builder = block.toBuilder();
-          int currentCount = block.getCount();
-          if (currentCount > -1) {
-            builder.setCount(++currentCount);
-          }
-          // if the retry time exceeds the maxRetry value
-          // then set the retry value to -1, stop retrying, admins can
-          // analyze those blocks and purge them manually by SCMCli.
-          if (currentCount > maxRetry) {
-            builder.setCount(-1);
-          }
-          deletedStore.put(Longs.toByteArray(txID),
-              builder.build().toByteArray());
-        } catch (IOException ex) {
-          LOG.warn("Cannot increase count for txID " + txID, ex);
-        }
-      }
-      deletedStore.writeBatch(batch);
-    } finally {
-      lock.unlock();
-    }
-  }
-
-  private DeletedBlocksTransaction constructNewTransaction(long txID,
-      long containerID, List<Long> blocks) {
-    return DeletedBlocksTransaction.newBuilder()
-        .setTxID(txID)
-        .setContainerID(containerID)
-        .addAllLocalID(blocks)
-        .setCount(0)
-        .build();
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @param transactionResults - transaction IDs.
-   * @param dnID - Id of Datanode which has acknowledged a delete block command.
-   * @throws IOException
-   */
-  @Override
-  public void commitTransactions(
-      List<DeleteBlockTransactionResult> transactionResults, UUID dnID) {
-    lock.lock();
-    try {
-      Set<UUID> dnsWithCommittedTxn;
-      for (DeleteBlockTransactionResult transactionResult :
-          transactionResults) {
-        if (isTransactionFailed(transactionResult)) {
-          continue;
-        }
-        try {
-          long txID = transactionResult.getTxID();
-          // set of dns which have successfully committed transaction txId.
-          dnsWithCommittedTxn = transactionToDNsCommitMap.get(txID);
-          Long containerId = transactionResult.getContainerID();
-          if (dnsWithCommittedTxn == null) {
-            LOG.warn("Transaction txId={} commit by dnId={} for containerID={} "
-                    + "failed. Corresponding entry not found.", txID, dnID,
-                containerId);
-            return;
-          }
-
-          dnsWithCommittedTxn.add(dnID);
-          Pipeline pipeline =
-              containerManager.getContainerWithPipeline(containerId)
-                  .getPipeline();
-          Collection<DatanodeDetails> containerDnsDetails =
-              pipeline.getDatanodes().values();
-          // The delete entry can be safely removed from the log if all the
-          // corresponding nodes commit the txn. It is required to check that
-          // the nodes returned in the pipeline match the replication factor.
-          if (min(containerDnsDetails.size(), dnsWithCommittedTxn.size())
-              >= pipeline.getFactor().getNumber()) {
-            List<UUID> containerDns = containerDnsDetails.stream()
-                .map(DatanodeDetails::getUuid)
-                .collect(Collectors.toList());
-            if (dnsWithCommittedTxn.containsAll(containerDns)) {
-              transactionToDNsCommitMap.remove(txID);
-              LOG.debug("Purging txId={} from block deletion log", txID);
-              deletedStore.delete(Longs.toByteArray(txID));
-            }
-          }
-          LOG.debug("Datanode txId={} containerId={} committed by dnId={}",
-              txID, containerId, dnID);
-        } catch (IOException e) {
-          LOG.warn("Could not commit delete block transaction: " +
-              transactionResult.getTxID(), e);
-        }
-      }
-    } finally {
-      lock.unlock();
-    }
-  }
-
-  private boolean isTransactionFailed(DeleteBlockTransactionResult result) {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug(
-          "Got block deletion ACK from datanode, TXIDs={}, " + "success={}",
-          result.getTxID(), result.getSuccess());
-    }
-    if (!result.getSuccess()) {
-      LOG.warn("Got failed ACK for TXID={}, prepare to resend the "
-          + "TX in next interval", result.getTxID());
-      return true;
-    }
-    return false;
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @param containerID - container ID.
-   * @param blocks - blocks that belong to the same container.
-   * @throws IOException
-   */
-  @Override
-  public void addTransaction(long containerID, List<Long> blocks)
-      throws IOException {
-    BatchOperation batch = new BatchOperation();
-    lock.lock();
-    try {
-      DeletedBlocksTransaction tx = constructNewTransaction(lastTxID + 1,
-          containerID, blocks);
-      byte[] key = Longs.toByteArray(lastTxID + 1);
-
-      batch.put(key, tx.toByteArray());
-      batch.put(LATEST_TXID, Longs.toByteArray(lastTxID + 1));
-
-      deletedStore.writeBatch(batch);
-      lastTxID += 1;
-    } finally {
-      lock.unlock();
-    }
-  }
-
-  @Override
-  public int getNumOfValidTransactions() throws IOException {
-    lock.lock();
-    try {
-      final AtomicInteger num = new AtomicInteger(0);
-      deletedStore.iterate(null, (key, value) -> {
-        // Exclude latest txid record
-        if (!Arrays.equals(LATEST_TXID, key)) {
-          DeletedBlocksTransaction delTX =
-              DeletedBlocksTransaction.parseFrom(value);
-          if (delTX.getCount() > -1) {
-            num.incrementAndGet();
-          }
-        }
-        return true;
-      });
-      return num.get();
-    } finally {
-      lock.unlock();
-    }
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @param containerBlocksMap a map of containerBlocks.
-   * @throws IOException
-   */
-  @Override
-  public void addTransactions(
-      Map<Long, List<Long>> containerBlocksMap)
-      throws IOException {
-    BatchOperation batch = new BatchOperation();
-    lock.lock();
-    try {
-      long currentLatestID = lastTxID;
-      for (Map.Entry<Long, List<Long>> entry :
-          containerBlocksMap.entrySet()) {
-        currentLatestID += 1;
-        byte[] key = Longs.toByteArray(currentLatestID);
-        DeletedBlocksTransaction tx = constructNewTransaction(currentLatestID,
-            entry.getKey(), entry.getValue());
-        batch.put(key, tx.toByteArray());
-      }
-      lastTxID = currentLatestID;
-      batch.put(LATEST_TXID, Longs.toByteArray(lastTxID));
-      deletedStore.writeBatch(batch);
-    } finally {
-      lock.unlock();
-    }
-  }
-
-  @Override
-  public void close() throws IOException {
-    if (deletedStore != null) {
-      deletedStore.close();
-    }
-  }
-
-  @Override
-  public Map<Long, Long> getTransactions(
-      DatanodeDeletedBlockTransactions transactions) throws IOException {
-    lock.lock();
-    try {
-      Map<Long, Long> deleteTransactionMap = new HashMap<>();
-      deletedStore.iterate(null, (key, value) -> {
-        if (!Arrays.equals(LATEST_TXID, key)) {
-          DeletedBlocksTransaction block = DeletedBlocksTransaction
-              .parseFrom(value);
-
-          if (block.getCount() > -1 && block.getCount() <= maxRetry) {
-            if (transactions.addTransaction(block,
-                transactionToDNsCommitMap.get(block.getTxID()))) {
-              deleteTransactionMap.put(block.getContainerID(), block.getTxID());
-              transactionToDNsCommitMap
-                  .putIfAbsent(block.getTxID(), new ConcurrentHashSet<>());
-            }
-          }
-          return !transactions.isFull();
-        }
-        return true;
-      });
-      return deleteTransactionMap;
-    } finally {
-      lock.unlock();
-    }
-  }
-
-  @Override
-  public void onMessage(DeleteBlockStatus deleteBlockStatus,
-      EventPublisher publisher) {
-    ContainerBlocksDeletionACKProto ackProto =
-        deleteBlockStatus.getCmdStatus().getBlockDeletionAck();
-    commitTransactions(ackProto.getResultsList(),
-        UUID.fromString(ackProto.getDnId()));
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/PendingDeleteHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/PendingDeleteHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/PendingDeleteHandler.java
deleted file mode 100644
index 736daac..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/PendingDeleteHandler.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdds.scm.block;
-
-import org.apache.hadoop.hdds.server.events.EventHandler;
-import org.apache.hadoop.hdds.server.events.EventPublisher;
-
-public class PendingDeleteHandler implements
-    EventHandler<PendingDeleteStatusList> {
-
-  private SCMBlockDeletingService scmBlockDeletingService;
-
-  public PendingDeleteHandler(
-      SCMBlockDeletingService scmBlockDeletingService) {
-    this.scmBlockDeletingService = scmBlockDeletingService;
-  }
-
-  @Override
-  public void onMessage(PendingDeleteStatusList pendingDeleteStatusList,
-      EventPublisher publisher) {
-    scmBlockDeletingService.handlePendingDeletes(pendingDeleteStatusList);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/PendingDeleteStatusList.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/PendingDeleteStatusList.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/PendingDeleteStatusList.java
deleted file mode 100644
index 904762d..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/PendingDeleteStatusList.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdds.scm.block;
-
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-
-import java.util.LinkedList;
-import java.util.List;
-
-public class PendingDeleteStatusList {
-
-  private List<PendingDeleteStatus> pendingDeleteStatuses;
-  private DatanodeDetails datanodeDetails;
-
-  public PendingDeleteStatusList(DatanodeDetails datanodeDetails) {
-    this.datanodeDetails = datanodeDetails;
-    pendingDeleteStatuses = new LinkedList<>();
-  }
-
-  public void addPendingDeleteStatus(long dnDeleteTransactionId,
-      long scmDeleteTransactionId, long containerId) {
-    pendingDeleteStatuses.add(
-        new PendingDeleteStatus(dnDeleteTransactionId, scmDeleteTransactionId,
-            containerId));
-  }
-
-  public static class PendingDeleteStatus {
-    private long dnDeleteTransactionId;
-    private long scmDeleteTransactionId;
-    private long containerId;
-
-    public PendingDeleteStatus(long dnDeleteTransactionId,
-        long scmDeleteTransactionId, long containerId) {
-      this.dnDeleteTransactionId = dnDeleteTransactionId;
-      this.scmDeleteTransactionId = scmDeleteTransactionId;
-      this.containerId = containerId;
-    }
-
-    public long getDnDeleteTransactionId() {
-      return dnDeleteTransactionId;
-    }
-
-    public long getScmDeleteTransactionId() {
-      return scmDeleteTransactionId;
-    }
-
-    public long getContainerId() {
-      return containerId;
-    }
-
-  }
-
-  public List<PendingDeleteStatus> getPendingDeleteStatuses() {
-    return pendingDeleteStatuses;
-  }
-
-  public int getNumPendingDeletes() {
-    return pendingDeleteStatuses.size();
-  }
-
-  public DatanodeDetails getDatanodeDetails() {
-    return datanodeDetails;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
deleted file mode 100644
index b85d77f..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hadoop.hdds.scm.block;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdds.scm.container.Mapping;
-import org.apache.hadoop.hdds.scm.events.SCMEvents;
-import org.apache.hadoop.hdds.scm.node.NodeManager;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
-import org.apache.hadoop.hdds.server.events.EventPublisher;
-import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
-import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
-import org.apache.hadoop.util.Time;
-import org.apache.hadoop.utils.BackgroundService;
-import org.apache.hadoop.utils.BackgroundTask;
-import org.apache.hadoop.utils.BackgroundTaskQueue;
-import org.apache.hadoop.utils.BackgroundTaskResult.EmptyTaskResult;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.hadoop.ozone.OzoneConfigKeys
-    .OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL;
-import static org.apache.hadoop.ozone.OzoneConfigKeys
-    .OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL_DEFAULT;
-
-/**
- * A background service running in SCM to delete blocks. This service scans
- * block deletion log in certain interval and caches block deletion commands
- * in {@link org.apache.hadoop.hdds.scm.node.CommandQueue}, asynchronously
- * SCM HB thread polls cached commands and sends them to datanode for physical
- * processing.
- */
-public class SCMBlockDeletingService extends BackgroundService {
-
-  public static final Logger LOG =
-      LoggerFactory.getLogger(SCMBlockDeletingService.class);
-
-  // ThreadPoolSize=2, 1 for scheduler and the other for the scanner.
-  private final static int BLOCK_DELETING_SERVICE_CORE_POOL_SIZE = 2;
-  private final DeletedBlockLog deletedBlockLog;
-  private final Mapping mappingService;
-  private final NodeManager nodeManager;
-  private final EventPublisher eventPublisher;
-
-  // Block delete limit size is dynamically calculated based on container
-  // delete limit size (ozone.block.deleting.container.limit.per.interval)
-  // that configured for datanode. To ensure DN not wait for
-  // delete commands, we use this value multiply by a factor 2 as the final
-  // limit TX size for each node.
-  // Currently we implement a throttle algorithm that throttling delete blocks
-  // for each datanode. Each node is limited by the calculation size. Firstly
-  // current node info is fetched from nodemanager, then scan entire delLog
-  // from the beginning to end. If one node reaches maximum value, its records
-  // will be skipped. If not, keep scanning until it reaches maximum value.
-  // Once all node are full, the scan behavior will stop.
-  private int blockDeleteLimitSize;
-
-  public SCMBlockDeletingService(DeletedBlockLog deletedBlockLog,
-      Mapping mapper, NodeManager nodeManager, EventPublisher eventPublisher,
-      long interval, long serviceTimeout, Configuration conf) {
-    super("SCMBlockDeletingService", interval, TimeUnit.MILLISECONDS,
-        BLOCK_DELETING_SERVICE_CORE_POOL_SIZE, serviceTimeout);
-    this.deletedBlockLog = deletedBlockLog;
-    this.mappingService = mapper;
-    this.nodeManager = nodeManager;
-    this.eventPublisher = eventPublisher;
-
-    int containerLimit = conf.getInt(
-        OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL,
-        OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL_DEFAULT);
-    Preconditions.checkArgument(containerLimit > 0,
-        "Container limit size should be " + "positive.");
-    // Use container limit value multiply by a factor 2 to ensure DN
-    // not wait for orders.
-    this.blockDeleteLimitSize = containerLimit * 2;
-  }
-
-  @Override
-  public BackgroundTaskQueue getTasks() {
-    BackgroundTaskQueue queue = new BackgroundTaskQueue();
-    queue.add(new DeletedBlockTransactionScanner());
-    return queue;
-  }
-
-  public void handlePendingDeletes(PendingDeleteStatusList deletionStatusList) {
-    DatanodeDetails dnDetails = deletionStatusList.getDatanodeDetails();
-    for (PendingDeleteStatusList.PendingDeleteStatus deletionStatus :
-        deletionStatusList.getPendingDeleteStatuses()) {
-      LOG.info(
-          "Block deletion txnID mismatch in datanode {} for containerID {}."
-              + " Datanode delete txnID: {}, SCM txnID: {}",
-          dnDetails.getUuid(), deletionStatus.getContainerId(),
-          deletionStatus.getDnDeleteTransactionId(),
-          deletionStatus.getScmDeleteTransactionId());
-    }
-  }
-
-  private class DeletedBlockTransactionScanner
-      implements BackgroundTask<EmptyTaskResult> {
-
-    @Override
-    public int getPriority() {
-      return 1;
-    }
-
-    @Override
-    public EmptyTaskResult call() throws Exception {
-      int dnTxCount = 0;
-      long startTime = Time.monotonicNow();
-      // Scan SCM DB in HB interval and collect a throttled list of
-      // to delete blocks.
-      LOG.debug("Running DeletedBlockTransactionScanner");
-      DatanodeDeletedBlockTransactions transactions = null;
-      List<DatanodeDetails> datanodes = nodeManager.getNodes(NodeState.HEALTHY);
-      Map<Long, Long> transactionMap = null;
-      if (datanodes != null) {
-        transactions = new DatanodeDeletedBlockTransactions(mappingService,
-            blockDeleteLimitSize, datanodes.size());
-        try {
-          transactionMap = deletedBlockLog.getTransactions(transactions);
-        } catch (IOException e) {
-          // We may tolerant a number of failures for sometime
-          // but if it continues to fail, at some point we need to raise
-          // an exception and probably fail the SCM ? At present, it simply
-          // continues to retry the scanning.
-          LOG.error("Failed to get block deletion transactions from delTX log",
-              e);
-        }
-        LOG.debug("Scanned deleted blocks log and got {} delTX to process.",
-            transactions.getTXNum());
-      }
-
-      if (transactions != null && !transactions.isEmpty()) {
-        for (UUID dnId : transactions.getDatanodeIDs()) {
-          List<DeletedBlocksTransaction> dnTXs = transactions
-              .getDatanodeTransactions(dnId);
-          if (dnTXs != null && !dnTXs.isEmpty()) {
-            dnTxCount += dnTXs.size();
-            // TODO commandQueue needs a cap.
-            // We should stop caching new commands if num of un-processed
-            // command is bigger than a limit, e.g 50. In case datanode goes
-            // offline for sometime, the cached commands be flooded.
-            eventPublisher.fireEvent(SCMEvents.RETRIABLE_DATANODE_COMMAND,
-                new CommandForDatanode<>(dnId, new DeleteBlocksCommand(dnTXs)));
-            LOG.debug(
-                "Added delete block command for datanode {} in the queue,"
-                    + " number of delete block transactions: {}, TxID list: 
{}",
-                dnId, dnTXs.size(), String.join(",",
-                    transactions.getTransactionIDList(dnId)));
-          }
-        }
-        mappingService.updateDeleteTransactionId(transactionMap);
-      }
-
-      if (dnTxCount > 0) {
-        LOG.info(
-            "Totally added {} delete blocks command for"
-                + " {} datanodes, task elapsed time: {}ms",
-            dnTxCount, transactions.getDatanodeIDs().size(),
-            Time.monotonicNow() - startTime);
-      }
-
-      return EmptyTaskResult.newResult();
-    }
-  }
-
-  @VisibleForTesting
-  public void setBlockDeleteTXNum(int numTXs) {
-    blockDeleteLimitSize = numTXs;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/package-info.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/package-info.java
deleted file mode 100644
index e1bfdff..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.hadoop.hdds.scm.block;
-/**
- * This package contains routines to manage the block location and
- * mapping inside SCM
- */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java
deleted file mode 100644
index c0de382..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.hadoop.hdds.scm.command;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.CommandStatus;
-import org.apache.hadoop.hdds.scm.events.SCMEvents;
-import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
-    .CommandStatusReportFromDatanode;
-import org.apache.hadoop.hdds.server.events.EventPublisher;
-import org.apache.hadoop.hdds.server.events.EventHandler;
-import org.apache.hadoop.hdds.server.events.IdentifiableEventPayload;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-
-/**
- * Handles CommandStatusReports from datanode.
- */
-public class CommandStatusReportHandler implements
-    EventHandler<CommandStatusReportFromDatanode> {
-
-  private static final Logger LOGGER = LoggerFactory
-      .getLogger(CommandStatusReportHandler.class);
-
-  @Override
-  public void onMessage(CommandStatusReportFromDatanode report,
-      EventPublisher publisher) {
-    Preconditions.checkNotNull(report);
-    List<CommandStatus> cmdStatusList = report.getReport().getCmdStatusList();
-    Preconditions.checkNotNull(cmdStatusList);
-    LOGGER.trace("Processing command status report for dn: {}", report
-        .getDatanodeDetails());
-
-    // Route command status to its watchers.
-    cmdStatusList.forEach(cmdStatus -> {
-      LOGGER.trace("Emitting command status for id:{} type: {}", cmdStatus
-          .getCmdId(), cmdStatus.getType());
-      switch (cmdStatus.getType()) {
-      case replicateContainerCommand:
-        publisher.fireEvent(SCMEvents.REPLICATION_STATUS, new
-            ReplicationStatus(cmdStatus));
-        break;
-      case closeContainerCommand:
-        publisher.fireEvent(SCMEvents.CLOSE_CONTAINER_STATUS, new
-            CloseContainerStatus(cmdStatus));
-        break;
-      case deleteBlocksCommand:
-        if (cmdStatus.getStatus() == CommandStatus.Status.EXECUTED) {
-          publisher.fireEvent(SCMEvents.DELETE_BLOCK_STATUS,
-              new DeleteBlockStatus(cmdStatus));
-        }
-        break;
-      default:
-        LOGGER.debug("CommandStatus of type:{} not handled in " +
-            "CommandStatusReportHandler.", cmdStatus.getType());
-        break;
-      }
-    });
-  }
-
-  /**
-   * Wrapper event for CommandStatus.
-   */
-  public static class CommandStatusEvent implements IdentifiableEventPayload {
-    private CommandStatus cmdStatus;
-
-    CommandStatusEvent(CommandStatus cmdStatus) {
-      this.cmdStatus = cmdStatus;
-    }
-
-    public CommandStatus getCmdStatus() {
-      return cmdStatus;
-    }
-
-    @Override
-    public String toString() {
-      return "CommandStatusEvent:" + cmdStatus.toString();
-    }
-
-    @Override
-    public long getId() {
-      return cmdStatus.getCmdId();
-    }
-  }
-
-  /**
-   * Wrapper event for Replicate Command.
-   */
-  public static class ReplicationStatus extends CommandStatusEvent {
-    public ReplicationStatus(CommandStatus cmdStatus) {
-      super(cmdStatus);
-    }
-  }
-
-  /**
-   * Wrapper event for CloseContainer Command.
-   */
-  public static class CloseContainerStatus extends CommandStatusEvent {
-    public CloseContainerStatus(CommandStatus cmdStatus) {
-      super(cmdStatus);
-    }
-  }
-
-  /**
-   * Wrapper event for DeleteBlock Command.
-   */
-  public static class DeleteBlockStatus extends CommandStatusEvent {
-    public DeleteBlockStatus(CommandStatus cmdStatus) {
-      super(cmdStatus);
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/package-info.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/package-info.java
deleted file mode 100644
index ba17fb9..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/package-info.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- * <p>
- * This package contains HDDS protocol related classes.
- */
-
-/**
- * This package contains HDDS protocol related classes.
- */
-package org.apache.hadoop.hdds.scm.command;
-/*
- * Classes related to commands issued from SCM to DataNode.
- * */
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
deleted file mode 100644
index 7baecc4..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * <p>http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * <p>Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hadoop.hdds.scm.container;
-
-import java.io.IOException;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
-import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
-import org.apache.hadoop.hdds.server.events.EventHandler;
-import org.apache.hadoop.hdds.server.events.EventPublisher;
-import org.apache.hadoop.hdds.server.events.IdentifiableEventPayload;
-import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
-import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.hadoop.hdds.scm.events.SCMEvents.DATANODE_COMMAND;
-import static org.apache.hadoop.hdds.scm.events.SCMEvents.CLOSE_CONTAINER_RETRYABLE_REQ;
-
-/**
- * In case of a node failure, volume failure, volume out of spapce, node
- * out of space etc, CLOSE_CONTAINER will be triggered.
- * CloseContainerEventHandler is the handler for CLOSE_CONTAINER.
- * When a close container event is fired, a close command for the container
- * should be sent to all the datanodes in the pipeline and containerStateManager
- * needs to update the container state to Closing.
- */
-public class CloseContainerEventHandler implements EventHandler<ContainerID> {
-
-  public static final Logger LOG =
-      LoggerFactory.getLogger(CloseContainerEventHandler.class);
-
-
-  private final Mapping containerManager;
-
-  public CloseContainerEventHandler(Mapping containerManager) {
-    this.containerManager = containerManager;
-  }
-
-  @Override
-  public void onMessage(ContainerID containerID, EventPublisher publisher) {
-
-    LOG.info("Close container Event triggered for container : {}",
-        containerID.getId());
-    ContainerWithPipeline containerWithPipeline;
-    ContainerInfo info;
-    try {
-      containerWithPipeline =
-          containerManager.getContainerWithPipeline(containerID.getId());
-      info = containerWithPipeline.getContainerInfo();
-      if (info == null) {
-        LOG.error("Failed to update the container state. Container with id : 
{}"
-            + " does not exist", containerID.getId());
-        return;
-      }
-    } catch (IOException e) {
-      LOG.error("Failed to update the container state. Container with id : {} "
-          + "does not exist", containerID.getId(), e);
-      return;
-    }
-
-    HddsProtos.LifeCycleState state = info.getState();
-    try {
-      switch (state) {
-      case ALLOCATED:
-        // We cannot close a container in ALLOCATED state, moving the
-        // container to CREATING state, this should eventually
-        // timeout and the container will be moved to DELETING state.
-        LOG.debug("Closing container {} in {} state", containerID, state);
-        containerManager.updateContainerState(containerID.getId(),
-            HddsProtos.LifeCycleEvent.CREATE);
-        break;
-      case CREATING:
-        // We cannot close a container in CREATING state, it will eventually
-        // timeout and moved to DELETING state.
-        LOG.debug("Closing container {} in {} state", containerID, state);
-        break;
-      case OPEN:
-        containerManager.updateContainerState(containerID.getId(),
-            HddsProtos.LifeCycleEvent.FINALIZE);
-        fireCloseContainerEvents(containerWithPipeline, info, publisher);
-        break;
-      case CLOSING:
-        fireCloseContainerEvents(containerWithPipeline, info, publisher);
-        break;
-      case CLOSED:
-      case DELETING:
-      case DELETED:
-        LOG.info(
-            "container with id : {} is in {} state and need not be closed.",
-            containerID.getId(), info.getState());
-        break;
-      default:
-        throw new IOException(
-            "Invalid container state for container " + containerID);
-      }
-    } catch (IOException ex) {
-      LOG.error("Failed to update the container state for" + "container : {}"
-          + containerID, ex);
-    }
-  }
-
-  private void fireCloseContainerEvents(
-      ContainerWithPipeline containerWithPipeline, ContainerInfo info,
-      EventPublisher publisher) {
-    ContainerID containerID = info.containerID();
-    // fire events.
-    CloseContainerCommand closeContainerCommand =
-        new CloseContainerCommand(containerID.getId(),
-            info.getReplicationType(), info.getPipelineID());
-
-    Pipeline pipeline = containerWithPipeline.getPipeline();
-    pipeline.getMachines().stream().map(
-        datanode -> new CommandForDatanode<>(datanode.getUuid(),
-            closeContainerCommand)).forEach((command) -> {
-              publisher.fireEvent(DATANODE_COMMAND, command);
-            });
-    publisher.fireEvent(CLOSE_CONTAINER_RETRYABLE_REQ,
-        new CloseContainerRetryableReq(containerID));
-    LOG.trace("Issuing {} on Pipeline {} for container", closeContainerCommand,
-        pipeline, containerID);
-  }
-
-  /**
-   * Class to create a retryable event. Prevents redundant requests for the
-   * same container Id.
-   */
-  public static class CloseContainerRetryableReq implements
-      IdentifiableEventPayload {
-
-    private ContainerID containerID;
-    public CloseContainerRetryableReq(ContainerID containerID) {
-      this.containerID = containerID;
-    }
-
-    public ContainerID getContainerID() {
-      return containerID;
-    }
-
-    @Override
-    public long getId() {
-      return containerID.getId();
-    }
-  }
-}
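
For context on the class removed above: CloseContainerEventHandler was consumed through the SCM event queue rather than called directly. Below is a minimal wiring sketch of how such a handler would have been registered and driven before this change. It assumes the EventQueue#addHandler/fireEvent API and the SCMEvents.CLOSE_CONTAINER constant from hdds-server-scm; the wrapper class name and the literal container id are purely illustrative.

import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.Mapping;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.server.events.EventQueue;

public final class CloseContainerWiringSketch {
  public static void wire(Mapping containerManager) {
    EventQueue eventQueue = new EventQueue();
    // Route CLOSE_CONTAINER events to the (now removed) handler ...
    eventQueue.addHandler(SCMEvents.CLOSE_CONTAINER,
        new CloseContainerEventHandler(containerManager));
    // ... so that firing the event runs the FINALIZE / close-command flow.
    eventQueue.fireEvent(SCMEvents.CLOSE_CONTAINER, ContainerID.valueof(1L));
  }
}

Per the deleted onMessage/fireCloseContainerEvents code, firing CLOSE_CONTAINER for an OPEN container would FINALIZE it and send a CloseContainerCommand to every datanode in its pipeline, plus a CLOSE_CONTAINER_RETRYABLE_REQ for the watcher deleted below.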

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerWatcher.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerWatcher.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerWatcher.java
deleted file mode 100644
index 8e277b9..0000000
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerWatcher.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * <p>http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * <p>Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations 
under
- * the License.
- */
-package org.apache.hadoop.hdds.scm.container;
-
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.CommandStatus.Status;
-import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler
-    .CloseContainerStatus;
-
-import org.apache.hadoop.hdds.scm.events.SCMEvents;
-import org.apache.hadoop.hdds.server.events.Event;
-import org.apache.hadoop.hdds.server.events.EventPublisher;
-import org.apache.hadoop.hdds.server.events.EventWatcher;
-import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler
-    .CloseContainerRetryableReq;
-import org.apache.hadoop.ozone.lease.LeaseManager;
-import org.apache.hadoop.ozone.lease.LeaseNotFoundException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-/**
- * This watcher watches for CLOSE_CONTAINER_STATUS events fired from
- * CommandStatusReport. If required, it re-triggers the CloseContainer command
- * for the DataNodes via CloseContainerEventHandler.
- */
-public class CloseContainerWatcher extends
-    EventWatcher<CloseContainerRetryableReq, CloseContainerStatus> {
-
-  public static final Logger LOG =
-      LoggerFactory.getLogger(CloseContainerWatcher.class);
-  private final Mapping containerManager;
-
-  public CloseContainerWatcher(Event<CloseContainerRetryableReq> startEvent,
-      Event<CloseContainerStatus> completionEvent,
-      LeaseManager<Long> leaseManager, Mapping containerManager) {
-    super(startEvent, completionEvent, leaseManager);
-    this.containerManager = containerManager;
-  }
-
-  @Override
-  protected void onTimeout(EventPublisher publisher,
-      CloseContainerRetryableReq payload) {
-    // Let CloseContainerEventHandler handle this message.
-    this.resendEventToHandler(payload.getId(), publisher);
-  }
-
-  @Override
-  protected void onFinished(EventPublisher publisher,
-      CloseContainerRetryableReq payload) {
-    LOG.trace("CloseContainerCommand for containerId: {} executed ", payload
-        .getContainerID().getId());
-  }
-
-  @Override
-  protected synchronized void handleCompletion(CloseContainerStatus status,
-      EventPublisher publisher) throws LeaseNotFoundException {
-    // If status is PENDING then return without doing anything.
-    if (status.getCmdStatus().getStatus().equals(Status.PENDING)) {
-      return;
-    }
-
-    CloseContainerRetryableReq closeCont = getTrackedEventbyId(status.getId());
-    super.handleCompletion(status, publisher);
-    // If status is FAILED then send a msg to Handler to resend the command.
-    if (status.getCmdStatus().getStatus().equals(Status.FAILED) && closeCont
-        != null) {
-      this.resendEventToHandler(closeCont.getId(), publisher);
-    }
-  }
-
-  private void resendEventToHandler(long containerID, EventPublisher
-      publisher) {
-    try {
-      // Check if container is still open
-      if (containerManager.getContainer(containerID).isContainerOpen()) {
-        publisher.fireEvent(SCMEvents.CLOSE_CONTAINER,
-            ContainerID.valueof(containerID));
-      }
-    } catch (IOException e) {
-      LOG.warn("Error in CloseContainerWatcher while processing event " +
-          "for containerId {} ExceptionMsg: ", containerID, e.getMessage());
-    }
-  }
-}
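
Similarly, a sketch of how the removed CloseContainerWatcher would have been constructed and attached to the event queue is below. The SCMEvents.CLOSE_CONTAINER_STATUS constant and the EventWatcher#start(EventQueue) signature are assumptions based on the surrounding hdds code; the lease manager is taken as an already-started parameter to avoid guessing its constructor.

import org.apache.hadoop.hdds.scm.container.CloseContainerWatcher;
import org.apache.hadoop.hdds.scm.container.Mapping;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.ozone.lease.LeaseManager;

public final class CloseContainerWatcherWiringSketch {
  public static void wire(Mapping containerManager, EventQueue eventQueue,
      LeaseManager<Long> leaseManager) {
    // Tracks each CLOSE_CONTAINER_RETRYABLE_REQ until a matching
    // CLOSE_CONTAINER_STATUS arrives, or the lease times out, in which case
    // onTimeout() re-fires CLOSE_CONTAINER for still-open containers.
    CloseContainerWatcher watcher = new CloseContainerWatcher(
        SCMEvents.CLOSE_CONTAINER_RETRYABLE_REQ,
        SCMEvents.CLOSE_CONTAINER_STATUS,
        leaseManager, containerManager);
    // Subscribes the watcher to its start and completion events on the queue.
    watcher.start(eventQueue);
  }
}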

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerActionsHandler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerActionsHandler.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerActionsHandler.java
deleted file mode 100644
index ce399eb..0000000
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerActionsHandler.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdds.scm.container;
-
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerAction;
-import org.apache.hadoop.hdds.scm.events.SCMEvents;
-import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
-    .ContainerActionsFromDatanode;
-import org.apache.hadoop.hdds.server.events.EventHandler;
-import org.apache.hadoop.hdds.server.events.EventPublisher;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Handles container actions reported by datanodes.
- */
-public class ContainerActionsHandler implements
-    EventHandler<ContainerActionsFromDatanode> {
-
-  private static final Logger LOG = LoggerFactory.getLogger(
-      ContainerActionsHandler.class);
-
-  @Override
-  public void onMessage(
-      ContainerActionsFromDatanode containerReportFromDatanode,
-      EventPublisher publisher) {
-    DatanodeDetails dd = containerReportFromDatanode.getDatanodeDetails();
-    for (ContainerAction action : containerReportFromDatanode.getReport()
-        .getContainerActionsList()) {
-      ContainerID containerId = ContainerID.valueof(action.getContainerID());
-      switch (action.getAction()) {
-      case CLOSE:
-        LOG.debug("Closing container {} in datanode {} because the" +
-            " container is {}.", containerId, dd, action.getReason());
-        publisher.fireEvent(SCMEvents.CLOSE_CONTAINER, containerId);
-        break;
-      default:
-        LOG.warn("Invalid action {} with reason {}, from datanode {}. ",
-            action.getAction(), action.getReason(), dd); }
-    }
-  }
-}
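
And for the removed ContainerActionsHandler, registration would have looked roughly like the sketch below; SCMEvents.CONTAINER_ACTIONS is assumed to be the event carrying ContainerActionsFromDatanode payloads. Any CLOSE action in a datanode report then re-fires CLOSE_CONTAINER, feeding the handler shown earlier in this commit.

import org.apache.hadoop.hdds.scm.container.ContainerActionsHandler;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.server.events.EventQueue;

public final class ContainerActionsWiringSketch {
  public static void wire(EventQueue eventQueue) {
    // Any CLOSE action in a datanode's ContainerActions report is turned
    // into a CLOSE_CONTAINER event by the (now removed) handler.
    eventQueue.addHandler(SCMEvents.CONTAINER_ACTIONS,
        new ContainerActionsHandler());
  }
}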

