http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
new file mode 100644
index 0000000..8e9482f
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -0,0 +1,387 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common.statemachine;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
+    .CloseContainerHandler;
+import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
+    .CommandDispatcher;
+import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
+    .ContainerReportHandler;
+import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
+    .DeleteBlocksCommandHandler;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static 
org.apache.hadoop.hdds.scm.HddsServerUtil.getScmHeartbeatInterval;
+
+/**
+ * State Machine Class.
+ */
+public class DatanodeStateMachine implements Closeable {
+  @VisibleForTesting
+  static final Logger LOG =
+      LoggerFactory.getLogger(DatanodeStateMachine.class);
+  private final ExecutorService executorService;
+  private final Configuration conf;
+  private final SCMConnectionManager connectionManager;
+  private final long heartbeatFrequency;
+  private StateContext context;
+  private final OzoneContainer container;
+  private DatanodeDetails datanodeDetails;
+  private final CommandDispatcher commandDispatcher;
+  private long commandsHandled;
+  private AtomicLong nextHB;
+  private Thread stateMachineThread = null;
+  private Thread cmdProcessThread = null;
+
+  /**
+   * Constructs a datanode state machine.
+   * <p>
+   * Creates the executor, the SCM connection manager, the state context
+   * (starting in the initial state), the container and the command
+   * dispatcher. No thread is started here.
+   *
+   * @param datanodeDetails - DatanodeDetails used to identify a datanode
+   * @param conf - Configuration.
+   * @throws IOException declared by this constructor; presumably raised while
+   * building one of the components below -- TODO confirm which one.
+   */
+  public DatanodeStateMachine(DatanodeDetails datanodeDetails,
+      Configuration conf) throws IOException {
+    this.conf = conf;
+    this.datanodeDetails = datanodeDetails;
+    executorService = HadoopExecutors.newCachedThreadPool(
+                new ThreadFactoryBuilder().setDaemon(true)
+            .setNameFormat("Datanode State Machine Thread - %d").build());
+    connectionManager = new SCMConnectionManager(conf);
+    context = new StateContext(this.conf, DatanodeStates.getInitState(), this);
+    // The configured heartbeat interval is in seconds; keep it in millis.
+    heartbeatFrequency = TimeUnit.SECONDS.toMillis(
+        getScmHeartbeatInterval(conf));
+    container = new OzoneContainer(this.datanodeDetails,
+        new OzoneConfiguration(conf));
+    // Until the first cycle runs, the next heartbeat is considered due now.
+    nextHB = new AtomicLong(Time.monotonicNow());
+
+     // To support a new command type, register its handler with the builder
+     // below.
+    commandDispatcher = CommandDispatcher.newBuilder()
+        .addHandler(new ContainerReportHandler())
+        .addHandler(new CloseContainerHandler())
+        .addHandler(new DeleteBlocksCommandHandler(
+            container.getContainerManager(), conf))
+        .setConnectionManager(connectionManager)
+        .setContainer(container)
+        .setContext(context)
+        .build();
+  }
+
+  /**
+   *
+   * Return DatanodeDetails if set, return null otherwise.
+   *
+   * @return DatanodeDetails
+   */
+  public DatanodeDetails getDatanodeDetails() {
+    return datanodeDetails;
+  }
+
+
+  /**
+   * Returns the Connection manager for this state machine.
+   *
+   * @return - SCMConnectionManager.
+   */
+  public SCMConnectionManager getConnectionManager() {
+    return connectionManager;
+  }
+
+  /**
+   * Returns the OzoneContainer created by this state machine.
+   *
+   * @return - OzoneContainer.
+   */
+  public OzoneContainer getContainer() {
+    return this.container;
+  }
+
+  /**
+   * Runs the state machine at a fixed frequency.
+   * <p>
+   * Starts the container and the command handler thread, then loops until the
+   * context reaches SHUTDOWN. Each cycle publishes the node and container
+   * reports to the context, executes the context with the heartbeat interval
+   * as timeout, and sleeps for whatever is left of the interval.
+   *
+   * @throws IOException declared by this method; presumably raised by
+   * {@code container.start()} -- TODO confirm.
+   */
+  private void start() throws IOException {
+    long now = 0;
+
+    container.start();
+    initCommandHandlerThread(conf);
+    while (context.getState() != DatanodeStates.SHUTDOWN) {
+      try {
+        LOG.debug("Executing cycle Number : {}", context.getExecutionCount());
+        // Deadline of this cycle; the command thread also reads nextHB to
+        // decide how long to sleep.
+        nextHB.set(Time.monotonicNow() + heartbeatFrequency);
+        context.setReportState(container.getNodeReport());
+        context.setContainerReportState(container.getContainerReportState());
+        context.execute(executorService, heartbeatFrequency,
+            TimeUnit.MILLISECONDS);
+        now = Time.monotonicNow();
+        // Only sleep when the cycle finished before its deadline.
+        if (now < nextHB.get()) {
+          Thread.sleep(nextHB.get() - now);
+        }
+      } catch (InterruptedException e) {
+        // Swallowed on purpose: the loop condition is re-evaluated and the
+        // loop ends only once the state is SHUTDOWN.
+      } catch (Exception e) {
+        // A failed cycle is logged and the loop carries on with the next one.
+        LOG.error("Unable to finish the execution.", e);
+      }
+    }
+  }
+
+  /**
+   * Gets the current context.
+   *
+   * @return StateContext
+   */
+  public StateContext getContext() {
+    return context;
+  }
+
+  /**
+   * Sets the current context.
+   *
+   * @param context - Context
+   */
+  public void setContext(StateContext context) {
+    this.context = context;
+  }
+
+  /**
+   * Shuts the state machine down and releases its resources.
+   * <p>
+   * The context is moved to its last state (SHUTDOWN) <em>before</em> the
+   * worker threads are interrupted. Both worker loops swallow
+   * {@link InterruptedException} and leave only when they observe SHUTDOWN,
+   * so interrupting first could let a woken thread run one more cycle.
+   * <p>
+   * The executor gets two five-second grace periods (orderly shutdown, then
+   * forced shutdown). The container is stopped even when closing the
+   * connection manager fails.
+   *
+   * @throws IOException if an I/O error occurs while releasing resources.
+   */
+  @Override
+  public void close() throws IOException {
+    // Set the terminal state first so that a thread woken by the interrupt
+    // re-checks its loop condition after the state change.
+    context.setState(DatanodeStates.getLastState());
+    if (stateMachineThread != null) {
+      stateMachineThread.interrupt();
+    }
+    if (cmdProcessThread != null) {
+      cmdProcessThread.interrupt();
+    }
+    executorService.shutdown();
+    try {
+      if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
+        executorService.shutdownNow();
+      }
+
+      if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
+        LOG.error("Unable to shutdown state machine properly.");
+      }
+    } catch (InterruptedException e) {
+      LOG.error("Error attempting to shutdown.", e);
+      executorService.shutdownNow();
+      // Preserve the interrupt status of the thread that called close().
+      Thread.currentThread().interrupt();
+    }
+
+    try {
+      if (connectionManager != null) {
+        connectionManager.close();
+      }
+    } finally {
+      // Always stop the container, even if the connection manager throws.
+      if (container != null) {
+        container.stop();
+      }
+    }
+  }
+
+  /**
+   * Life-cycle states of a datanode, numbered sequentially. getNextState
+   * walks from getInitState towards getLastState one value at a time.
+   */
+  public enum DatanodeStates {
+    INIT(1),
+    RUNNING(2),
+    SHUTDOWN(3);
+    private final int value;
+
+    /**
+     * Binds a numeric value to the state.
+     *
+     * @param value  numeric value of the state
+     */
+    DatanodeStates(int value) {
+      this.value = value;
+    }
+
+    /**
+     * Returns the state a datanode starts in.
+     *
+     * @return the first state.
+     */
+    public static DatanodeStates getInitState() {
+      return INIT;
+    }
+
+    /**
+     * Returns the terminal state.
+     *
+     * @return the last state.
+     */
+    public static DatanodeStates getLastState() {
+      return SHUTDOWN;
+    }
+
+    /**
+     * Returns the numeric value bound to this state.
+     *
+     * @return int.
+     */
+    public int getValue() {
+      return value;
+    }
+
+    /**
+     * Returns the state whose value is one greater than this one. The last
+     * state is returned when this state is already the last one or when no
+     * state carries the successor value.
+     *
+     * @return the next state.
+     */
+    public DatanodeStates getNextState() {
+      final DatanodeStates last = getLastState();
+      if (this.value >= last.getValue()) {
+        return last;
+      }
+      final int wanted = this.getValue() + 1;
+      final DatanodeStates[] all = values();
+      for (int i = 0; i < all.length; i++) {
+        if (all[i].getValue() == wanted) {
+          return all[i];
+        }
+      }
+      return last;
+    }
+  }
+
+  /**
+   * Launches the state machine loop on a dedicated daemon thread and keeps a
+   * reference to that thread in {@code stateMachineThread}.
+   * <p>
+   * NOTE(review): the "started" message is logged only after {@code start()}
+   * returns, i.e. once the state machine loop has ended -- confirm whether
+   * that wording is intended.
+   */
+  public void startDaemon() {
+    Runnable stateMachineLoop = () -> {
+      try {
+        start();
+        LOG.info("Ozone container server started.");
+      } catch (Exception ex) {
+        LOG.error("Unable to start the DatanodeState Machine", ex);
+      }
+    };
+    ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
+    builder.setDaemon(true);
+    builder.setNameFormat("Datanode State Machine Thread - %d");
+    stateMachineThread = builder.build().newThread(stateMachineLoop);
+    stateMachineThread.start();
+  }
+
+  /**
+   * Stop the daemon thread of the datanode state machine.
+   */
+  public synchronized void stopDaemon() {
+    try {
+      context.setState(DatanodeStates.SHUTDOWN);
+      this.close();
+      LOG.info("Ozone container server stopped.");
+    } catch (IOException e) {
+      LOG.error("Stop ozone container server failed.", e);
+    }
+  }
+
+  /**
+   * Tells whether the datanode state machine daemon is stopped.
+   * <p>
+   * It counts as stopped when the executor has been shut down, the context
+   * reports an execution count of zero and the context state is SHUTDOWN.
+   *
+   * @return True if datanode state machine daemon is stopped
+   * and false otherwise.
+   */
+  @VisibleForTesting
+  public boolean isDaemonStopped() {
+    if (!this.executorService.isShutdown()) {
+      return false;
+    }
+    if (this.getContext().getExecutionCount() != 0) {
+      return false;
+    }
+    return this.getContext().getState() == DatanodeStates.SHUTDOWN;
+  }
+
+  /**
+   * Builds and starts the thread that drains the command queue.
+   *
+   * @param config - Configuration; not referenced by the current body.
+   */
+  private void initCommandHandlerThread(Configuration config) {
+
+    // Queue-draining task. Commands are assumed to be processed slowly and
+    // in order, hence a single thread; this assumption may change in future.
+    // When no command is pending the task sleeps until one second after the
+    // next heartbeat is due.
+    Runnable queueDrainer = () -> {
+      while (getContext().getState() != DatanodeStates.SHUTDOWN) {
+        SCMCommand pending = getContext().getNextCommand();
+        if (pending != null) {
+          commandDispatcher.handle(pending);
+          commandsHandled++;
+          continue;
+        }
+        try {
+          // Sleep till the next HB + 1 second.
+          long current = Time.monotonicNow();
+          if (nextHB.get() > current) {
+            Thread.sleep((nextHB.get() - current) + 1000L);
+          }
+        } catch (InterruptedException e) {
+          // Ignore this exception.
+        }
+      }
+    };
+
+    // We will have only one thread for command processing in a datanode.
+    cmdProcessThread = getCommandHandlerThread(queueDrainer);
+    cmdProcessThread.start();
+  }
+
+  /**
+   * Creates (but does not start) a daemon thread that runs the given
+   * command-queue task.
+   * <p>
+   * If the thread dies from an uncaught exception, the error is logged and a
+   * replacement thread is created and started. The replacement is stored in
+   * {@code cmdProcessThread} so that {@link #close()} interrupts the thread
+   * that is actually running rather than the one that died.
+   *
+   * @param processCommandQueue - task executed by the thread.
+   * @return the new, not yet started, thread.
+   */
+  private Thread getCommandHandlerThread(Runnable processCommandQueue) {
+    Thread handlerThread = new Thread(processCommandQueue);
+    handlerThread.setDaemon(true);
+    handlerThread.setName("Command processor thread");
+    handlerThread.setUncaughtExceptionHandler((Thread t, Throwable e) -> {
+      // Let us just restart this thread after logging a critical error.
+      // if this thread is not running we cannot handle commands from SCM.
+      LOG.error("Critical Error : Command processor thread encountered an " +
+          "error. Thread: {}", t.toString(), e);
+      // Track the replacement; otherwise close() would interrupt a thread
+      // that has already terminated.
+      Thread replacement = getCommandHandlerThread(processCommandQueue);
+      cmdProcessThread = replacement;
+      replacement.start();
+    });
+    return handlerThread;
+  }
+
+  /**
+   * Returns the number of commands handled  by the datanode.
+   * @return  count
+   */
+  @VisibleForTesting
+  public long getCommandHandled() {
+    return commandsHandled;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/EndpointStateMachine.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/EndpointStateMachine.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/EndpointStateMachine.java
new file mode 100644
index 0000000..7e85923
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/EndpointStateMachine.java
@@ -0,0 +1,294 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common.statemachine;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ozone.protocol.VersionResponse;
+import org.apache.hadoop.ozone.protocolPB
+    .StorageContainerDatanodeProtocolClientSideTranslatorPB;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.time.ZonedDateTime;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static org.apache.hadoop.hdds.scm.HddsServerUtil.getLogWarnInterval;
+import static 
org.apache.hadoop.hdds.scm.HddsServerUtil.getScmHeartbeatInterval;
+
+/**
+ * Endpoint is used as holder class that keeps state around the RPC endpoint.
+ */
+public class EndpointStateMachine
+    implements Closeable, EndpointStateMachineMBean {
+  static final Logger
+      LOG = LoggerFactory.getLogger(EndpointStateMachine.class);
+  private final StorageContainerDatanodeProtocolClientSideTranslatorPB 
endPoint;
+  private final AtomicLong missedCount;
+  private final InetSocketAddress address;
+  private final Lock lock;
+  private final Configuration conf;
+  private EndPointStates state;
+  private VersionResponse version;
+  private ZonedDateTime lastSuccessfulHeartbeat;
+
+  /**
+   * Constructs RPC Endpoints. The endpoint starts in the initial state with
+   * a missed count of zero.
+   *
+   * @param address - Address of the SCM server behind this endpoint.
+   * @param endPoint - RPC endPoint.
+   * @param conf - Configuration; read for the log-warn and heartbeat
+   * intervals.
+   */
+  public EndpointStateMachine(InetSocketAddress address,
+      StorageContainerDatanodeProtocolClientSideTranslatorPB endPoint,
+      Configuration conf) {
+    this.endPoint = endPoint;
+    this.missedCount = new AtomicLong(0);
+    this.address = address;
+    state = EndPointStates.getInitState();
+    lock = new ReentrantLock();
+    this.conf = conf;
+  }
+
+  /**
+   * Takes a lock on this EndPoint so that other threads don't use this while 
we
+   * are trying to communicate via this endpoint.
+   */
+  public void lock() {
+    lock.lock();
+  }
+
+  /**
+   * Unlocks this endpoint.
+   */
+  public void unlock() {
+    lock.unlock();
+  }
+
+  /**
+   * Returns the version that we read from the server if anyone asks .
+   *
+   * @return - Version Response.
+   */
+  public VersionResponse getVersion() {
+    return version;
+  }
+
+  /**
+   * Sets the Version response we received from the SCM.
+   *
+   * @param version VersionResponse
+   */
+  public void setVersion(VersionResponse version) {
+    this.version = version;
+  }
+
+  /**
+   * Returns the current State this end point is in.
+   *
+   * @return - getState.
+   */
+  public EndPointStates getState() {
+    return state;
+  }
+
+  /**
+   * Returns the software version taken from the stored version response.
+   *
+   * @return software version, or -1 when no version response has been set.
+   */
+  @Override
+  public int getVersionNumber() {
+    return version == null
+        ? -1
+        : version.getProtobufMessage().getSoftwareVersion();
+  }
+
+  /**
+   * Sets the endpoint state.
+   *
+   * @param epState - end point state.
+   * @return the state that was just set.
+   */
+  public EndPointStates setState(EndPointStates epState) {
+    this.state = epState;
+    return this.state;
+  }
+
+  /**
+   * Closes the connection.
+   *
+   * @throws IOException
+   */
+  @Override
+  public void close() throws IOException {
+    if (endPoint != null) {
+      endPoint.close();
+    }
+  }
+
+  /**
+   * We maintain a count of how many times we missed communicating with a
+   * specific SCM. The counter is an AtomicLong, so the increment itself is
+   * atomic.
+   */
+  public void incMissed() {
+    this.missedCount.incrementAndGet();
+  }
+
+  /**
+   * Returns the value of the missed count.
+   *
+   * @return long
+   */
+  public long getMissedCount() {
+    return this.missedCount.get();
+  }
+
+  @Override
+  public String getAddressString() {
+    return getAddress().toString();
+  }
+
+  public void zeroMissedCount() {
+    this.missedCount.set(0);
+  }
+
+  /**
+   * Returns the InetAddress of the endPoint.
+   *
+   * @return - EndPoint.
+   */
+  public InetSocketAddress getAddress() {
+    return this.address;
+  }
+
+  /**
+   * Returns real RPC endPoint.
+   *
+   * @return rpc client.
+   */
+  public StorageContainerDatanodeProtocolClientSideTranslatorPB
+      getEndPoint() {
+    return endPoint;
+  }
+
+  /**
+   * Returns the string that represents this endpoint, which is the string
+   * form of its address.
+   *
+   * @return - String
+   */
+  @Override
+  public String toString() {
+    return address.toString();
+  }
+
+  /**
+   * Records a failed attempt to reach the SCM and logs a warning each time
+   * the missed count is a multiple of {@code getLogWarnInterval(conf)}.
+   *
+   * @param ex - Exception that caused the miss; logged at trace level.
+   */
+  public void logIfNeeded(Exception ex) {
+    // SLF4J treats a trailing Throwable as the exception to log, so a "{}"
+    // placeholder meant for it is never filled in; pass it as the throwable.
+    LOG.trace("Incrementing the Missed count.", ex);
+    this.incMissed();
+    // Read the counter once so that the check and the message agree.
+    final long missed = this.getMissedCount();
+    if (missed % getLogWarnInterval(conf) == 0) {
+      LOG.warn("Unable to communicate to SCM server at {}. We have not been " +
+              "able to communicate to this SCM server for past {} seconds.",
+          this.getAddress().getHostString() + ":" +
+              this.getAddress().getPort(),
+          missed * getScmHeartbeatInterval(this.conf));
+    }
+  }
+
+
+  /**
+   * Ordered life-cycle states of an Endpoint.
+   * <p>
+   * Each state carries a sequential numeric value.
+   * <p>
+   * getNextState walks from getInitState towards getLastState one value at a
+   * time.
+   */
+  public enum EndPointStates {
+    GETVERSION(1),
+    REGISTER(2),
+    HEARTBEAT(3),
+    SHUTDOWN(4); // if you add value after this please edit getLastState too.
+    private final int value;
+
+    /**
+     * Binds a numeric value to the state.
+     *
+     * @param value  numeric value of the state.
+     */
+    EndPointStates(int value) {
+      this.value = value;
+    }
+
+    /**
+     * Returns the state an endpoint starts in.
+     *
+     * @return the first state.
+     */
+    public static EndPointStates getInitState() {
+      return GETVERSION;
+    }
+
+    /**
+     * Returns the terminal state.
+     *
+     * @return the last state.
+     */
+    public static EndPointStates getLastState() {
+      return SHUTDOWN;
+    }
+
+    /**
+     * Returns the numeric value bound to this state.
+     *
+     * @return int.
+     */
+    public int getValue() {
+      return value;
+    }
+
+    /**
+     * Returns the state whose value is one greater than this one. The last
+     * state is returned when this state is already the last one or when no
+     * state carries the successor value.
+     *
+     * @return the next state.
+     */
+    public EndPointStates getNextState() {
+      final EndPointStates last = getLastState();
+      if (this.getValue() >= last.getValue()) {
+        return last;
+      }
+      final int wanted = this.getValue() + 1;
+      final EndPointStates[] all = values();
+      for (int i = 0; i < all.length; i++) {
+        if (all[i].getValue() == wanted) {
+          return all[i];
+        }
+      }
+      return last;
+    }
+  }
+
+  /**
+   * Returns the time of the last successful heartbeat.
+   *
+   * @return epoch seconds of the last successful heartbeat, or 0 when none
+   * has been recorded.
+   */
+  public long getLastSuccessfulHeartbeat() {
+    if (lastSuccessfulHeartbeat == null) {
+      return 0;
+    }
+    return lastSuccessfulHeartbeat.toEpochSecond();
+  }
+
+  public void setLastSuccessfulHeartbeat(
+      ZonedDateTime lastSuccessfulHeartbeat) {
+    this.lastSuccessfulHeartbeat = lastSuccessfulHeartbeat;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/EndpointStateMachineMBean.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/EndpointStateMachineMBean.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/EndpointStateMachineMBean.java
new file mode 100644
index 0000000..4f64bde
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/EndpointStateMachineMBean.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common.statemachine;
+
+
+/**
+ * JMX representation of an EndpointStateMachine.
+ */
+public interface EndpointStateMachineMBean {
+
+  /**
+   * @return number of times communication with the SCM was missed.
+   */
+  long getMissedCount();
+
+  /**
+   * @return the endpoint address as a string.
+   */
+  String getAddressString();
+
+  /**
+   * @return the state the endpoint is currently in.
+   */
+  EndpointStateMachine.EndPointStates getState();
+
+  /**
+   * @return the software version reported by the SCM; EndpointStateMachine
+   * returns -1 when no version response has been stored.
+   */
+  int getVersionNumber();
+
+  /**
+   * @return time of the last successful heartbeat; EndpointStateMachine
+   * reports it in epoch seconds and returns 0 when none has been recorded.
+   */
+  long getLastSuccessfulHeartbeat();
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/SCMConnectionManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/SCMConnectionManager.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/SCMConnectionManager.java
new file mode 100644
index 0000000..19722f0
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/SCMConnectionManager.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common.statemachine;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.metrics2.util.MBeans;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.ozone.protocolPB
+    .StorageContainerDatanodeProtocolClientSideTranslatorPB;
+import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolPB;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.management.ObjectName;
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.hadoop.hdds.scm.HddsServerUtil
+    .getScmRpcTimeOutInMilliseconds;
+
+/**
+ * SCMConnectionManager - Acts as a class that manages the membership
+ * information of the SCMs that we are working with.
+ */
+public class SCMConnectionManager
+    implements Closeable, SCMConnectionManagerMXBean {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SCMConnectionManager.class);
+
+  private final ReadWriteLock mapLock;
+  private final Map<InetSocketAddress, EndpointStateMachine> scmMachines;
+
+  private final int rpcTimeout;
+  private final Configuration conf;
+  private final ObjectName jmxBean;
+
+  /**
+   * Creates a connection manager with an empty SCM set and registers it as a
+   * JMX bean.
+   *
+   * @param conf - Configuration; also the source of the RPC timeout.
+   */
+  public SCMConnectionManager(Configuration conf) {
+    this.conf = conf;
+    this.mapLock = new ReentrantReadWriteLock();
+    this.scmMachines = new HashMap<>();
+    // The timeout is configured as a long but kept as an int.
+    long timeoutMillis = getScmRpcTimeOutInMilliseconds(conf);
+    this.rpcTimeout = (int) timeoutMillis;
+    this.jmxBean = MBeans.register(
+        "OzoneDataNode", "SCMConnectionManager", this);
+  }
+
+
+  /**
+   * Returns Config.
+   *
+   * @return ozoneConfig.
+   */
+  public Configuration getConf() {
+    return conf;
+  }
+
+  /**
+   * Get RpcTimeout.
+   *
+   * @return - Return RPC timeout.
+   */
+  public int getRpcTimeout() {
+    return rpcTimeout;
+  }
+
+
+  /**
+   * Takes a read lock.
+   */
+  public void readLock() {
+    this.mapLock.readLock().lock();
+  }
+
+  /**
+   * Releases the read lock.
+   */
+  public void readUnlock() {
+    this.mapLock.readLock().unlock();
+  }
+
+  /**
+   * Takes the write lock.
+   */
+  public void writeLock() {
+    this.mapLock.writeLock().lock();
+  }
+
+  /**
+   * Releases the write lock.
+   */
+  public void writeUnlock() {
+    this.mapLock.writeLock().unlock();
+  }
+
+  /**
+   * Adds a new SCM machine to the target set. A request for an address that
+   * is already present is logged and ignored.
+   *
+   * @param address - Address of the SCM machine to send heartbeat to.
+   * @throws IOException
+   */
+  public void addSCMServer(InetSocketAddress address) throws IOException {
+    writeLock();
+    try {
+      if (!scmMachines.containsKey(address)) {
+        RPC.setProtocolEngine(conf, StorageContainerDatanodeProtocolPB.class,
+            ProtobufRpcEngine.class);
+        long protocolVersion =
+            RPC.getProtocolVersion(StorageContainerDatanodeProtocolPB.class);
+
+        StorageContainerDatanodeProtocolPB proxy = RPC.getProxy(
+            StorageContainerDatanodeProtocolPB.class, protocolVersion,
+            address, UserGroupInformation.getCurrentUser(), conf,
+            NetUtils.getDefaultSocketFactory(conf), getRpcTimeout());
+
+        // Wrap the proxy in the client-side translator and register the
+        // resulting endpoint under its address.
+        scmMachines.put(address, new EndpointStateMachine(address,
+            new StorageContainerDatanodeProtocolClientSideTranslatorPB(proxy),
+            conf));
+      } else {
+        LOG.warn("Trying to add an existing SCM Machine to Machines group. " +
+            "Ignoring the request.");
+      }
+    } finally {
+      writeUnlock();
+    }
+  }
+
+  /**
+   * Removes a SCM machine from the target set and closes its endpoint. A
+   * request for an unknown address is logged and ignored.
+   * <p>
+   * The endpoint is detached from the map before it is closed, so a failing
+   * close cannot leave a half-closed endpoint registered.
+   *
+   * @param address - Address of the SCM machine to stop sending heartbeat to.
+   * @throws IOException if closing the endpoint fails.
+   */
+  public void removeSCMServer(InetSocketAddress address) throws IOException {
+    writeLock();
+    try {
+      EndpointStateMachine endPoint = scmMachines.remove(address);
+      if (endPoint == null) {
+        LOG.warn("Trying to remove a non-existent SCM machine. " +
+            "Ignoring the request.");
+        return;
+      }
+      endPoint.close();
+    } finally {
+      writeUnlock();
+    }
+  }
+
+  /**
+   * Returns all known RPCEndpoints.
+   * <p>
+   * The result is an unmodifiable snapshot taken under the read lock, so
+   * callers can iterate over it while SCM servers are added or removed.
+   *
+   * @return - List of RPC Endpoints.
+   */
+  public Collection<EndpointStateMachine> getValues() {
+    readLock();
+    try {
+      return Collections
+          .unmodifiableList(new ArrayList<>(scmMachines.values()));
+    } finally {
+      readUnlock();
+    }
+  }
+
+  /**
+   * Closes every known endpoint, logging rather than propagating failures,
+   * and then unregisters the JMX bean.
+   *
+   * @throws IOException
+   */
+  @Override
+  public void close() throws IOException {
+    for (EndpointStateMachine machine : getValues()) {
+      IOUtils.cleanupWithLogger(LOG, machine);
+    }
+    MBeans.unregister(jmxBean);
+  }
+
+  /**
+   * Returns an unmodifiable snapshot of the known SCM endpoints, taken under
+   * the read lock.
+   *
+   * @return list of endpoints as JMX beans.
+   */
+  @Override
+  public List<EndpointStateMachineMBean> getSCMServers() {
+    readLock();
+    try {
+      List<EndpointStateMachineMBean> snapshot =
+          new ArrayList<>(scmMachines.values());
+      return Collections.unmodifiableList(snapshot);
+    } finally {
+      readUnlock();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/SCMConnectionManagerMXBean.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/SCMConnectionManagerMXBean.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/SCMConnectionManagerMXBean.java
new file mode 100644
index 0000000..25ef163
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/SCMConnectionManagerMXBean.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common.statemachine;
+
+import java.util.List;
+
+/**
+ * JMX information about the connected SCM servers.
+ */
+public interface SCMConnectionManagerMXBean {
+
+  /**
+   * Returns the management views of the SCM servers exposed through JMX.
+   *
+   * @return list of endpoint state machine MBeans.
+   */
+  List<EndpointStateMachineMBean> getSCMServers();
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
new file mode 100644
index 0000000..55476fd
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
@@ -0,0 +1,285 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common.statemachine;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ReportState;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
+import org.apache.hadoop.ozone.container.common.states.DatanodeState;
+import org.apache.hadoop.ozone.container.common.states.datanode
+    .InitDatanodeState;
+import org.apache.hadoop.ozone.container.common.states.datanode
+    .RunningDatanodeState;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ReportState.states
+    .noContainerReports;
+import static org.apache.hadoop.ozone.OzoneConsts.INVALID_PORT;
+
+/**
+ * Current Context of State Machine.
+ */
+public class StateContext {
+  static final Logger LOG =
+      LoggerFactory.getLogger(StateContext.class);
+  private final Queue<SCMCommand> commandQueue;
+  private final Lock lock;
+  private final DatanodeStateMachine parent;
+  private final AtomicLong stateExecutionCount;
+  private final Configuration conf;
+  private DatanodeStateMachine.DatanodeStates state;
+  private SCMNodeReport nrState;
+  private ReportState  reportState;
+  private static final ReportState DEFAULT_REPORT_STATE =
+      
ReportState.newBuilder().setState(noContainerReports).setCount(0).build();
+
+  /**
+   * Constructs a StateContext.
+   *
+   * @param conf   - Configration
+   * @param state  - State
+   * @param parent Parent State Machine
+   */
+  public StateContext(Configuration conf, DatanodeStateMachine.DatanodeStates
+      state, DatanodeStateMachine parent) {
+    this.conf = conf;
+    this.state = state;
+    this.parent = parent;
+    commandQueue = new LinkedList<>();
+    lock = new ReentrantLock();
+    stateExecutionCount = new AtomicLong(0);
+    nrState = SCMNodeReport.getDefaultInstance();
+  }
+
+  /**
+   * Returns the ContainerStateMachine class that holds this state.
+   *
+   * @return ContainerStateMachine.
+   */
+  public DatanodeStateMachine getParent() {
+    return parent;
+  }
+
+  /**
+   * Get the container server port.
+   * @return The container server port if available, return -1 if otherwise
+   */
+  public int getContainerPort() {
+    return parent == null ?
+        INVALID_PORT : parent.getContainer().getContainerServerPort();
+  }
+
+  /**
+   * Gets the Ratis Port.
+   * @return int , return -1 if not valid.
+   */
+  public int getRatisPort() {
+    return parent == null ?
+        INVALID_PORT : parent.getContainer().getRatisContainerServerPort();
+  }
+
+  /**
+   * Returns true if we are entering a new state.
+   *
+   * @return boolean
+   */
+  boolean isEntering() {
+    return stateExecutionCount.get() == 0;
+  }
+
+  /**
+   * Returns true if we are exiting from the current state.
+   *
+   * @param newState - newState.
+   * @return boolean
+   */
+  boolean isExiting(DatanodeStateMachine.DatanodeStates newState) {
+    boolean isExiting = state != newState && stateExecutionCount.get() > 0;
+    if(isExiting) {
+      stateExecutionCount.set(0);
+    }
+    return isExiting;
+  }
+
+  /**
+   * Returns the current state the machine is in.
+   *
+   * @return state.
+   */
+  public DatanodeStateMachine.DatanodeStates getState() {
+    return state;
+  }
+
+  /**
+   * Sets the current state of the machine.
+   *
+   * @param state state.
+   */
+  public void setState(DatanodeStateMachine.DatanodeStates state) {
+    this.state = state;
+  }
+
+  /**
+   * Returns the node report of the datanode state context.
+   * @return the node report.
+   */
+  public SCMNodeReport getNodeReport() {
+    return nrState;
+  }
+
+  /**
+   * Sets the storage location report of the datanode state context.
+   * @param nrReport - node report
+   */
+  public void setReportState(SCMNodeReport nrReport) {
+    this.nrState = nrReport;
+  }
+
+  /**
+   * Returns the next task to get executed by the datanode state machine.
+   * @return A callable that will be executed by the
+   * {@link DatanodeStateMachine}
+   */
+  @SuppressWarnings("unchecked")
+  public DatanodeState<DatanodeStateMachine.DatanodeStates> getTask() {
+    switch (this.state) {
+    case INIT:
+      return new InitDatanodeState(this.conf, parent.getConnectionManager(),
+          this);
+    case RUNNING:
+      return new RunningDatanodeState(this.conf, parent.getConnectionManager(),
+          this);
+    case SHUTDOWN:
+      return null;
+    default:
+      throw new IllegalArgumentException("Not Implemented yet.");
+    }
+  }
+
+  /**
+   * Executes the required state function.
+   *
+   * @param service - Executor Service
+   * @param time    - seconds to wait
+   * @param unit    - Seconds.
+   * @throws InterruptedException
+   * @throws ExecutionException
+   * @throws TimeoutException
+   */
+  public void execute(ExecutorService service, long time, TimeUnit unit)
+      throws InterruptedException, ExecutionException, TimeoutException {
+    stateExecutionCount.incrementAndGet();
+    DatanodeState<DatanodeStateMachine.DatanodeStates> task = getTask();
+    if (this.isEntering()) {
+      task.onEnter();
+    }
+    task.execute(service);
+    DatanodeStateMachine.DatanodeStates newState = task.await(time, unit);
+    if (this.state != newState) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Task {} executed, state transited from {} to {}",
+            task.getClass().getSimpleName(), this.state, newState);
+      }
+      if (isExiting(newState)) {
+        task.onExit();
+      }
+      this.clearReportState();
+      this.setState(newState);
+    }
+  }
+
+  /**
+   * Returns the next command or null if it is empty.
+   *
+   * @return SCMCommand or Null.
+   */
+  public SCMCommand getNextCommand() {
+    lock.lock();
+    try {
+      return commandQueue.poll();
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Adds a command to the State Machine queue.
+   *
+   * @param command - SCMCommand.
+   */
+  public void addCommand(SCMCommand command) {
+    lock.lock();
+    try {
+      commandQueue.add(command);
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Returns the count of the Execution.
+   * @return long
+   */
+  public long getExecutionCount() {
+    return stateExecutionCount.get();
+  }
+
+
+  /**
+   * Gets the ReportState.
+   * @return ReportState.
+   */
+  public synchronized  ReportState getContainerReportState() {
+    if (reportState == null) {
+      return DEFAULT_REPORT_STATE;
+    }
+    return reportState;
+  }
+
+  /**
+   * Sets the ReportState.
+   * @param rState - ReportState.
+   */
+  public synchronized  void setContainerReportState(ReportState rState) {
+    this.reportState = rState;
+  }
+
+  /**
+   * Clears report state after it has been communicated.
+   */
+  public synchronized void clearReportState() {
+    if(reportState != null) {
+      setContainerReportState(null);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/BlockDeletingService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/BlockDeletingService.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/BlockDeletingService.java
new file mode 100644
index 0000000..ac95b2a
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/BlockDeletingService.java
@@ -0,0 +1,239 @@
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.statemachine.background;
+
+import com.google.common.collect.Lists;
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.scm.container.common.helpers
+    .StorageContainerException;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdds.protocol.proto.ContainerProtos;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
+import org.apache.hadoop.ozone.container.common.helpers.KeyUtils;
+import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.utils.BackgroundService;
+import org.apache.hadoop.utils.BackgroundTask;
+import org.apache.hadoop.utils.BackgroundTaskQueue;
+import org.apache.hadoop.utils.BackgroundTaskResult;
+import org.apache.hadoop.utils.BatchOperation;
+import org.apache.hadoop.utils.MetadataKeyFilters.KeyPrefixFilter;
+import org.apache.hadoop.utils.MetadataStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+    .OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL;
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+    .OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL_DEFAULT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+    .OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER;
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+    .OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER_DEFAULT;
+
+/**
+ * A per-datanode container block deleting service takes in charge
+ * of deleting staled ozone blocks.
+ */
+public class BlockDeletingService extends BackgroundService{
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(BlockDeletingService.class);
+
+  private final ContainerManager containerManager;
+  private final Configuration conf;
+
+  // Throttle number of blocks to delete per task,
+  // set to 1 for testing
+  private final int blockLimitPerTask;
+
+  // Throttle the number of containers to process concurrently at a time,
+  private final int containerLimitPerInterval;
+
+  // Task priority is useful when a to-delete block has weight.
+  private final static int TASK_PRIORITY_DEFAULT = 1;
+  // Core pool size for container tasks
+  private final static int BLOCK_DELETING_SERVICE_CORE_POOL_SIZE = 10;
+
+  public BlockDeletingService(ContainerManager containerManager,
+      long serviceInterval, long serviceTimeout, Configuration conf) {
+    super("BlockDeletingService", serviceInterval,
+        TimeUnit.MILLISECONDS, BLOCK_DELETING_SERVICE_CORE_POOL_SIZE,
+        serviceTimeout);
+    this.containerManager = containerManager;
+    this.conf = conf;
+    this.blockLimitPerTask = conf.getInt(
+        OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER,
+        OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER_DEFAULT);
+    this.containerLimitPerInterval = conf.getInt(
+        OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL,
+        OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL_DEFAULT);
+  }
+
+  @Override
+  public BackgroundTaskQueue getTasks() {
+    BackgroundTaskQueue queue = new BackgroundTaskQueue();
+    List<ContainerData> containers = Lists.newArrayList();
+    try {
+      // We at most list a number of containers a time,
+      // in case there are too many containers and start too many workers.
+      // We must ensure there is no empty container in this result.
+      // The chosen result depends on what container deletion policy is
+      // configured.
+      containers = containerManager.chooseContainerForBlockDeletion(
+          containerLimitPerInterval);
+      LOG.info("Plan to choose {} containers for block deletion, "
+          + "actually returns {} valid containers.",
+          containerLimitPerInterval, containers.size());
+
+      for(ContainerData container : containers) {
+        BlockDeletingTask containerTask =
+            new BlockDeletingTask(container, TASK_PRIORITY_DEFAULT);
+        queue.add(containerTask);
+      }
+    } catch (StorageContainerException e) {
+      LOG.warn("Failed to initiate block deleting tasks, "
+          + "caused by unable to get containers info. "
+          + "Retry in next interval. ", e);
+    } catch (Exception e) {
+      // In case listContainer call throws any uncaught RuntimeException.
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Unexpected error occurs during deleting blocks.", e);
+      }
+    }
+    return queue;
+  }
+
+  private static class ContainerBackgroundTaskResult
+      implements BackgroundTaskResult {
+    private List<String> deletedBlockIds;
+
+    ContainerBackgroundTaskResult() {
+      deletedBlockIds = new LinkedList<>();
+    }
+
+    public void addBlockId(String blockId) {
+      deletedBlockIds.add(blockId);
+    }
+
+    public void addAll(List<String> blockIds) {
+      deletedBlockIds.addAll(blockIds);
+    }
+
+    public List<String> getDeletedBlocks() {
+      return deletedBlockIds;
+    }
+
+    @Override
+    public int getSize() {
+      return deletedBlockIds.size();
+    }
+  }
+
+  private class BlockDeletingTask
+      implements BackgroundTask<BackgroundTaskResult> {
+
+    private final int priority;
+    private final ContainerData containerData;
+
+    BlockDeletingTask(ContainerData containerName, int priority) {
+      this.priority = priority;
+      this.containerData = containerName;
+    }
+
+    @Override
+    public BackgroundTaskResult call() throws Exception {
+      ContainerBackgroundTaskResult crr = new ContainerBackgroundTaskResult();
+      long startTime = Time.monotonicNow();
+      // Scan container's db and get list of under deletion blocks
+      MetadataStore meta = KeyUtils.getDB(containerData, conf);
+      // # of blocks to delete is throttled
+      KeyPrefixFilter filter = new KeyPrefixFilter(
+          OzoneConsts.DELETING_KEY_PREFIX);
+      List<Map.Entry<byte[], byte[]>> toDeleteBlocks =
+          meta.getSequentialRangeKVs(null, blockLimitPerTask, filter);
+      if (toDeleteBlocks.isEmpty()) {
+        LOG.debug("No under deletion block found in container : {}",
+            containerData.getContainerName());
+      }
+
+      List<String> succeedBlocks = new LinkedList<>();
+      LOG.debug("Container : {}, To-Delete blocks : {}",
+          containerData.getContainerName(), toDeleteBlocks.size());
+      File dataDir = ContainerUtils.getDataDirectory(containerData).toFile();
+      if (!dataDir.exists() || !dataDir.isDirectory()) {
+        LOG.error("Invalid container data dir {} : "
+            + "not exist or not a directory", dataDir.getAbsolutePath());
+        return crr;
+      }
+
+      toDeleteBlocks.forEach(entry -> {
+        String blockName = DFSUtil.bytes2String(entry.getKey());
+        LOG.debug("Deleting block {}", blockName);
+        try {
+          ContainerProtos.KeyData data =
+              ContainerProtos.KeyData.parseFrom(entry.getValue());
+          for (ContainerProtos.ChunkInfo chunkInfo : data.getChunksList()) {
+            File chunkFile = dataDir.toPath()
+                .resolve(chunkInfo.getChunkName()).toFile();
+            if (FileUtils.deleteQuietly(chunkFile)) {
+              LOG.debug("block {} chunk {} deleted", blockName,
+                  chunkFile.getAbsolutePath());
+            }
+          }
+          succeedBlocks.add(blockName);
+        } catch (InvalidProtocolBufferException e) {
+          LOG.error("Failed to parse block info for block {}", blockName, e);
+        }
+      });
+
+      // Once files are deleted ... clean up DB
+      BatchOperation batch = new BatchOperation();
+      succeedBlocks.forEach(entry ->
+          batch.delete(DFSUtil.string2Bytes(entry)));
+      meta.writeBatch(batch);
+      // update count of pending deletion blocks in in-memory container status
+      containerManager.decrPendingDeletionBlocks(succeedBlocks.size(),
+          containerData.getContainerName());
+
+      if (!succeedBlocks.isEmpty()) {
+        LOG.info("Container: {}, deleted blocks: {}, task elapsed time: {}ms",
+            containerData.getContainerName(), succeedBlocks.size(),
+            Time.monotonicNow() - startTime);
+      }
+      crr.addAll(succeedBlocks);
+      return crr;
+    }
+
+    @Override
+    public int getPriority() {
+      return priority;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/package-info.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/package-info.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/package-info.java
new file mode 100644
index 0000000..a9e202e
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/package-info.java
@@ -0,0 +1,18 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.common.statemachine.background;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerHandler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerHandler.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerHandler.java
new file mode 100644
index 0000000..f7b49b7
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerHandler.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
+
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMCloseContainerCmdResponseProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMCmdType;
+import org.apache.hadoop.ozone.container.common.statemachine
+    .SCMConnectionManager;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Container Report handler.
+ */
+public class CloseContainerHandler implements CommandHandler {
+  static final Logger LOG =
+      LoggerFactory.getLogger(CloseContainerHandler.class);
+  private int invocationCount;
+  private long totalTime;
+
+  /**
+   * Constructs a ContainerReport handler.
+   */
+  public CloseContainerHandler() {
+  }
+
+  /**
+   * Handles a given SCM command.
+   *
+   * @param command           - SCM Command
+   * @param container         - Ozone Container.
+   * @param context           - Current Context.
+   * @param connectionManager - The SCMs that we are talking to.
+   */
+  @Override
+  public void handle(SCMCommand command, OzoneContainer container,
+      StateContext context, SCMConnectionManager connectionManager) {
+    LOG.debug("Processing Close Container command.");
+    invocationCount++;
+    long startTime = Time.monotonicNow();
+    String containerName = "UNKNOWN";
+    try {
+
+      SCMCloseContainerCmdResponseProto
+          closeContainerProto =
+          SCMCloseContainerCmdResponseProto
+              .parseFrom(command.getProtoBufMessage());
+      containerName = closeContainerProto.getContainerName();
+
+      container.getContainerManager().closeContainer(containerName);
+
+    } catch (Exception e) {
+      LOG.error("Can't close container " + containerName, e);
+    } finally {
+      long endTime = Time.monotonicNow();
+      totalTime += endTime - startTime;
+    }
+  }
+
+  /**
+   * Returns the command type that this command handler handles.
+   *
+   * @return Type
+   */
+  @Override
+  public SCMCmdType getCommandType() {
+    return SCMCmdType.closeContainerCommand;
+  }
+
+  /**
+   * Returns number of times this handler has been invoked.
+   *
+   * @return int
+   */
+  @Override
+  public int getInvocationCount() {
+    return invocationCount;
+  }
+
+  /**
+   * Returns the average time this function takes to run.
+   *
+   * @return long
+   */
+  @Override
+  public long getAverageRunTime() {
+    if (invocationCount > 0) {
+      return totalTime / invocationCount;
+    }
+    return 0;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java
new file mode 100644
index 0000000..40feca3
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
+
+import com.google.common.base.Preconditions;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCmdType;
+import 
org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Dispatches command to the correct handler.
+ *
+ * <p>Handlers are registered once, at construction time, keyed by the
+ * command type they report; at most one handler per command type is allowed.
+ */
+public final class CommandDispatcher {
+  static final Logger LOG =
+      LoggerFactory.getLogger(CommandDispatcher.class);
+  private final StateContext context;
+  private final Map<SCMCmdType, CommandHandler> handlerMap;
+  private final OzoneContainer container;
+  private final SCMConnectionManager connectionManager;
+
+  /**
+   * Constructs a command dispatcher.
+   *
+   * @param container - Ozone Container
+   * @param connectionManager - Connection manager handed to the handlers
+   * @param context - Context
+   * @param handlers - Set of handlers; must be non-empty.
+   * @throws IllegalArgumentException if no handler is given or two handlers
+   * report the same command type.
+   */
+  private CommandDispatcher(OzoneContainer container, SCMConnectionManager
+      connectionManager, StateContext context,
+      CommandHandler... handlers) {
+    Preconditions.checkNotNull(context);
+    Preconditions.checkNotNull(handlers);
+    Preconditions.checkArgument(handlers.length > 0);
+    Preconditions.checkNotNull(container);
+    Preconditions.checkNotNull(connectionManager);
+    this.context = context;
+    this.container = container;
+    this.connectionManager = connectionManager;
+    handlerMap = new HashMap<>();
+    for (CommandHandler h : handlers) {
+      if (handlerMap.containsKey(h.getCommandType())) {
+        // "{}" must not contain a space, otherwise SLF4J does not treat it
+        // as a placeholder. The command type itself is logged because the
+        // type descriptor's name is the same for every command.
+        LOG.error("Duplicate handler for the same command. Exiting. Handle " +
+            "key : {}", h.getCommandType());
+        throw new IllegalArgumentException("Duplicate handler for the same " +
+            "command.");
+      }
+      handlerMap.put(h.getCommandType(), h);
+    }
+  }
+
+  /**
+   * Dispatch the command to the correct handler. Commands without a
+   * registered handler are logged and dropped.
+   *
+   * @param command - SCM Command.
+   */
+  public void handle(SCMCommand command) {
+    Preconditions.checkNotNull(command);
+    CommandHandler handler = handlerMap.get(command.getType());
+    if (handler != null) {
+      handler.handle(command, container, context, connectionManager);
+    } else {
+      // Log the command type itself so the message identifies which
+      // command had no handler.
+      LOG.error("Unknown SCM Command queued. There is no handler for this " +
+          "command. Command: {}", command.getType());
+    }
+  }
+
+  /**
+   * Creates a new, empty builder.
+   *
+   * @return Builder
+   */
+  public static Builder newBuilder() {
+    return new Builder();
+  }
+
+  /**
+   * Helper class to construct command dispatcher.
+   */
+  public static class Builder {
+    private final List<CommandHandler> handlerList;
+    private OzoneContainer container;
+    private StateContext context;
+    private SCMConnectionManager connectionManager;
+
+    public Builder() {
+      handlerList = new LinkedList<>();
+    }
+
+    /**
+     * Adds a handler.
+     *
+     * @param handler - handler
+     * @return Builder
+     */
+    public Builder addHandler(CommandHandler handler) {
+      Preconditions.checkNotNull(handler);
+      handlerList.add(handler);
+      return this;
+    }
+
+    /**
+     * Add the OzoneContainer.
+     *
+     * @param ozoneContainer - ozone container.
+     * @return Builder
+     */
+    public Builder setContainer(OzoneContainer ozoneContainer) {
+      Preconditions.checkNotNull(ozoneContainer);
+      this.container = ozoneContainer;
+      return this;
+    }
+
+    /**
+     * Set the Connection Manager.
+     *
+     * @param scmConnectionManager - connection manager
+     * @return this
+     */
+    public Builder setConnectionManager(SCMConnectionManager
+        scmConnectionManager) {
+      Preconditions.checkNotNull(scmConnectionManager);
+      this.connectionManager = scmConnectionManager;
+      return this;
+    }
+
+    /**
+     * Sets the Context.
+     *
+     * @param stateContext - StateContext
+     * @return this
+     */
+    public Builder setContext(StateContext stateContext) {
+      Preconditions.checkNotNull(stateContext);
+      this.context = stateContext;
+      return this;
+    }
+
+    /**
+     * Builds a command Dispatcher.
+     *
+     * @return Command Dispatcher.
+     * @throws NullPointerException if the connection manager, container or
+     * context has not been set.
+     * @throws IllegalArgumentException if no handler has been added.
+     */
+    public CommandDispatcher build() {
+      Preconditions.checkNotNull(this.connectionManager, "Missing connection" +
+          " manager.");
+      Preconditions.checkNotNull(this.container, "Missing container.");
+      Preconditions.checkNotNull(this.context, "Missing context.");
+      Preconditions.checkArgument(!this.handlerList.isEmpty(),
+          "Missing command handlers.");
+      return new CommandDispatcher(this.container, this.connectionManager,
+          this.context, handlerList.toArray(
+              new CommandHandler[handlerList.size()]));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java
new file mode 100644
index 0000000..13d9f72
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
+
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCmdType;
+import 
org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+
+/**
+ * Generic interface for handlers.
+ *
+ * A handler processes SCM commands of the single type reported by
+ * {@link #getCommandType()} and exposes basic statistics about how often and
+ * for how long it has run.
+ */
+public interface CommandHandler {
+
+  /**
+   * Handles a given SCM command.
+   * @param command - SCM Command
+   * @param container - Ozone Container.
+   * @param context - Current Context.
+   * @param connectionManager - The SCMs that we are talking to.
+   */
+  void handle(SCMCommand command, OzoneContainer container,
+      StateContext context, SCMConnectionManager connectionManager);
+
+  /**
+   * Returns the command type that this command handler handles.
+   * @return Type
+   */
+  SCMCmdType getCommandType();
+
+  /**
+   * Returns number of times this handler has been invoked.
+   * @return int
+   */
+  int getInvocationCount();
+
+  /**
+   * Returns the average time this function takes to run.
+   *
+   * ContainerReportHandler and DeleteBlocksCommandHandler compute this as the
+   * accumulated run time divided by the invocation count, and return 0 when
+   * the handler has not been invoked yet.
+   * NOTE(review): the unit is whatever Time.monotonicNow() uses, presumably
+   * milliseconds - confirm.
+   * @return  long
+   */
+  long getAverageRunTime();
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ContainerReportHandler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ContainerReportHandler.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ContainerReportHandler.java
new file mode 100644
index 0000000..ba6b418
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ContainerReportHandler.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
+
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMCmdType;
+import org.apache.hadoop.ozone.container.common.statemachine
+    .EndpointStateMachine;
+import org.apache.hadoop.ozone.container.common.statemachine
+    .SCMConnectionManager;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Container Report handler.
+ *
+ * Fetches the container report from the OzoneContainer and sends it to every
+ * SCM endpoint known to the connection manager, while counting invocations
+ * and accumulating the time spent handling them.
+ */
+public class ContainerReportHandler implements CommandHandler {
+  static final Logger LOG =
+      LoggerFactory.getLogger(ContainerReportHandler.class);
+  private int invocationCount;
+  private long totalTime;
+
+  /**
+   * Constructs a ContainerReport handler.
+   */
+  public ContainerReportHandler() {
+  }
+
+  /**
+   * Handles a given SCM command.
+   *
+   * The report is built once and then sent to each endpoint in turn. A
+   * failure to reach one endpoint is logged and does not prevent the report
+   * from being sent to the remaining endpoints. A failure to build the report
+   * is logged and ends the handling. The elapsed time is recorded in every
+   * case.
+   *
+   * @param command - SCM Command
+   * @param container - Ozone Container.
+   * @param context - Current Context.
+   * @param connectionManager - The SCMs that we are talking to.
+   */
+  @Override
+  public void handle(SCMCommand command, OzoneContainer container,
+      StateContext context, SCMConnectionManager connectionManager) {
+    LOG.debug("Processing Container Report.");
+    invocationCount++;
+    long startTime = Time.monotonicNow();
+    try {
+      ContainerReportsRequestProto containerReport =
+          container.getContainerReport();
+
+      // TODO : We send this report to all SCMs.Check if it is enough only to
+      // send to the leader once we have RAFT enabled SCMs.
+      for (EndpointStateMachine endPoint : connectionManager.getValues()) {
+        try {
+          endPoint.getEndPoint().sendContainerReport(containerReport);
+        } catch (IOException ex) {
+          // One unreachable SCM must not keep the report from the others.
+          LOG.error("Unable to send the Container Report to SCM {}",
+              endPoint.getAddress(), ex);
+        }
+      }
+    } catch (IOException ex) {
+      LOG.error("Unable to process the Container Report command.", ex);
+    } finally {
+      // Account for the run time on both the success and the failure path.
+      totalTime += Time.monotonicNow() - startTime;
+    }
+  }
+
+  /**
+   * Returns the command type that this command handler handles.
+   *
+   * @return Type
+   */
+  @Override
+  public SCMCmdType getCommandType() {
+    return SCMCmdType.sendContainerReport;
+  }
+
+  /**
+   * Returns number of times this handler has been invoked.
+   *
+   * @return int
+   */
+  @Override
+  public int getInvocationCount() {
+    return invocationCount;
+  }
+
+  /**
+   * Returns the average time this function takes to run.
+   *
+   * @return accumulated run time divided by the invocation count, or 0 when
+   * the handler has not been invoked yet.
+   */
+  @Override
+  public long getAverageRunTime() {
+    if (invocationCount > 0) {
+      return totalTime / invocationCount;
+    }
+    return 0;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
new file mode 100644
index 0000000..f106e3d
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto
+    .DeleteBlockTransactionResult;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMCmdType;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
+import org.apache.hadoop.ozone.container.common.helpers
+    .DeletedContainerBlocksSummary;
+import org.apache.hadoop.ozone.container.common.helpers.KeyUtils;
+import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
+import org.apache.hadoop.ozone.container.common.statemachine
+    .EndpointStateMachine;
+import org.apache.hadoop.ozone.container.common.statemachine
+    .SCMConnectionManager;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.utils.BatchOperation;
+import org.apache.hadoop.utils.MetadataStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Handle block deletion commands.
+ *
+ * Handling a command only moves the listed blocks to the deleting state in
+ * the container metadata store; the actual deletion happens in another
+ * recycling thread. The per-transaction results are then sent back to every
+ * SCM endpoint as an ACK.
+ */
+public class DeleteBlocksCommandHandler implements CommandHandler {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(DeleteBlocksCommandHandler.class);
+
+  private final ContainerManager containerManager;
+  private final Configuration conf;
+  private int invocationCount;
+  private long totalTime;
+
+  /**
+   * Constructs a handler for block deletion commands.
+   *
+   * @param containerManager - used to read container data and to update the
+   * pending deletion block count of a container.
+   * @param conf - configuration passed to KeyUtils when obtaining the
+   * container DB.
+   */
+  public DeleteBlocksCommandHandler(ContainerManager containerManager,
+      Configuration conf) {
+    this.containerManager = containerManager;
+    this.conf = conf;
+  }
+
+  /**
+   * Handles a block deletion command.
+   *
+   * Commands of any other type are skipped with a warning and are not counted
+   * as an invocation. For a deletion command, each transaction is applied to
+   * its container and, unless the command carries no transactions, an ACK
+   * listing the outcome of each transaction is sent to all SCM endpoints.
+   *
+   * @param command - SCM Command
+   * @param container - Ozone Container.
+   * @param context - Current Context.
+   * @param connectionManager - The SCMs that we are talking to.
+   */
+  @Override
+  public void handle(SCMCommand command, OzoneContainer container,
+      StateContext context, SCMConnectionManager connectionManager) {
+    if (command.getType() != SCMCmdType.deleteBlocksCommand) {
+      LOG.warn("Skipping handling command, expected command "
+              + "type {} but found {}",
+          SCMCmdType.deleteBlocksCommand, command.getType());
+      return;
+    }
+    LOG.debug("Processing block deletion command.");
+    invocationCount++;
+    long startTime = Time.monotonicNow();
+    try {
+      // move blocks to deleting state.
+      // this is a metadata update, the actual deletion happens in another
+      // recycling thread.
+      DeleteBlocksCommand cmd = (DeleteBlocksCommand) command;
+      List<DeletedBlocksTransaction> containerBlocks =
+          cmd.blocksTobeDeleted();
+
+      DeletedContainerBlocksSummary summary =
+          DeletedContainerBlocksSummary.getFrom(containerBlocks);
+      LOG.info("Start to delete container blocks, TXIDs={}, "
+              + "numOfContainers={}, numOfBlocks={}",
+          summary.getTxIDSummary(),
+          summary.getNumOfContainers(),
+          summary.getNumOfBlocks());
+
+      ContainerBlocksDeletionACKProto blockDeletionACK =
+          applyTransactions(containerBlocks);
+
+      // Send ACK back to SCM as long as meta updated
+      // TODO Or we should wait until the blocks are actually deleted?
+      if (!containerBlocks.isEmpty()) {
+        sendDeletionAck(blockDeletionACK, connectionManager);
+      }
+    } finally {
+      // Record the run time even if handling fails unexpectedly, matching
+      // ContainerReportHandler.
+      totalTime += Time.monotonicNow() - startTime;
+    }
+  }
+
+  /**
+   * Applies every transaction and collects one result per transaction.
+   *
+   * @param containerBlocks transactions to apply.
+   * @return ACK holding, for each transaction, its TXID and whether the
+   * metadata update succeeded.
+   */
+  private ContainerBlocksDeletionACKProto applyTransactions(
+      List<DeletedBlocksTransaction> containerBlocks) {
+    ContainerBlocksDeletionACKProto.Builder resultBuilder =
+        ContainerBlocksDeletionACKProto.newBuilder();
+    for (DeletedBlocksTransaction entry : containerBlocks) {
+      DeleteBlockTransactionResult.Builder txResultBuilder =
+          DeleteBlockTransactionResult.newBuilder();
+      txResultBuilder.setTxID(entry.getTxID());
+      try {
+        deleteContainerBlocks(entry, conf);
+        txResultBuilder.setSuccess(true);
+      } catch (IOException e) {
+        LOG.warn("Failed to delete blocks for container={}, TXID={}",
+            entry.getContainerName(), entry.getTxID(), e);
+        txResultBuilder.setSuccess(false);
+      }
+      resultBuilder.addResults(txResultBuilder.build());
+    }
+    return resultBuilder.build();
+  }
+
+  /**
+   * Sends the ACK to every SCM endpoint; a failure on one endpoint is logged
+   * and the remaining endpoints are still tried.
+   *
+   * @param blockDeletionACK ACK to send.
+   * @param connectionManager - The SCMs that we are talking to.
+   */
+  private void sendDeletionAck(
+      ContainerBlocksDeletionACKProto blockDeletionACK,
+      SCMConnectionManager connectionManager) {
+    // The ACK is the same for all endpoints, so dump it once up front.
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Sending following block deletion ACK to SCM");
+      for (DeleteBlockTransactionResult result :
+          blockDeletionACK.getResultsList()) {
+        LOG.debug(result.getTxID() + " : " + result.getSuccess());
+      }
+    }
+    for (EndpointStateMachine endPoint : connectionManager.getValues()) {
+      try {
+        endPoint.getEndPoint()
+            .sendContainerBlocksDeletionACK(blockDeletionACK);
+      } catch (IOException e) {
+        LOG.error("Unable to send block deletion ACK to SCM {}",
+            endPoint.getAddress().toString(), e);
+      }
+    }
+  }
+
+  /**
+   * Move a bunch of blocks from a container to deleting state.
+   * This is a meta update, the actual deletes happen in async mode.
+   *
+   * Blocks that are not present in the container DB are skipped. The
+   * in-memory pending deletion count is updated with the number of blocks
+   * that were moved, including when a later block fails.
+   *
+   * @param delTX a block deletion transaction.
+   * @param config configuration.
+   * @throws IOException if I/O error occurs.
+   */
+  private void deleteContainerBlocks(DeletedBlocksTransaction delTX,
+      Configuration config) throws IOException {
+    String containerId = delTX.getContainerName();
+    ContainerData containerInfo = containerManager.readContainer(containerId);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Processing Container : {}, DB path : {}", containerId,
+          containerInfo.getDBPath());
+    }
+
+    int newDeletionBlocks = 0;
+    MetadataStore containerDB = KeyUtils.getDB(containerInfo, config);
+    try {
+      for (String blk : delTX.getBlockIDList()) {
+        byte[] blkBytes = DFSUtil.string2Bytes(blk);
+        byte[] blkInfo = containerDB.get(blkBytes);
+        if (blkInfo == null) {
+          LOG.debug("Block {} not found or already under deletion in"
+                  + " container {}, skip deleting it.", blk, containerId);
+          continue;
+        }
+        // Found the block in container db,
+        // use an atomic update to change its state to deleting.
+        BatchOperation batch = new BatchOperation();
+        batch.put(DFSUtil.string2Bytes(OzoneConsts.DELETING_KEY_PREFIX + blk),
+            blkInfo);
+        batch.delete(blkBytes);
+        try {
+          containerDB.writeBatch(batch);
+        } catch (IOException e) {
+          // if some blocks failed to delete, we fail this TX,
+          // without sending this ACK to SCM, SCM will resend the TX
+          // with a certain number of retries.
+          throw new IOException(
+              "Failed to delete blocks for TXID = " + delTX.getTxID(), e);
+        }
+        newDeletionBlocks++;
+        LOG.debug("Transited Block {} to DELETING state in container {}",
+            blk, containerId);
+      }
+    } finally {
+      // update pending deletion blocks count in in-memory container status.
+      // Done in finally so blocks moved before a failure are still counted;
+      // a retried TX would skip them as already under deletion.
+      containerManager.incrPendingDeletionBlocks(newDeletionBlocks,
+          containerId);
+    }
+  }
+
+  /**
+   * Returns the command type that this command handler handles.
+   *
+   * @return Type
+   */
+  @Override
+  public SCMCmdType getCommandType() {
+    return SCMCmdType.deleteBlocksCommand;
+  }
+
+  /**
+   * Returns number of times this handler has been invoked.
+   *
+   * @return int
+   */
+  @Override
+  public int getInvocationCount() {
+    return this.invocationCount;
+  }
+
+  /**
+   * Returns the average time this function takes to run.
+   *
+   * @return accumulated run time divided by the invocation count, or 0 when
+   * the handler has not been invoked yet.
+   */
+  @Override
+  public long getAverageRunTime() {
+    if (invocationCount > 0) {
+      return totalTime / invocationCount;
+    }
+    return 0;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/package-info.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/package-info.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/package-info.java
new file mode 100644
index 0000000..1e9c8dc
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/package-info.java
@@ -0,0 +1,18 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/package-info.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/package-info.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/package-info.java
new file mode 100644
index 0000000..feb2f81
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/package-info.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.common.statemachine;
+/**
+
+ State machine class is used by the container to denote various states a
+ container can be in and also is used for command processing.
+
+ Container has the following states.
+
+ Start -> getVersion -> Register -> Running -> Shutdown
+
+ */
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/DatanodeState.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/DatanodeState.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/DatanodeState.java
new file mode 100644
index 0000000..75142af
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/DatanodeState.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.common.states;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * State Interface that allows tasks to maintain states.
+ *
+ * @param <T> type of the value returned by {@link #await(long, TimeUnit)}.
+ */
+public interface DatanodeState<T> {
+  /**
+   * Called before entering this state.
+   */
+  void onEnter();
+
+  /**
+   * Called After exiting this state.
+   */
+  void onExit();
+
+  /**
+   * Executes one or more tasks that is needed by this state.
+   *
+   * @param executor -  ExecutorService
+   */
+  void execute(ExecutorService executor);
+
+  /**
+   * Wait for execute to finish.
+   *
+   * @param time - Time
+   * @param timeUnit - Unit of time.
+   * @return a value of type T produced by this state.
+   * NOTE(review): what the value denotes (e.g. the next state) is decided by
+   * the implementations - confirm there.
+   * @throws InterruptedException declared by this method; presumably raised
+   * when the waiting thread is interrupted - confirm.
+   * @throws ExecutionException declared by this method; presumably raised
+   * when a task started by execute fails - confirm.
+   * @throws TimeoutException declared by this method; presumably raised when
+   * the tasks do not finish within the given time - confirm.
+   */
+  T await(long time, TimeUnit timeUnit)
+      throws InterruptedException, ExecutionException, TimeoutException;
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to