- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations 
- * the License.
- */
-package org.apache.hadoop.ozone.container.common.statemachine;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdsl.conf.OzoneConfiguration;
-import org.apache.hadoop.hdsl.protocol.DatanodeDetails;
-import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
-    .CloseContainerHandler;
-import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
-import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
-import org.apache.hadoop.util.Time;
-import org.apache.hadoop.util.concurrent.HadoopExecutors;
-import static 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
- * State Machine Class.
- */
-public class DatanodeStateMachine implements Closeable {
-  @VisibleForTesting
-  static final Logger LOG =
-      LoggerFactory.getLogger(DatanodeStateMachine.class);
-  private final ExecutorService executorService;
-  private final Configuration conf;
-  private final SCMConnectionManager connectionManager;
-  private final long heartbeatFrequency;
-  private StateContext context;
-  private final OzoneContainer container;
-  private DatanodeDetails datanodeDetails;
-  private final CommandDispatcher commandDispatcher;
-  private long commandsHandled;
-  private AtomicLong nextHB;
-  private Thread stateMachineThread = null;
-  private Thread cmdProcessThread = null;
-  /**
-   * Constructs a a datanode state machine.
-   *
-   * @param datanodeDetails - DatanodeDetails used to identify a datanode
-   * @param conf - Configuration.
-   */
-  public DatanodeStateMachine(DatanodeDetails datanodeDetails,
-      Configuration conf) throws IOException {
-    this.conf = conf;
-    this.datanodeDetails = datanodeDetails;
-    executorService = HadoopExecutors.newCachedThreadPool(
-                new ThreadFactoryBuilder().setDaemon(true)
-            .setNameFormat("Datanode State Machine Thread - %d").build());
-    connectionManager = new SCMConnectionManager(conf);
-    context = new StateContext(this.conf, DatanodeStates.getInitState(), this);
-    heartbeatFrequency = TimeUnit.SECONDS.toMillis(
-        getScmHeartbeatInterval(conf));
-    container = new OzoneContainer(this.datanodeDetails,
-        new OzoneConfiguration(conf));
-    nextHB = new AtomicLong(Time.monotonicNow());
-     // When we add new handlers just adding a new handler here should do the
-     // trick.
-    commandDispatcher = CommandDispatcher.newBuilder()
-        .addHandler(new ContainerReportHandler())
-        .addHandler(new CloseContainerHandler())
-        .addHandler(new DeleteBlocksCommandHandler(
-            container.getContainerManager(), conf))
-        .setConnectionManager(connectionManager)
-        .setContainer(container)
-        .setContext(context)
-        .build();
-  }
-  /**
-   *
-   * Return DatanodeDetails if set, return null otherwise.
-   *
-   * @return DatanodeDetails
-   */
-  public DatanodeDetails getDatanodeDetails() {
-    return datanodeDetails;
-  }
-  /**
-   * Returns the Connection manager for this state machine.
-   *
-   * @return - SCMConnectionManager.
-   */
-  public SCMConnectionManager getConnectionManager() {
-    return connectionManager;
-  }
-  public OzoneContainer getContainer() {
-    return this.container;
-  }
-  /**
-   * Runs the state machine at a fixed frequency.
-   */
-  private void start() throws IOException {
-    long now = 0;
-    container.start();
-    initCommandHandlerThread(conf);
-    while (context.getState() != DatanodeStates.SHUTDOWN) {
-      try {
-        LOG.debug("Executing cycle Number : {}", context.getExecutionCount());
-        nextHB.set(Time.monotonicNow() + heartbeatFrequency);
-        context.setReportState(container.getNodeReport());
-        context.setContainerReportState(container.getContainerReportState());
-        context.execute(executorService, heartbeatFrequency,
-            TimeUnit.MILLISECONDS);
-        now = Time.monotonicNow();
-        if (now < nextHB.get()) {
-          Thread.sleep(nextHB.get() - now);
-        }
-      } catch (InterruptedException e) {
-        // Ignore this exception.
-      } catch (Exception e) {
-        LOG.error("Unable to finish the execution.", e);
-      }
-    }
-  }
-  /**
-   * Gets the current context.
-   *
-   * @return StateContext
-   */
-  public StateContext getContext() {
-    return context;
-  }
-  /**
-   * Sets the current context.
-   *
-   * @param context - Context
-   */
-  public void setContext(StateContext context) {
-    this.context = context;
-  }
-  /**
-   * Closes this stream and releases any system resources associated with it. 
-   * the stream is already closed then invoking this method has no effect.
-   * <p>
-   * <p> As noted in {@link AutoCloseable#close()}, cases where the close may
-   * fail require careful attention. It is strongly advised to relinquish the
-   * underlying resources and to internally <em>mark</em> the {@code Closeable}
-   * as closed, prior to throwing the {@code IOException}.
-   *
-   * @throws IOException if an I/O error occurs
-   */
-  @Override
-  public void close() throws IOException {
-    if (stateMachineThread != null) {
-      stateMachineThread.interrupt();
-    }
-    if (cmdProcessThread != null) {
-      cmdProcessThread.interrupt();
-    }
-    context.setState(DatanodeStates.getLastState());
-    executorService.shutdown();
-    try {
-      if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
-        executorService.shutdownNow();
-      }
-      if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
-        LOG.error("Unable to shutdown state machine properly.");
-      }
-    } catch (InterruptedException e) {
-      LOG.error("Error attempting to shutdown.", e);
-      executorService.shutdownNow();
-      Thread.currentThread().interrupt();
-    }
-    if (connectionManager != null) {
-      connectionManager.close();
-    }
-    if(container != null) {
-      container.stop();
-    }
-  }
-  /**
-   * States that a datanode  can be in. GetNextState will move this enum from
-   * getInitState to getLastState.
-   */
-  public enum DatanodeStates {
-    INIT(1),
-    RUNNING(2),
-    SHUTDOWN(3);
-    private final int value;
-    /**
-     * Constructs ContainerStates.
-     *
-     * @param value  Enum Value
-     */
-    DatanodeStates(int value) {
-      this.value = value;
-    }
-    /**
-     * Returns the first State.
-     *
-     * @return First State.
-     */
-    public static DatanodeStates getInitState() {
-      return INIT;
-    }
-    /**
-     * The last state of endpoint states.
-     *
-     * @return last state.
-     */
-    public static DatanodeStates getLastState() {
-      return SHUTDOWN;
-    }
-    /**
-     * returns the numeric value associated with the endPoint.
-     *
-     * @return int.
-     */
-    public int getValue() {
-      return value;
-    }
-    /**
-     * Returns the next logical state that endPoint should move to. This
-     * function assumes the States are sequentially numbered.
-     *
-     * @return NextState.
-     */
-    public DatanodeStates getNextState() {
-      if (this.value < getLastState().getValue()) {
-        int stateValue = this.getValue() + 1;
-        for (DatanodeStates iter : values()) {
-          if (stateValue == iter.getValue()) {
-            return iter;
-          }
-        }
-      }
-      return getLastState();
-    }
-  }
-  /**
-   * Start datanode state machine as a single thread daemon.
-   */
-  public void startDaemon() {
-    Runnable startStateMachineTask = () -> {
-      try {
-        start();
-"Ozone container server started.");
-      } catch (Exception ex) {
-        LOG.error("Unable to start the DatanodeState Machine", ex);
-      }
-    };
-    stateMachineThread =  new ThreadFactoryBuilder()
-        .setDaemon(true)
-        .setNameFormat("Datanode State Machine Thread - %d")
-        .build().newThread(startStateMachineTask);
-    stateMachineThread.start();
-  }
-  /**
-   * Stop the daemon thread of the datanode state machine.
-   */
-  public synchronized void stopDaemon() {
-    try {
-      context.setState(DatanodeStates.SHUTDOWN);
-      this.close();
-"Ozone container server stopped.");
-    } catch (IOException e) {
-      LOG.error("Stop ozone container server failed.", e);
-    }
-  }
-  /**
-   *
-   * Check if the datanode state machine daemon is stopped.
-   *
-   * @return True if datanode state machine daemon is stopped
-   * and false otherwise.
-   */
-  @VisibleForTesting
-  public boolean isDaemonStopped() {
-    return this.executorService.isShutdown()
-        && this.getContext().getExecutionCount() == 0
-        && this.getContext().getState() == DatanodeStates.SHUTDOWN;
-  }
-  /**
-   * Create a command handler thread.
-   *
-   * @param config
-   */
-  private void initCommandHandlerThread(Configuration config) {
-    /**
-     * Task that periodically checks if we have any outstanding commands.
-     * It is assumed that commands can be processed slowly and in order.
-     * This assumption might change in future. Right now due to this assumption
-     * we have single command  queue process thread.
-     */
-    Runnable processCommandQueue = () -> {
-      long now;
-      while (getContext().getState() != DatanodeStates.SHUTDOWN) {
-        SCMCommand command = getContext().getNextCommand();
-        if (command != null) {
-          commandDispatcher.handle(command);
-          commandsHandled++;
-        } else {
-          try {
-            // Sleep till the next HB + 1 second.
-            now = Time.monotonicNow();
-            if (nextHB.get() > now) {
-              Thread.sleep((nextHB.get() - now) + 1000L);
-            }
-          } catch (InterruptedException e) {
-            // Ignore this exception.
-          }
-        }
-      }
-    };
-    // We will have only one thread for command processing in a datanode.
-    cmdProcessThread = getCommandHandlerThread(processCommandQueue);
-    cmdProcessThread.start();
-  }
-  private Thread getCommandHandlerThread(Runnable processCommandQueue) {
-    Thread handlerThread = new Thread(processCommandQueue);
-    handlerThread.setDaemon(true);
-    handlerThread.setName("Command processor thread");
-    handlerThread.setUncaughtExceptionHandler((Thread t, Throwable e) -> {
-      // Let us just restart this thread after logging a critical error.
-      // if this thread is not running we cannot handle commands from SCM.
-      LOG.error("Critical Error : Command processor thread encountered an " +
-          "error. Thread: {}", t.toString(), e);
-      getCommandHandlerThread(processCommandQueue).start();
-    });
-    return handlerThread;
-  }
-  /**
-   * Returns the number of commands handled  by the datanode.
-   * @return  count
-   */
-  @VisibleForTesting
-  public long getCommandHandled() {
-    return commandsHandled;
-  }
diff --git 
deleted file mode 100644
index 61bc91e..0000000
+++ /dev/null
@@ -1,295 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- *
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations 
- * the License.
- */
-package org.apache.hadoop.ozone.container.common.statemachine;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ozone.protocol.VersionResponse;
-import org.apache.hadoop.ozone.protocolPB
-    .StorageContainerDatanodeProtocolClientSideTranslatorPB;
-import static org.apache.hadoop.ozone.scm.HdslServerUtil.getLogWarnInterval;
-import static org.apache.hadoop.ozone.scm.HdslServerUtil
-    .getScmHeartbeatInterval;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.time.ZonedDateTime;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
- * Endpoint is used as holder class that keeps state around the RPC endpoint.
- */
-public class EndpointStateMachine
-    implements Closeable, EndpointStateMachineMBean {
-  static final Logger
-      LOG = LoggerFactory.getLogger(EndpointStateMachine.class);
-  private final StorageContainerDatanodeProtocolClientSideTranslatorPB 
-  private final AtomicLong missedCount;
-  private final InetSocketAddress address;
-  private final Lock lock;
-  private final Configuration conf;
-  private EndPointStates state;
-  private VersionResponse version;
-  private ZonedDateTime lastSuccessfulHeartbeat;
-  /**
-   * Constructs RPC Endpoints.
-   *
-   * @param endPoint - RPC endPoint.
-   */
-  public EndpointStateMachine(InetSocketAddress address,
-      StorageContainerDatanodeProtocolClientSideTranslatorPB endPoint,
-      Configuration conf) {
-    this.endPoint = endPoint;
-    this.missedCount = new AtomicLong(0);
-    this.address = address;
-    state = EndPointStates.getInitState();
-    lock = new ReentrantLock();
-    this.conf = conf;
-  }
-  /**
-   * Takes a lock on this EndPoint so that other threads don't use this while 
-   * are trying to communicate via this endpoint.
-   */
-  public void lock() {
-    lock.lock();
-  }
-  /**
-   * Unlocks this endpoint.
-   */
-  public void unlock() {
-    lock.unlock();
-  }
-  /**
-   * Returns the version that we read from the server if anyone asks .
-   *
-   * @return - Version Response.
-   */
-  public VersionResponse getVersion() {
-    return version;
-  }
-  /**
-   * Sets the Version reponse we recieved from the SCM.
-   *
-   * @param version VersionResponse
-   */
-  public void setVersion(VersionResponse version) {
-    this.version = version;
-  }
-  /**
-   * Returns the current State this end point is in.
-   *
-   * @return - getState.
-   */
-  public EndPointStates getState() {
-    return state;
-  }
-  @Override
-  public int getVersionNumber() {
-    if (version != null) {
-      return version.getProtobufMessage().getSoftwareVersion();
-    } else {
-      return -1;
-    }
-  }
-  /**
-   * Sets the endpoint state.
-   *
-   * @param epState - end point state.
-   */
-  public EndPointStates setState(EndPointStates epState) {
-    this.state = epState;
-    return this.state;
-  }
-  /**
-   * Closes the connection.
-   *
-   * @throws IOException
-   */
-  @Override
-  public void close() throws IOException {
-    if (endPoint != null) {
-      endPoint.close();
-    }
-  }
-  /**
-   * We maintain a count of how many times we missed communicating with a
-   * specific SCM. This is not made atomic since the access to this is always
-   * guarded by the read or write lock. That is, it is serialized.
-   */
-  public void incMissed() {
-    this.missedCount.incrementAndGet();
-  }
-  /**
-   * Returns the value of the missed count.
-   *
-   * @return int
-   */
-  public long getMissedCount() {
-    return this.missedCount.get();
-  }
-  @Override
-  public String getAddressString() {
-    return getAddress().toString();
-  }
-  public void zeroMissedCount() {
-    this.missedCount.set(0);
-  }
-  /**
-   * Returns the InetAddress of the endPoint.
-   *
-   * @return - EndPoint.
-   */
-  public InetSocketAddress getAddress() {
-    return this.address;
-  }
-  /**
-   * Returns real RPC endPoint.
-   *
-   * @return rpc client.
-   */
-  public StorageContainerDatanodeProtocolClientSideTranslatorPB
-      getEndPoint() {
-    return endPoint;
-  }
-  /**
-   * Returns the string that represents this endpoint.
-   *
-   * @return - String
-   */
-  public String toString() {
-    return address.toString();
-  }
-  /**
-   * Logs exception if needed.
-   *  @param ex         - Exception
-   */
-  public void logIfNeeded(Exception ex) {
-    LOG.trace("Incrementing the Missed count. Ex : {}", ex);
-    this.incMissed();
-    if (this.getMissedCount() % getLogWarnInterval(conf) ==
-        0) {
-      LOG.warn("Unable to communicate to SCM server at {}. We have not been " +
-              "able to communicate to this SCM server for past {} seconds.",
-          this.getAddress().getHostString() + ":" + 
-          this.getMissedCount() * getScmHeartbeatInterval(
-              this.conf));
-    }
-  }
-  /**
-   * States that an Endpoint can be in.
-   * <p>
-   * This is a sorted list of states that EndPoint will traverse.
-   * <p>
-   * GetNextState will move this enum from getInitState to getLastState.
-   */
-  public enum EndPointStates {
-    REGISTER(2),
-    HEARTBEAT(3),
-    SHUTDOWN(4); // if you add value after this please edit getLastState too.
-    private final int value;
-    /**
-     * Constructs endPointStates.
-     *
-     * @param value  state.
-     */
-    EndPointStates(int value) {
-      this.value = value;
-    }
-    /**
-     * Returns the first State.
-     *
-     * @return First State.
-     */
-    public static EndPointStates getInitState() {
-      return GETVERSION;
-    }
-    /**
-     * The last state of endpoint states.
-     *
-     * @return last state.
-     */
-    public static EndPointStates getLastState() {
-      return SHUTDOWN;
-    }
-    /**
-     * returns the numeric value associated with the endPoint.
-     *
-     * @return int.
-     */
-    public int getValue() {
-      return value;
-    }
-    /**
-     * Returns the next logical state that endPoint should move to.
-     * The next state is computed by adding 1 to the current state.
-     *
-     * @return NextState.
-     */
-    public EndPointStates getNextState() {
-      if (this.getValue() < getLastState().getValue()) {
-        int stateValue = this.getValue() + 1;
-        for (EndPointStates iter : values()) {
-          if (stateValue == iter.getValue()) {
-            return iter;
-          }
-        }
-      }
-      return getLastState();
-    }
-  }
-  public long getLastSuccessfulHeartbeat() {
-    return lastSuccessfulHeartbeat == null ?
-        0 :
-        lastSuccessfulHeartbeat.toEpochSecond();
-  }
-  public void setLastSuccessfulHeartbeat(
-      ZonedDateTime lastSuccessfulHeartbeat) {
-    this.lastSuccessfulHeartbeat = lastSuccessfulHeartbeat;
-  }
diff --git 
deleted file mode 100644
index 4f64bde..0000000
+++ /dev/null
@@ -1,34 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- *
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations 
- * the License.
- */
-package org.apache.hadoop.ozone.container.common.statemachine;
- * JMX representation of an EndpointStateMachine.
- */
-public interface EndpointStateMachineMBean {
-  long getMissedCount();
-  String getAddressString();
-  EndpointStateMachine.EndPointStates getState();
-  int getVersionNumber();
-  long getLastSuccessfulHeartbeat();
diff --git 
deleted file mode 100644
index c9f83c6..0000000
+++ /dev/null
@@ -1,203 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- *
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations 
- * the License.
- */
-package org.apache.hadoop.ozone.container.common.statemachine;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ipc.ProtobufRpcEngine;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.metrics2.util.MBeans;
-import org.apache.hadoop.ozone.protocolPB
-    .StorageContainerDatanodeProtocolClientSideTranslatorPB;
-import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolPB;
-import static org.apache.hadoop.ozone.scm.HdslServerUtil
-    .getScmRpcTimeOutInMilliseconds;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.util.*;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
- * SCMConnectionManager - Acts as a class that manages the membership
- * information of the SCMs that we are working with.
- */
-public class SCMConnectionManager
-    implements Closeable, SCMConnectionManagerMXBean {
-  private static final Logger LOG =
-      LoggerFactory.getLogger(SCMConnectionManager.class);
-  private final ReadWriteLock mapLock;
-  private final Map<InetSocketAddress, EndpointStateMachine> scmMachines;
-  private final int rpcTimeout;
-  private final Configuration conf;
-  private final ObjectName jmxBean;
-  public SCMConnectionManager(Configuration conf) {
-    this.mapLock = new ReentrantReadWriteLock();
-    Long timeOut = getScmRpcTimeOutInMilliseconds(conf);
-    this.rpcTimeout = timeOut.intValue();
-    this.scmMachines = new HashMap<>();
-    this.conf = conf;
-    jmxBean = MBeans.register("OzoneDataNode",
-        "SCMConnectionManager",
-        this);
-  }
-  /**
-   * Returns Config.
-   *
-   * @return ozoneConfig.
-   */
-  public Configuration getConf() {
-    return conf;
-  }
-  /**
-   * Get RpcTimeout.
-   *
-   * @return - Return RPC timeout.
-   */
-  public int getRpcTimeout() {
-    return rpcTimeout;
-  }
-  /**
-   * Takes a read lock.
-   */
-  public void readLock() {
-    this.mapLock.readLock().lock();
-  }
-  /**
-   * Releases the read lock.
-   */
-  public void readUnlock() {
-    this.mapLock.readLock().unlock();
-  }
-  /**
-   * Takes the write lock.
-   */
-  public void writeLock() {
-    this.mapLock.writeLock().lock();
-  }
-  /**
-   * Releases the write lock.
-   */
-  public void writeUnlock() {
-    this.mapLock.writeLock().unlock();
-  }
-  /**
-   * adds a new SCM machine to the target set.
-   *
-   * @param address - Address of the SCM machine to send heatbeat to.
-   * @throws IOException
-   */
-  public void addSCMServer(InetSocketAddress address) throws IOException {
-    writeLock();
-    try {
-      if (scmMachines.containsKey(address)) {
-        LOG.warn("Trying to add an existing SCM Machine to Machines group. " +
-            "Ignoring the request.");
-        return;
-      }
-      RPC.setProtocolEngine(conf, StorageContainerDatanodeProtocolPB.class,
-          ProtobufRpcEngine.class);
-      long version =
-          RPC.getProtocolVersion(StorageContainerDatanodeProtocolPB.class);
-      StorageContainerDatanodeProtocolPB rpcProxy = RPC.getProxy(
-          StorageContainerDatanodeProtocolPB.class, version,
-          address, UserGroupInformation.getCurrentUser(), conf,
-          NetUtils.getDefaultSocketFactory(conf), getRpcTimeout());
-      StorageContainerDatanodeProtocolClientSideTranslatorPB rpcClient =
-          new StorageContainerDatanodeProtocolClientSideTranslatorPB(rpcProxy);
-      EndpointStateMachine endPoint =
-          new EndpointStateMachine(address, rpcClient, conf);
-      scmMachines.put(address, endPoint);
-    } finally {
-      writeUnlock();
-    }
-  }
-  /**
-   * Removes a  SCM machine for the target set.
-   *
-   * @param address - Address of the SCM machine to send heatbeat to.
-   * @throws IOException
-   */
-  public void removeSCMServer(InetSocketAddress address) throws IOException {
-    writeLock();
-    try {
-      if (!scmMachines.containsKey(address)) {
-        LOG.warn("Trying to remove a non-existent SCM machine. " +
-            "Ignoring the request.");
-        return;
-      }
-      EndpointStateMachine endPoint = scmMachines.get(address);
-      endPoint.close();
-      scmMachines.remove(address);
-    } finally {
-      writeUnlock();
-    }
-  }
-  /**
-   * Returns all known RPCEndpoints.
-   *
-   * @return - List of RPC Endpoints.
-   */
-  public Collection<EndpointStateMachine> getValues() {
-    return scmMachines.values();
-  }
-  @Override
-  public void close() throws IOException {
-    getValues().forEach(endpointStateMachine
-        -> IOUtils.cleanupWithLogger(LOG, endpointStateMachine));
-    MBeans.unregister(jmxBean);
-  }
-  @Override
-  public List<EndpointStateMachineMBean> getSCMServers() {
-    readLock();
-    try {
-      return Collections
-          .unmodifiableList(new ArrayList<>(scmMachines.values()));
-    } finally {
-      readUnlock();
-    }
-  }
diff --git 
deleted file mode 100644
index 25ef163..0000000
+++ /dev/null
@@ -1,27 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- *
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations 
- * the License.
- */
-package org.apache.hadoop.ozone.container.common.statemachine;
-import java.util.List;
- * JMX information about the connected SCM servers.
- */
-public interface SCMConnectionManagerMXBean {
-  List<EndpointStateMachineMBean> getSCMServers();
diff --git 
deleted file mode 100644
index e4d6cd9..0000000
+++ /dev/null
@@ -1,281 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- *
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations 
- * the License.
- */
-package org.apache.hadoop.ozone.container.common.statemachine;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ozone.container.common.states.DatanodeState;
-import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.util.LinkedList;
-import java.util.Queue;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import static org.apache.hadoop.ozone.OzoneConsts.INVALID_PORT;
-import static org.apache.hadoop.hdsl.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ReportState.states
-    .noContainerReports;
- * Current Context of State Machine.
- */
-public class StateContext {
-  static final Logger LOG =
-      LoggerFactory.getLogger(StateContext.class);
-  private final Queue<SCMCommand> commandQueue;
-  private final Lock lock;
-  private final DatanodeStateMachine parent;
-  private final AtomicLong stateExecutionCount;
-  private final Configuration conf;
-  private DatanodeStateMachine.DatanodeStates state;
-  private SCMNodeReport nrState;
-  private ReportState  reportState;
-  private static final ReportState DEFAULT_REPORT_STATE =
-  /**
-   * Constructs a StateContext.
-   *
-   * @param conf   - Configration
-   * @param state  - State
-   * @param parent Parent State Machine
-   */
-  public StateContext(Configuration conf, DatanodeStateMachine.DatanodeStates
-      state, DatanodeStateMachine parent) {
-    this.conf = conf;
-    this.state = state;
-    this.parent = parent;
-    commandQueue = new LinkedList<>();
-    lock = new ReentrantLock();
-    stateExecutionCount = new AtomicLong(0);
-    nrState = SCMNodeReport.getDefaultInstance();
-  }
-  /**
-   * Returns the ContainerStateMachine class that holds this state.
-   *
-   * @return ContainerStateMachine.
-   */
-  public DatanodeStateMachine getParent() {
-    return parent;
-  }
-  /**
-   * Get the container server port.
-   * @return The container server port if available, return -1 if otherwise
-   */
-  public int getContainerPort() {
-    return parent == null ?
-        INVALID_PORT : parent.getContainer().getContainerServerPort();
-  }
-  /**
-   * Gets the Ratis Port.
-   * @return int , return -1 if not valid.
-   */
-  public int getRatisPort() {
-    return parent == null ?
-        INVALID_PORT : parent.getContainer().getRatisContainerServerPort();
-  }
-  /**
-   * Returns true if we are entering a new state.
-   *
-   * @return boolean
-   */
-  boolean isEntering() {
-    return stateExecutionCount.get() == 0;
-  }
-  /**
-   * Returns true if we are exiting from the current state.
-   *
-   * @param newState - newState.
-   * @return boolean
-   */
-  boolean isExiting(DatanodeStateMachine.DatanodeStates newState) {
-    boolean isExiting = state != newState && stateExecutionCount.get() > 0;
-    if(isExiting) {
-      stateExecutionCount.set(0);
-    }
-    return isExiting;
-  }
-  /**
-   * Returns the current state the machine is in.
-   *
-   * @return state.
-   */
-  public DatanodeStateMachine.DatanodeStates getState() {
-    return state;
-  }
-  /**
-   * Sets the current state of the machine.
-   *
-   * @param state state.
-   */
-  public void setState(DatanodeStateMachine.DatanodeStates state) {
-    this.state = state;
-  }
-  /**
-   * Returns the node report of the datanode state context.
-   * @return the node report.
-   */
-  public SCMNodeReport getNodeReport() {
-    return nrState;
-  }
-  /**
-   * Sets the storage location report of the datanode state context.
-   * @param nrReport - node report
-   */
-  public void setReportState(SCMNodeReport nrReport) {
-    this.nrState = nrReport;
-  }
-  /**
-   * Returns the next task to get executed by the datanode state machine.
-   * @return A callable that will be executed by the
-   * {@link DatanodeStateMachine}
-   */
-  @SuppressWarnings("unchecked")
-  public DatanodeState<DatanodeStateMachine.DatanodeStates> getTask() {
-    switch (this.state) {
-    case INIT:
-      return new InitDatanodeState(this.conf, parent.getConnectionManager(),
-          this);
-    case RUNNING:
-      return new RunningDatanodeState(this.conf, parent.getConnectionManager(),
-          this);
-    case SHUTDOWN:
-      return null;
-    default:
-      throw new IllegalArgumentException("Not Implemented yet.");
-    }
-  }
-  /**
-   * Executes the required state function.
-   *
-   * @param service - Executor Service
-   * @param time    - seconds to wait
-   * @param unit    - Seconds.
-   * @throws InterruptedException
-   * @throws ExecutionException
-   * @throws TimeoutException
-   */
-  public void execute(ExecutorService service, long time, TimeUnit unit)
-      throws InterruptedException, ExecutionException, TimeoutException {
-    stateExecutionCount.incrementAndGet();
-    DatanodeState<DatanodeStateMachine.DatanodeStates> task = getTask();
-    if (this.isEntering()) {
-      task.onEnter();
-    }
-    task.execute(service);
-    DatanodeStateMachine.DatanodeStates newState = task.await(time, unit);
-    if (this.state != newState) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Task {} executed, state transited from {} to {}",
-            task.getClass().getSimpleName(), this.state, newState);
-      }
-      if (isExiting(newState)) {
-        task.onExit();
-      }
-      this.clearReportState();
-      this.setState(newState);
-    }
-  }
-  /**
-   * Returns the next command or null if it is empty.
-   *
-   * @return SCMCommand or Null.
-   */
-  public SCMCommand getNextCommand() {
-    lock.lock();
-    try {
-      return commandQueue.poll();
-    } finally {
-      lock.unlock();
-    }
-  }
-  /**
-   * Adds a command to the State Machine queue.
-   *
-   * @param command - SCMCommand.
-   */
-  public void addCommand(SCMCommand command) {
-    lock.lock();
-    try {
-      commandQueue.add(command);
-    } finally {
-      lock.unlock();
-    }
-  }
-  /**
-   * Returns the count of the Execution.
-   * @return long
-   */
-  public long getExecutionCount() {
-    return stateExecutionCount.get();
-  }
-  /**
-   * Gets the ReportState.
-   * @return ReportState.
-   */
-  public synchronized  ReportState getContainerReportState() {
-    if (reportState == null) {
-    }
-    return reportState;
-  }
-  /**
-   * Sets the ReportState.
-   * @param rState - ReportState.
-   */
-  public synchronized  void setContainerReportState(ReportState rState) {
-    this.reportState = rState;
-  }
-  /**
-   * Clears report state after it has been communicated.
-   */
-  public synchronized void clearReportState() {
-    if(reportState != null) {
-      setContainerReportState(null);
-    }
-  }
diff --git 
deleted file mode 100644
index d5df699..0000000
+++ /dev/null
@@ -1,238 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- *
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations 
- * the License.
- */
-package org.apache.hadoop.ozone.container.common.statemachine.background;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdsl.protocol.proto.ContainerProtos;
-import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
-import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
-import org.apache.hadoop.ozone.container.common.helpers.KeyUtils;
-import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
-import org.apache.hadoop.util.Time;
-import org.apache.hadoop.utils.BackgroundService;
-import org.apache.hadoop.utils.BackgroundTaskResult;
-import org.apache.hadoop.utils.BackgroundTaskQueue;
-import org.apache.hadoop.utils.BackgroundTask;
-import org.apache.hadoop.utils.BatchOperation;
-import org.apache.hadoop.utils.MetadataStore;
-import org.apache.hadoop.utils.MetadataKeyFilters.KeyPrefixFilter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.util.List;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import static org.apache.hadoop.ozone.OzoneConfigKeys
-import static org.apache.hadoop.ozone.OzoneConfigKeys
-import static org.apache.hadoop.ozone.OzoneConfigKeys
-import static org.apache.hadoop.ozone.OzoneConfigKeys
- * A per-datanode container block deleting service takes in charge
- * of deleting staled ozone blocks.
- */
-public class BlockDeletingService extends BackgroundService{
-  private static final Logger LOG =
-      LoggerFactory.getLogger(BlockDeletingService.class);
-  private final ContainerManager containerManager;
-  private final Configuration conf;
-  // Throttle number of blocks to delete per task,
-  // set to 1 for testing
-  private final int blockLimitPerTask;
-  // Throttle the number of containers to process concurrently at a time,
-  private final int containerLimitPerInterval;
-  // Task priority is useful when a to-delete block has weight.
-  private final static int TASK_PRIORITY_DEFAULT = 1;
-  // Core pool size for container tasks
-  private final static int BLOCK_DELETING_SERVICE_CORE_POOL_SIZE = 10;
-  public BlockDeletingService(ContainerManager containerManager,
-      long serviceInterval, long serviceTimeout, Configuration conf) {
-    super("BlockDeletingService", serviceInterval,
-        serviceTimeout);
-    this.containerManager = containerManager;
-    this.conf = conf;
-    this.blockLimitPerTask = conf.getInt(
-    this.containerLimitPerInterval = conf.getInt(
-  }
-  @Override
-  public BackgroundTaskQueue getTasks() {
-    BackgroundTaskQueue queue = new BackgroundTaskQueue();
-    List<ContainerData> containers = Lists.newArrayList();
-    try {
-      // We at most list a number of containers a time,
-      // in case there are too many containers and start too many workers.
-      // We must ensure there is no empty container in this result.
-      // The chosen result depends on what container deletion policy is
-      // configured.
-      containers = containerManager.chooseContainerForBlockDeletion(
-          containerLimitPerInterval);
-"Plan to choose {} containers for block deletion, "
-          + "actually returns {} valid containers.",
-          containerLimitPerInterval, containers.size());
-      for(ContainerData container : containers) {
-        BlockDeletingTask containerTask =
-            new BlockDeletingTask(container, TASK_PRIORITY_DEFAULT);
-        queue.add(containerTask);
-      }
-    } catch (StorageContainerException e) {
-      LOG.warn("Failed to initiate block deleting tasks, "
-          + "caused by unable to get containers info. "
-          + "Retry in next interval. ", e);
-    } catch (Exception e) {
-      // In case listContainer call throws any uncaught RuntimeException.
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Unexpected error occurs during deleting blocks.", e);
-      }
-    }
-    return queue;
-  }
-  private static class ContainerBackgroundTaskResult
-      implements BackgroundTaskResult {
-    private List<String> deletedBlockIds;
-    ContainerBackgroundTaskResult() {
-      deletedBlockIds = new LinkedList<>();
-    }
-    public void addBlockId(String blockId) {
-      deletedBlockIds.add(blockId);
-    }
-    public void addAll(List<String> blockIds) {
-      deletedBlockIds.addAll(blockIds);
-    }
-    public List<String> getDeletedBlocks() {
-      return deletedBlockIds;
-    }
-    @Override
-    public int getSize() {
-      return deletedBlockIds.size();
-    }
-  }
-  private class BlockDeletingTask
-      implements BackgroundTask<BackgroundTaskResult> {
-    private final int priority;
-    private final ContainerData containerData;
-    BlockDeletingTask(ContainerData containerName, int priority) {
-      this.priority = priority;
-      this.containerData = containerName;
-    }
-    @Override
-    public BackgroundTaskResult call() throws Exception {
-      ContainerBackgroundTaskResult crr = new ContainerBackgroundTaskResult();
-      long startTime = Time.monotonicNow();
-      // Scan container's db and get list of under deletion blocks
-      MetadataStore meta = KeyUtils.getDB(containerData, conf);
-      // # of blocks to delete is throttled
-      KeyPrefixFilter filter = new KeyPrefixFilter(
-          OzoneConsts.DELETING_KEY_PREFIX);
-      List<Map.Entry<byte[], byte[]>> toDeleteBlocks =
-          meta.getSequentialRangeKVs(null, blockLimitPerTask, filter);
-      if (toDeleteBlocks.isEmpty()) {
-        LOG.debug("No under deletion block found in container : {}",
-            containerData.getContainerName());
-      }
-      List<String> succeedBlocks = new LinkedList<>();
-      LOG.debug("Container : {}, To-Delete blocks : {}",
-          containerData.getContainerName(), toDeleteBlocks.size());
-      File dataDir = ContainerUtils.getDataDirectory(containerData).toFile();
-      if (!dataDir.exists() || !dataDir.isDirectory()) {
-        LOG.error("Invalid container data dir {} : "
-            + "not exist or not a directory", dataDir.getAbsolutePath());
-        return crr;
-      }
-      toDeleteBlocks.forEach(entry -> {
-        String blockName = DFSUtil.bytes2String(entry.getKey());
-        LOG.debug("Deleting block {}", blockName);
-        try {
-          ContainerProtos.KeyData data =
-              ContainerProtos.KeyData.parseFrom(entry.getValue());
-          for (ContainerProtos.ChunkInfo chunkInfo : data.getChunksList()) {
-            File chunkFile = dataDir.toPath()
-                .resolve(chunkInfo.getChunkName()).toFile();
-            if (FileUtils.deleteQuietly(chunkFile)) {
-              LOG.debug("block {} chunk {} deleted", blockName,
-                  chunkFile.getAbsolutePath());
-            }
-          }
-          succeedBlocks.add(blockName);
-        } catch (InvalidProtocolBufferException e) {
-          LOG.error("Failed to parse block info for block {}", blockName, e);
-        }
-      });
-      // Once files are deleted ... clean up DB
-      BatchOperation batch = new BatchOperation();
-      succeedBlocks.forEach(entry ->
-          batch.delete(DFSUtil.string2Bytes(entry)));
-      meta.writeBatch(batch);
-      // update count of pending deletion blocks in in-memory container status
-      containerManager.decrPendingDeletionBlocks(succeedBlocks.size(),
-          containerData.getContainerName());
-      if (!succeedBlocks.isEmpty()) {
-"Container: {}, deleted blocks: {}, task elapsed time: {}ms",
-            containerData.getContainerName(), succeedBlocks.size(),
-            Time.monotonicNow() - startTime);
-      }
-      crr.addAll(succeedBlocks);
-      return crr;
-    }
-    @Override
-    public int getPriority() {
-      return priority;
-    }
-  }
diff --git 
deleted file mode 100644
index a9e202e..0000000
+++ /dev/null
@@ -1,18 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.hadoop.ozone.container.common.statemachine.background;
\ No newline at end of file
diff --git 
deleted file mode 100644
index e872555..0000000
+++ /dev/null
@@ -1,112 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- *
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations 
- * the License.
- */
-package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
-import org.apache.hadoop.ozone.container.common.statemachine
-    .SCMConnectionManager;
-import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
-import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
-import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
-import org.apache.hadoop.hdsl.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMCloseContainerCmdResponseProto;
-import org.apache.hadoop.hdsl.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMCmdType;
-import org.apache.hadoop.util.Time;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
- * Container Report handler.
- */
-public class CloseContainerHandler implements CommandHandler {
-  static final Logger LOG =
-      LoggerFactory.getLogger(CloseContainerHandler.class);
-  private int invocationCount;
-  private long totalTime;
-  /**
-   * Constructs a ContainerReport handler.
-   */
-  public CloseContainerHandler() {
-  }
-  /**
-   * Handles a given SCM command.
-   *
-   * @param command           - SCM Command
-   * @param container         - Ozone Container.
-   * @param context           - Current Context.
-   * @param connectionManager - The SCMs that we are talking to.
-   */
-  @Override
-  public void handle(SCMCommand command, OzoneContainer container,
-      StateContext context, SCMConnectionManager connectionManager) {
-    LOG.debug("Processing Close Container command.");
-    invocationCount++;
-    long startTime = Time.monotonicNow();
-    String containerName = "UNKNOWN";
-    try {
-      SCMCloseContainerCmdResponseProto
-          closeContainerProto =
-          SCMCloseContainerCmdResponseProto
-              .parseFrom(command.getProtoBufMessage());
-      containerName = closeContainerProto.getContainerName();
-      container.getContainerManager().closeContainer(containerName);
-    } catch (Exception e) {
-      LOG.error("Can't close container " + containerName, e);
-    } finally {
-      long endTime = Time.monotonicNow();
-      totalTime += endTime - startTime;
-    }
-  }
-  /**
-   * Returns the command type that this command handler handles.
-   *
-   * @return Type
-   */
-  @Override
-  public SCMCmdType getCommandType() {
-    return SCMCmdType.closeContainerCommand;
-  }
-  /**
-   * Returns number of times this handler has been invoked.
-   *
-   * @return int
-   */
-  @Override
-  public int getInvocationCount() {
-    return invocationCount;
-  }
-  /**
-   * Returns the average time this function takes to run.
-   *
-   * @return long
-   */
-  @Override
-  public long getAverageRunTime() {
-    if (invocationCount > 0) {
-      return totalTime / invocationCount;
-    }
-    return 0;
-  }
diff --git 
deleted file mode 100644
index fee3e1c..0000000
+++ /dev/null
@@ -1,177 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- *
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations 
- * the License.
- */
-package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
-import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
-import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
-import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
- * Dispatches command to the correct handler.
- */
-public final class CommandDispatcher {
-  static final Logger LOG =
-      LoggerFactory.getLogger(CommandDispatcher.class);
-  private final StateContext context;
-  private final Map<SCMCmdType, CommandHandler> handlerMap;
-  private final OzoneContainer container;
-  private final SCMConnectionManager connectionManager;
-  /**
-   * Constructs a command Dispatcher.
-   * @param context - Context.
-   */
-  /**
-   * Constructs a command dispatcher.
-   *
-   * @param container - Ozone Container
-   * @param context - Context
-   * @param handlers - Set of handlers.
-   */
-  private CommandDispatcher(OzoneContainer container, SCMConnectionManager
-      connectionManager, StateContext context,
-      CommandHandler... handlers) {
-    Preconditions.checkNotNull(context);
-    Preconditions.checkNotNull(handlers);
-    Preconditions.checkArgument(handlers.length > 0);
-    Preconditions.checkNotNull(container);
-    Preconditions.checkNotNull(connectionManager);
-    this.context = context;
-    this.container = container;
-    this.connectionManager = connectionManager;
-    handlerMap = new HashMap<>();
-    for (CommandHandler h : handlers) {
-      if(handlerMap.containsKey(h.getCommandType())){
-        LOG.error("Duplicate handler for the same command. Exiting. Handle " +
-            "key : { }", h.getCommandType().getDescriptorForType().getName());
-        throw new IllegalArgumentException("Duplicate handler for the same " +
-            "command.");
-      }
-      handlerMap.put(h.getCommandType(), h);
-    }
-  }
-  /**
-   * Dispatch the command to the correct handler.
-   *
-   * @param command - SCM Command.
-   */
-  public void handle(SCMCommand command) {
-    Preconditions.checkNotNull(command);
-    CommandHandler handler = handlerMap.get(command.getType());
-    if (handler != null) {
-      handler.handle(command, container, context, connectionManager);
-    } else {
-      LOG.error("Unknown SCM Command queued. There is no handler for this " +
-          "command. Command: {}", command.getType().getDescriptorForType()
-          .getName());
-    }
-  }
-  public static Builder newBuilder() {
-    return new Builder();
-  }
-  /**
-   * Helper class to construct command dispatcher.
-   */
-  public static class Builder {
-    private final List<CommandHandler> handlerList;
-    private OzoneContainer container;
-    private StateContext context;
-    private SCMConnectionManager connectionManager;
-    public Builder() {
-      handlerList = new LinkedList<>();
-    }
-    /**
-     * Adds a handler.
-     *
-     * @param handler - handler
-     * @return Builder
-     */
-    public Builder addHandler(CommandHandler handler) {
-      Preconditions.checkNotNull(handler);
-      handlerList.add(handler);
-      return this;
-    }
-    /**
-     * Add the OzoneContainer.
-     *
-     * @param ozoneContainer - ozone container.
-     * @return Builder
-     */
-    public Builder setContainer(OzoneContainer ozoneContainer) {
-      Preconditions.checkNotNull(ozoneContainer);
-      this.container = ozoneContainer;
-      return this;
-    }
-    /**
-     * Set the Connection Manager.
-     *
-     * @param scmConnectionManager
-     * @return this
-     */
-    public Builder setConnectionManager(SCMConnectionManager
-        scmConnectionManager) {
-      Preconditions.checkNotNull(scmConnectionManager);
-      this.connectionManager = scmConnectionManager;
-      return this;
-    }
-    /**
-     * Sets the Context.
-     *
-     * @param stateContext - StateContext
-     * @return this
-     */
-    public Builder setContext(StateContext stateContext) {
-      Preconditions.checkNotNull(stateContext);
-      this.context = stateContext;
-      return this;
-    }
-    /**
-     * Builds a command Dispatcher.
-     * @return Command Dispatcher.
-     */
-    public CommandDispatcher build() {
-      Preconditions.checkNotNull(this.connectionManager, "Missing connection" +
-          " manager.");
-      Preconditions.checkNotNull(this.container, "Missing container.");
-      Preconditions.checkNotNull(this.context, "Missing context.");
-      Preconditions.checkArgument(this.handlerList.size() > 0);
-      return new CommandDispatcher(this.container, this.connectionManager,
-          this.context, handlerList.toArray(
-              new CommandHandler[handlerList.size()]));
-    }
-  }
diff --git 
deleted file mode 100644
index b54923e..0000000
+++ /dev/null
@@ -1,59 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- *
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations 
- * the License.
- */
-package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
-import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
-import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
-import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
- * Generic interface for handlers.
- */
-public interface CommandHandler {
-  /**
-   * Handles a given SCM command.
-   * @param command - SCM Command
-   * @param container - Ozone Container.
-   * @param context - Current Context.
-   * @param connectionManager - The SCMs that we are talking to.
-   */
-  void handle(SCMCommand command, OzoneContainer container,
-      StateContext context, SCMConnectionManager connectionManager);
-  /**
-   * Returns the command type that this command handler handles.
-   * @return Type
-   */
-  SCMCmdType getCommandType();
-  /**
-   * Returns number of times this handler has been invoked.
-   * @return int
-   */
-  int getInvocationCount();
-  /**
-   * Returns the average time this function takes to run.
-   * @return  long
-   */
-  long getAverageRunTime();
diff --git 
deleted file mode 100644
index e9f4b61..0000000
+++ /dev/null
@@ -1,110 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- *
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations 
- * the License.
- */
-package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
-import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
-import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
-import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
-import org.apache.hadoop.util.Time;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
- * Container Report handler.
- */
-public class ContainerReportHandler implements CommandHandler {
-  static final Logger LOG =
-      LoggerFactory.getLogger(ContainerReportHandler.class);
-  private int invocationCount;
-  private long totalTime;
-  /**
-   * Constructs a ContainerReport handler.
-   */
-  public ContainerReportHandler() {
-  }
-  /**
-   * Handles a given SCM command.
-   *
-   * @param command - SCM Command
-   * @param container - Ozone Container.
-   * @param context - Current Context.
-   * @param connectionManager - The SCMs that we are talking to.
-   */
-  @Override
-  public void handle(SCMCommand command, OzoneContainer container,
-      StateContext context, SCMConnectionManager connectionManager) {
-    LOG.debug("Processing Container Report.");
-    invocationCount++;
-    long startTime = Time.monotonicNow();
-    try {
-      ContainerReportsRequestProto contianerReport =
-          container.getContainerReport();
-      // TODO : We send this report to all SCMs.Check if it is enough only to
-      // send to the leader once we have RAFT enabled SCMs.
-      for (EndpointStateMachine endPoint : connectionManager.getValues()) {
-        endPoint.getEndPoint().sendContainerReport(contianerReport);
-      }
-    } catch (IOException ex) {
-      LOG.error("Unable to process the Container Report command.", ex);
-    } finally {
-      long endTime = Time.monotonicNow();
-      totalTime += endTime - startTime;
-    }
-  }
-  /**
-   * Returns the command type that this command handler handles.
-   *
-   * @return Type
-   */
-  @Override
-  public SCMCmdType getCommandType() {
-    return SCMCmdType.sendContainerReport;
-  }
-  /**
-   * Returns number of times this handler has been invoked.
-   *
-   * @return int
-   */
-  @Override
-  public int getInvocationCount() {
-    return invocationCount;
-  }
-  /**
-   * Returns the average time this function takes to run.
-   *
-   * @return long
-   */
-  @Override
-  public long getAverageRunTime() {
-    if (invocationCount > 0) {
-      return totalTime / invocationCount;
-    }
-    return 0;
-  }
diff --git 
deleted file mode 100644
index ff38cdc..0000000
+++ /dev/null
@@ -1,204 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- *
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations 
- * the License.
- */
-package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
-import org.apache.hadoop.ozone.container.common.helpers.KeyUtils;
-import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
-import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
-import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
-import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
-import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
-import org.apache.hadoop.util.Time;
-import org.apache.hadoop.utils.BatchOperation;
-import org.apache.hadoop.utils.MetadataStore;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.util.List;
- * Handle block deletion commands.
- */
-public class DeleteBlocksCommandHandler implements CommandHandler {
-  private static final Logger LOG =
-      LoggerFactory.getLogger(DeleteBlocksCommandHandler.class);
-  private ContainerManager containerManager;
-  private Configuration conf;
-  private int invocationCount;
-  private long totalTime;
-  public DeleteBlocksCommandHandler(ContainerManager containerManager,
-      Configuration conf) {
-    this.containerManager = containerManager;
-    this.conf = conf;
-  }
-  @Override
-  public void handle(SCMCommand command, OzoneContainer container,
-      StateContext context, SCMConnectionManager connectionManager) {
-    if (command.getType() != SCMCmdType.deleteBlocksCommand) {
-      LOG.warn("Skipping handling command, expected command "
-              + "type {} but found {}",
-          SCMCmdType.deleteBlocksCommand, command.getType());
-      return;
-    }
-    LOG.debug("Processing block deletion command.");
-    invocationCount++;
-    long startTime = Time.monotonicNow();
-    // move blocks to deleting state.
-    // this is a metadata update, the actual deletion happens in another
-    // recycling thread.
-    DeleteBlocksCommand cmd = (DeleteBlocksCommand) command;
-    List<DeletedBlocksTransaction> containerBlocks = cmd.blocksTobeDeleted();
-    DeletedContainerBlocksSummary summary =
-        DeletedContainerBlocksSummary.getFrom(containerBlocks);
-"Start to delete container blocks, TXIDs={}, "
-            + "numOfContainers={}, numOfBlocks={}",
-        summary.getTxIDSummary(),
-        summary.getNumOfContainers(),
-        summary.getNumOfBlocks());
-    ContainerBlocksDeletionACKProto.Builder resultBuilder =
-        ContainerBlocksDeletionACKProto.newBuilder();
-    containerBlocks.forEach(entry -> {
-      DeleteBlockTransactionResult.Builder txResultBuilder =
-          DeleteBlockTransactionResult.newBuilder();
-      txResultBuilder.setTxID(entry.getTxID());
-      try {
-        deleteContainerBlocks(entry, conf);
-        txResultBuilder.setSuccess(true);
-      } catch (IOException e) {
-        LOG.warn("Failed to delete blocks for container={}, TXID={}",
-            entry.getContainerName(), entry.getTxID(), e);
-        txResultBuilder.setSuccess(false);
-      }
-      resultBuilder.addResults(;
-    });
-    ContainerBlocksDeletionACKProto blockDeletionACK =;
-    // Send ACK back to SCM as long as meta updated
-    // TODO Or we should wait until the blocks are actually deleted?
-    if (!containerBlocks.isEmpty()) {
-      for (EndpointStateMachine endPoint : connectionManager.getValues()) {
-        try {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Sending following block deletion ACK to SCM");
-            for (DeleteBlockTransactionResult result :
-                blockDeletionACK.getResultsList()) {
-              LOG.debug(result.getTxID() + " : " + result.getSuccess());
-            }
-          }
-          endPoint.getEndPoint()
-              .sendContainerBlocksDeletionACK(blockDeletionACK);
-        } catch (IOException e) {
-          LOG.error("Unable to send block deletion ACK to SCM {}",
-              endPoint.getAddress().toString(), e);
-        }
-      }
-    }
-    long endTime = Time.monotonicNow();
-    totalTime += endTime - startTime;
-  }
-  /**
-   * Move a bunch of blocks from a container to deleting state.
-   * This is a meta update, the actual deletes happen in async mode.
-   *
-   * @param delTX a block deletion transaction.
-   * @param config configuration.
-   * @throws IOException if I/O error occurs.
-   */
-  private void deleteContainerBlocks(DeletedBlocksTransaction delTX,
-      Configuration config) throws IOException {
-    String containerId = delTX.getContainerName();
-    ContainerData containerInfo = containerManager.readContainer(containerId);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Processing Container : {}, DB path : {}", containerId,
-          containerInfo.getDBPath());
-    }
-    int newDeletionBlocks = 0;
-    MetadataStore containerDB = KeyUtils.getDB(containerInfo, config);
-    for (String blk : delTX.getBlockIDList()) {
-      BatchOperation batch = new BatchOperation();
-      byte[] blkBytes = DFSUtil.string2Bytes(blk);
-      byte[] blkInfo = containerDB.get(blkBytes);
-      if (blkInfo != null) {
-        // Found the block in container db,
-        // use an atomic update to change its state to deleting.
-        batch.put(DFSUtil.string2Bytes(OzoneConsts.DELETING_KEY_PREFIX + blk),
-            blkInfo);
-        batch.delete(blkBytes);
-        try {
-          containerDB.writeBatch(batch);
-          newDeletionBlocks++;
-          LOG.debug("Transited Block {} to DELETING state in container {}",
-              blk, containerId);
-        } catch (IOException e) {
-          // if some blocks failed to delete, we fail this TX,
-          // without sending this ACK to SCM, SCM will resend the TX
-          // with a certain number of retries.
-          throw new IOException(
-              "Failed to delete blocks for TXID = " + delTX.getTxID(), e);
-        }
-      } else {
-        LOG.debug("Block {} not found or already under deletion in"
-                + " container {}, skip deleting it.", blk, containerId);
-      }
-    }
-    // update pending deletion blocks count in in-memory container status
-    containerManager.incrPendingDeletionBlocks(newDeletionBlocks, containerId);
-  }
-  @Override
-  public SCMCmdType getCommandType() {
-    return SCMCmdType.deleteBlocksCommand;
-  }
-  @Override
-  public int getInvocationCount() {
-    return this.invocationCount;
-  }
-  @Override
-  public long getAverageRunTime() {
-    if (invocationCount > 0) {
-      return totalTime / invocationCount;
-    }
-    return 0;
-  }
diff --git 
deleted file mode 100644
index 1e9c8dc..0000000
+++ /dev/null
@@ -1,18 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
\ No newline at end of file
diff --git 
deleted file mode 100644
index feb2f81..0000000
+++ /dev/null
@@ -1,28 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.hadoop.ozone.container.common.statemachine;
- State machine class is used by the container to denote various states a
- container can be in and also is used for command processing.
- Container has the following states.
- Start - > getVersion -> Register -> Running  -> Shutdown
- */
\ No newline at end of file
diff --git 
deleted file mode 100644
index 75142af..0000000
+++ /dev/null
@@ -1,55 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.hadoop.ozone.container.common.states;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
- * State Interface that allows tasks to maintain states.
- */
-public interface DatanodeState<T> {
-  /**
-   * Called before entering this state.
-   */
-  void onEnter();
-  /**
-   * Called After exiting this state.
-   */
-  void onExit();
-  /**
-   * Executes one or more tasks that is needed by this state.
-   *
-   * @param executor -  ExecutorService
-   */
-  void execute(ExecutorService executor);
-  /**
-   * Wait for execute to finish.
-   *
-   * @param time - Time
-   * @param timeUnit - Unit of time.
-   */
-  T await(long time, TimeUnit timeUnit)
-      throws InterruptedException, ExecutionException, TimeoutException;

