This is an automated email from the ASF dual-hosted git repository.

licheng pushed a commit to branch HDDS-2823
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git

commit 1f3ef369755a4d2363506cef545c88c144f14fc7
Author: Nandakumar <na...@apache.org>
AuthorDate: Tue May 26 13:26:16 2020 +0530

    HDDS-3186. Introduce generic SCMRatisRequest and SCMRatisResponse. (#959)
    
    * HDDS-3186. Initial version.
    
    * HDDS-3186. Additional changes.
---
 .../server-scm/dev-support/findbugsExcludeFile.xml |  21 ++
 hadoop-hdds/server-scm/pom.xml                     |  38 ++
 .../hdds/scm/container/ContainerManagerImpl.java   | 282 +++++++++++++++
 .../hdds/scm/container/ContainerManagerV2.java     | 189 ++++++++++
 .../scm/container/ContainerStateManagerImpl.java   | 397 +++++++++++++++++++++
 .../scm/container/ContainerStateManagerV2.java     |  75 ++++
 .../org/apache/hadoop/hdds/scm/ha/RatisUtil.java   | 155 ++++++++
 .../apache/hadoop/hdds/scm/ha/ReflectionUtil.java  |  67 ++++
 .../hadoop/hdds/scm/ha/SCMHAConfiguration.java     | 225 ++++++++++++
 .../hadoop/hdds/scm/ha/SCMHAInvocationHandler.java |  93 +++++
 .../apache/hadoop/hdds/scm/ha/SCMHAManager.java    |  76 ++++
 .../apache/hadoop/hdds/scm/ha/SCMRatisRequest.java | 147 ++++++++
 .../hadoop/hdds/scm/ha/SCMRatisResponse.java       | 127 +++++++
 .../apache/hadoop/hdds/scm/ha/SCMRatisServer.java  | 109 ++++++
 .../apache/hadoop/hdds/scm/ha/SCMStateMachine.java |  92 +++++
 .../apache/hadoop/hdds/scm/metadata/Replicate.java |  33 ++
 .../src/main/proto/SCMRatisProtocol.proto          |  46 +++
 17 files changed, 2172 insertions(+)

diff --git a/hadoop-hdds/server-scm/dev-support/findbugsExcludeFile.xml b/hadoop-hdds/server-scm/dev-support/findbugsExcludeFile.xml
new file mode 100644
index 0000000..3571a89
--- /dev/null
+++ b/hadoop-hdds/server-scm/dev-support/findbugsExcludeFile.xml
@@ -0,0 +1,21 @@
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<FindBugsFilter>
+  <Match>
+    <Package name="org.apache.hadoop.hdds.protocol.proto"/>
+  </Match>
+</FindBugsFilter>
diff --git a/hadoop-hdds/server-scm/pom.xml b/hadoop-hdds/server-scm/pom.xml
index dcbc42a..8c17aae 100644
--- a/hadoop-hdds/server-scm/pom.xml
+++ b/hadoop-hdds/server-scm/pom.xml
@@ -128,6 +128,11 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
       <artifactId>hadoop-hdds-hadoop-dependency-test</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java</artifactId>
+      <scope>compile</scope>
+    </dependency>
   </dependencies>
   <build>
     <plugins>
@@ -163,6 +168,39 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
           </execution>
         </executions>
       </plugin>
+      <plugin>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-maven-plugins</artifactId>
+        <executions>
+          <execution>
+            <id>compile-protoc</id>
+            <goals>
+              <goal>protoc</goal>
+            </goals>
+            <configuration>
+              <protocVersion>${protobuf.version}</protocVersion>
+              <imports>
+                <param>
+                  ${basedir}/src/main/proto
+                </param>
+              </imports>
+              <source>
+                <directory>${basedir}/src/main/proto</directory>
+                <includes>
+                  <include>SCMRatisProtocol.proto</include>
+                </includes>
+              </source>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>com.github.spotbugs</groupId>
+        <artifactId>spotbugs-maven-plugin</artifactId>
+        <configuration>
+          <excludeFilterFile>${basedir}/dev-support/findbugsExcludeFile.xml</excludeFilterFile>
+        </configuration>
+      </plugin>
     </plugins>
     <testResources>
       <testResource>
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java
new file mode 100644
index 0000000..0404530
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ContainerInfoProto;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Default implementation of {@link ContainerManagerV2}. Guards access to the
+ * underlying {@link ContainerStateManagerV2} with a read/write lock.
+ */
+public class ContainerManagerImpl implements ContainerManagerV2 {
+
+  /**
+   *
+   */
+  private static final Logger LOG = LoggerFactory.getLogger(
+      ContainerManagerImpl.class);
+
+  /**
+   *
+   */
+  private final ReadWriteLock lock;
+
+  /**
+   *
+   */
+  private final PipelineManager pipelineManager;
+
+  /**
+   *
+   */
+  private final ContainerStateManagerV2 containerStateManager;
+
+  /**
+   *
+   */
+  public ContainerManagerImpl(
+      // Introduce builder for this class?
+      final Configuration conf, final PipelineManager pipelineManager,
+      final SCMHAManager scmhaManager,
+      final Table<ContainerID, ContainerInfo> containerStore)
+      throws IOException {
+    this.lock = new ReentrantReadWriteLock();
+    this.pipelineManager = pipelineManager;
+    this.containerStateManager = ContainerStateManagerImpl.newBuilder()
+        .setConfiguration(conf)
+        .setPipelineManager(pipelineManager)
+        .setRatisServer(scmhaManager.getRatisServer())
+        .setContainerStore(containerStore)
+        .build();
+  }
+
+  @Override
+  public Set<ContainerID> getContainerIDs() {
+    lock.readLock().lock();
+    try {
+      return containerStateManager.getContainerIDs();
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  @Override
+  public Set<ContainerInfo> getContainers() {
+    lock.readLock().lock();
+    try {
+      return containerStateManager.getContainerIDs().stream().map(id -> {
+        try {
+          return containerStateManager.getContainer(id);
+        } catch (ContainerNotFoundException e) {
+          // How can this happen? o_O
+          return null;
+        }
+      }).filter(Objects::nonNull).collect(Collectors.toSet());
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  @Override
+  public ContainerInfo getContainer(final ContainerID containerID)
+      throws ContainerNotFoundException {
+    lock.readLock().lock();
+    try {
+      return containerStateManager.getContainer(containerID);
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  @Override
+  public Set<ContainerInfo> getContainers(final LifeCycleState state) {
+    lock.readLock().lock();
+    try {
+      return containerStateManager.getContainerIDs(state).stream().map(id -> {
+        try {
+          return containerStateManager.getContainer(id);
+        } catch (ContainerNotFoundException e) {
+          // How can this happen? o_O
+          return null;
+        }
+      }).filter(Objects::nonNull).collect(Collectors.toSet());
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  @Override
+  public boolean exists(final ContainerID containerID) {
+    lock.readLock().lock();
+    try {
+      return (containerStateManager.getContainer(containerID) != null);
+    } catch (ContainerNotFoundException ex) {
+      return false;
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  @Override
+  public List<ContainerInfo> listContainers(final ContainerID startID,
+                                            final int count) {
+    lock.readLock().lock();
+    try {
+      final long startId = startID == null ? 0 : startID.getId();
+      final List<ContainerID> containersIds =
+          new ArrayList<>(containerStateManager.getContainerIDs());
+      Collections.sort(containersIds);
+      return containersIds.stream()
+          .filter(id -> id.getId() > startId)
+          .limit(count)
+          .map(id -> {
+            try {
+              return containerStateManager.getContainer(id);
+            } catch (ContainerNotFoundException ex) {
+              // This can never happen: while we hold the read lock, no one
+              // else can remove the container after we fetched the IDs.
+              LOG.warn("Container Missing.", ex);
+              return null;
+            }
+          }).collect(Collectors.toList());
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  @Override
+  public ContainerInfo allocateContainer(final ReplicationType type,
+      final ReplicationFactor replicationFactor, final String owner)
+      throws IOException {
+    lock.writeLock().lock();
+    try {
+      final List<Pipeline> pipelines = pipelineManager
+          .getPipelines(type, replicationFactor, Pipeline.PipelineState.OPEN);
+
+      if (pipelines.isEmpty()) {
+        throw new IOException("Could not allocate container. Cannot get any" +
+            " matching pipeline for Type:" + type + ", Factor:" +
+            replicationFactor + ", State:PipelineState.OPEN");
+      }
+
+      final ContainerID containerID = containerStateManager
+          .getNextContainerID();
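+      // Spread new containers across the matching open pipelines by
+      // mapping the container ID onto the pipeline list (modulo selection).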
+      final Pipeline pipeline = pipelines.get(
+          (int) (containerID.getId() % pipelines.size()));
+
+      final ContainerInfoProto containerInfo = ContainerInfoProto.newBuilder()
+          .setState(LifeCycleState.OPEN)
+          .setPipelineID(pipeline.getId().getProtobuf())
+          .setUsedBytes(0)
+          .setNumberOfKeys(0)
+          .setStateEnterTime(Time.now())
+          .setOwner(owner)
+          .setContainerID(containerID.getId())
+          .setDeleteTransactionId(0)
+          .setReplicationFactor(pipeline.getFactor())
+          .setReplicationType(pipeline.getType())
+          .build();
+      containerStateManager.addContainer(containerInfo);
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("New container allocated: {}", containerInfo);
+      }
+      return containerStateManager.getContainer(containerID);
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  @Override
+  public void deleteContainer(final ContainerID containerID)
+      throws ContainerNotFoundException {
+    throw new UnsupportedOperationException("Not yet implemented!");
+  }
+
+  @Override
+  public void updateContainerState(final ContainerID containerID,
+                                   final LifeCycleEvent event)
+      throws ContainerNotFoundException {
+    throw new UnsupportedOperationException("Not yet implemented!");
+  }
+
+  @Override
+  public Set<ContainerReplica> getContainerReplicas(
+      final ContainerID containerID) throws ContainerNotFoundException {
+    throw new UnsupportedOperationException("Not yet implemented!");
+  }
+
+  @Override
+  public void updateContainerReplica(final ContainerID containerID,
+                                     final ContainerReplica replica)
+      throws ContainerNotFoundException {
+    throw new UnsupportedOperationException("Not yet implemented!");
+  }
+
+  @Override
+  public void removeContainerReplica(final ContainerID containerID,
+                                     final ContainerReplica replica)
+      throws ContainerNotFoundException, ContainerReplicaNotFoundException {
+    throw new UnsupportedOperationException("Not yet implemented!");
+  }
+
+  @Override
+  public void updateDeleteTransactionId(
+      final Map<ContainerID, Long> deleteTransactionMap) throws IOException {
+    throw new UnsupportedOperationException("Not yet implemented!");
+  }
+
+  @Override
+  public ContainerInfo getMatchingContainer(final long size, final String owner,
+      final Pipeline pipeline, final List<ContainerID> excludedContainerIDS) {
+    throw new UnsupportedOperationException("Not yet implemented!");
+  }
+
+  @Override
+  public void notifyContainerReportProcessing(final boolean isFullReport,
+                                              final boolean success) {
+    throw new UnsupportedOperationException("Not yet implemented!");
+  }
+
+  @Override
+  public void close() throws IOException {
+    throw new UnsupportedOperationException("Not yet implemented!");
+  }
+
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerV2.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerV2.java
new file mode 100644
index 0000000..37c7b70
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerV2.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.container;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+
+/**
+ * TODO: Add extensive javadoc.
+ *
+ * The ContainerManager maintains the mapping from containers to pipelines.
+ * SCM uses it when allocating new container locations and when
+ * looking up a key.
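+ *
+ * A minimal usage sketch (illustrative only; {@code containerManager} is an
+ * assumed, already constructed instance):
+ * <pre>
+ *   ContainerInfo container = containerManager.allocateContainer(
+ *       ReplicationType.RATIS, ReplicationFactor.THREE, "ozone");
+ *   List&lt;ContainerInfo&gt; batch =
+ *       containerManager.listContainers(null, 100);
+ * </pre>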
+ */
+public interface ContainerManagerV2 extends Closeable {
+
+
+  /**
+   * Returns all the container Ids managed by ContainerManager.
+   *
+   * @return Set of ContainerID
+   */
+  Set<ContainerID> getContainerIDs();
+
+  /**
+   * Returns all the containers managed by ContainerManager.
+   *
+   * @return Set of ContainerInfo
+   */
+  Set<ContainerInfo> getContainers();
+
+  /**
+   * Returns all the containers which are in the specified state.
+   *
+   * @return Set of ContainerInfo
+   */
+  Set<ContainerInfo> getContainers(LifeCycleState state);
+
+  /**
+   * Returns the ContainerInfo from the container ID.
+   *
+   */
+  ContainerInfo getContainer(ContainerID containerID)
+      throws ContainerNotFoundException;
+
+  boolean exists(ContainerID containerID);
+
+  /**
+   * Returns a batch of containers. The search starts after the given
+   * startID (exclusive) and returns at most count containers.
+   *
+   * @param startID start containerID, >= 0; searching starts at the
+   *                head if 0 (or null).
+   * @param count maximum number of containers to return, must be >= 0.
+   *              Callers usually pass a large count instead of an
+   *              unlimited one, in case the db is very big.
+   *
+   * @return a list of containers.
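+   *
+   * Paging sketch (illustrative): since the search is exclusive of startID,
+   * pass the last returned ID as the next startID.
+   * <pre>
+   *   List&lt;ContainerInfo&gt; batch = listContainers(null, 100);
+   *   while (!batch.isEmpty()) {
+   *     ContainerID last = batch.get(batch.size() - 1).containerID();
+   *     batch = listContainers(last, 100);
+   *   }
+   * </pre>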
+   */
+  List<ContainerInfo> listContainers(ContainerID startID, int count);
+
+  /**
+   * Allocates a new container of the given replication type and factor
+   * for the given owner.
+   *
+   * @param type - replication type of the container.
+   * @param replicationFactor - replication factor of the container.
+   * @param owner - owner of the container.
+   * @return - ContainerInfo of the newly allocated container.
+   * @throws IOException if no matching open pipeline is available.
+   */
+  ContainerInfo allocateContainer(ReplicationType type,
+                                  ReplicationFactor replicationFactor,
+                                  String owner) throws IOException;
+
+  /**
+   * Deletes a container from SCM.
+   *
+   * @param containerID - Container ID
+   * @throws ContainerNotFoundException if the container does not exist.
+   */
+  void deleteContainer(ContainerID containerID)
+      throws ContainerNotFoundException;
+
+  /**
+   * Update container state.
+   * @param containerID - Container ID
+   * @param event - container life cycle event
+   * @throws ContainerNotFoundException if the container does not exist.
+   */
+  void updateContainerState(ContainerID containerID,
+                            LifeCycleEvent event)
+      throws ContainerNotFoundException;
+
+  /**
+   * Returns the latest set of replicas for the given containerID.
+   *
+   * @param containerID Container ID
+   * @return Set of ContainerReplica
+   */
+  Set<ContainerReplica> getContainerReplicas(ContainerID containerID)
+      throws ContainerNotFoundException;
+
+  /**
+   * Adds or updates a replica of the given container.
+   *
+   * @param containerID Container ID
+   * @param replica ContainerReplica
+   */
+  void updateContainerReplica(ContainerID containerID, ContainerReplica replica)
+      throws ContainerNotFoundException;
+
+  /**
+   * Removes a container replica from the given container.
+   *
+   * @param containerID Container ID
+   * @param replica ContainerReplica
+   */
+  void removeContainerReplica(ContainerID containerID, ContainerReplica replica)
+      throws ContainerNotFoundException, ContainerReplicaNotFoundException;
+
+  /**
+   * Update deleteTransactionId according to deleteTransactionMap.
+   *
+   * @param deleteTransactionMap Maps the containerId to latest delete
+   *                             transaction id for the container.
+   * @throws IOException
+   */
+  void updateDeleteTransactionId(Map<ContainerID, Long> deleteTransactionMap)
+      throws IOException;
+
+  /**
+   * Returns ContainerInfo which matches the requirements.
+   * @param size - the amount of space required in the container
+   * @param owner - the user which requires space in its owned container
+   * @param pipeline - pipeline to which the container should belong
+   * @return ContainerInfo for the matching container.
+   */
+  default ContainerInfo getMatchingContainer(long size, String owner,
+                                     Pipeline pipeline) {
+    return getMatchingContainer(size, owner, pipeline, Collections.emptyList());
+  }
+
+  /**
+   * Returns ContainerInfo which matches the requirements.
+   * @param size - the amount of space required in the container
+   * @param owner - the user which requires space in its owned container
+   * @param pipeline - pipeline to which the container should belong.
+   * @param excludedContainerIDS - containerIds to be excluded.
+   * @return ContainerInfo for the matching container.
+   */
+  ContainerInfo getMatchingContainer(long size, String owner,
+                                     Pipeline pipeline,
+                                     List<ContainerID> excludedContainerIDS);
+
+  /**
+   * Notifies the container manager that a report processing handler has
+   * completed, so that it can update its metrics.
+   *
+   * @param isFullReport true if a full container report was processed
+   * @param success whether the report was processed successfully
+   */
+  // Is it possible to remove this from the Interface?
+  void notifyContainerReportProcessing(boolean isFullReport, boolean success);
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java
new file mode 100644
index 0000000..16fe340
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerImpl.java
@@ -0,0 +1,397 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container;
+
+import java.io.IOException;
+
+import java.lang.reflect.Proxy;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ContainerInfoProto;
+import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType;
+import org.apache.hadoop.hdds.scm.ha.SCMHAInvocationHandler;
+import org.apache.hadoop.hdds.scm.ha.SCMRatisServer;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.Table.KeyValue;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.StorageUnit;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.states.ContainerState;
+import org.apache.hadoop.hdds.scm.container.states.ContainerStateMap;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
+import org.apache.hadoop.ozone.common.statemachine.StateMachine;
+
+/**
+ * Default implementation of {@link ContainerStateManagerV2}. Holds the
+ * container states in memory (ContainerStateMap) and persists them to the
+ * given container store table.
+ */
+public final class ContainerStateManagerImpl
+    implements ContainerStateManagerV2 {
+
+  /* **********************************************************************
+   * Container Life Cycle                                                 *
+   *                                                                      *
+   * Event and State Transition Mapping:                                  *
+   *                                                                      *
+   * State: OPEN         ----------------> CLOSING                        *
+   * Event:                    FINALIZE                                   *
+   *                                                                      *
+   * State: CLOSING      ----------------> QUASI_CLOSED                   *
+   * Event:                  QUASI_CLOSE                                  *
+   *                                                                      *
+   * State: CLOSING      ----------------> CLOSED                         *
+   * Event:                     CLOSE                                     *
+   *                                                                      *
+   * State: QUASI_CLOSED ----------------> CLOSED                         *
+   * Event:                  FORCE_CLOSE                                  *
+   *                                                                      *
+   * State: CLOSED       ----------------> DELETING                       *
+   * Event:                    DELETE                                     *
+   *                                                                      *
+   * State: DELETING     ----------------> DELETED                        *
+   * Event:                    CLEANUP                                    *
+   *                                                                      *
+   *                                                                      *
+   * Container State Flow:                                                *
+   *                                                                      *
+   * [OPEN]--------------->[CLOSING]--------------->[QUASI_CLOSED]        *
+   *          (FINALIZE)      |      (QUASI_CLOSE)        |               *
+   *                          |                           |               *
+   *                          |                           |               *
+   *                  (CLOSE) |             (FORCE_CLOSE) |               *
+   *                          |                           |               *
+   *                          |                           |               *
+   *                          +--------->[CLOSED]<--------+               *
+   *                                        |                             *
+   *                                (DELETE)|                             *
+   *                                        |                             *
+   *                                        |                             *
+   *                                   [DELETING]                         *
+   *                                        |                             *
+   *                              (CLEANUP) |                             *
+   *                                        |                             *
+   *                                        V                             *
+   *                                    [DELETED]                         *
+   *                                                                      *
+   ************************************************************************/
+
+  /**
+   *
+   */
+  private static final Logger LOG = LoggerFactory.getLogger(
+      ContainerStateManagerImpl.class);
+
+  /**
+   *
+   */
+  private final long containerSize;
+
+  /**
+   *
+   */
+  private final AtomicLong nextContainerID;
+
+  /**
+   *
+   */
+  private final ContainerStateMap containers;
+
+  /**
+   *
+   */
+  private final PipelineManager pipelineManager;
+
+  /**
+   *
+   */
+  private Table<ContainerID, ContainerInfo> containerStore;
+
+  /**
+   *
+   */
+  private final StateMachine<LifeCycleState, LifeCycleEvent> stateMachine;
+
+  /**
+   *
+   */
+  private final ConcurrentHashMap<ContainerState, ContainerID> lastUsedMap;
+
+  /**
+   *
+   */
+  private ContainerStateManagerImpl(final Configuration conf,
+      final PipelineManager pipelineManager,
+      final Table<ContainerID, ContainerInfo> containerStore)
+      throws IOException {
+    this.pipelineManager = pipelineManager;
+    this.containerStore = containerStore;
+    this.stateMachine = newStateMachine();
+    this.containerSize = getConfiguredContainerSize(conf);
+    this.nextContainerID = new AtomicLong();
+    this.containers = new ContainerStateMap();
+    this.lastUsedMap = new ConcurrentHashMap<>();
+
+    initialize();
+  }
+
+  /**
+   *
+   */
+  private StateMachine<LifeCycleState, LifeCycleEvent> newStateMachine() {
+
+    final Set<LifeCycleState> finalStates = new HashSet<>();
+
+    // These are the steady states of a container.
+    finalStates.add(LifeCycleState.OPEN);
+    finalStates.add(LifeCycleState.CLOSED);
+    finalStates.add(LifeCycleState.DELETED);
+
+    final StateMachine<LifeCycleState, LifeCycleEvent> containerLifecycleSM =
+        new StateMachine<>(LifeCycleState.OPEN, finalStates);
+
+    containerLifecycleSM.addTransition(LifeCycleState.OPEN,
+        LifeCycleState.CLOSING,
+        LifeCycleEvent.FINALIZE);
+
+    containerLifecycleSM.addTransition(LifeCycleState.CLOSING,
+        LifeCycleState.QUASI_CLOSED,
+        LifeCycleEvent.QUASI_CLOSE);
+
+    containerLifecycleSM.addTransition(LifeCycleState.CLOSING,
+        LifeCycleState.CLOSED,
+        LifeCycleEvent.CLOSE);
+
+    containerLifecycleSM.addTransition(LifeCycleState.QUASI_CLOSED,
+        LifeCycleState.CLOSED,
+        LifeCycleEvent.FORCE_CLOSE);
+
+    containerLifecycleSM.addTransition(LifeCycleState.CLOSED,
+        LifeCycleState.DELETING,
+        LifeCycleEvent.DELETE);
+
+    containerLifecycleSM.addTransition(LifeCycleState.DELETING,
+        LifeCycleState.DELETED,
+        LifeCycleEvent.CLEANUP);
+
+    return containerLifecycleSM;
+  }
+
+  /**
+   *
+   */
+  private long getConfiguredContainerSize(final Configuration conf) {
+    return (long) conf.getStorageSize(
+        ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
+        ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT,
+        StorageUnit.BYTES);
+  }
+
+  /**
+   *
+   */
+  private void initialize() throws IOException {
+    TableIterator<ContainerID, ? extends KeyValue<ContainerID, ContainerInfo>>
+        iterator = containerStore.iterator();
+
+    while (iterator.hasNext()) {
+      final ContainerInfo container = iterator.next().getValue();
+      Preconditions.checkNotNull(container);
+      containers.addContainer(container);
+      nextContainerID.set(Long.max(container.containerID().getId() + 1,
+          nextContainerID.get()));
+      if (container.getState() == LifeCycleState.OPEN) {
+        try {
+          pipelineManager.addContainerToPipeline(container.getPipelineID(),
+              ContainerID.valueof(container.getContainerID()));
+        } catch (PipelineNotFoundException ex) {
+          LOG.warn("Found container {} which is in OPEN state with " +
+                  "pipeline {} that does not exist. Marking container for " +
+                  "closing.", container, container.getPipelineID());
+          updateContainerState(container.containerID(),
+              LifeCycleEvent.FINALIZE);
+        }
+      }
+    }
+  }
+
+  @Override
+  public ContainerID getNextContainerID() {
+    return ContainerID.valueof(nextContainerID.get());
+  }
+
+  @Override
+  public Set<ContainerID> getContainerIDs() {
+    return containers.getAllContainerIDs();
+  }
+
+  @Override
+  public Set<ContainerID> getContainerIDs(final LifeCycleState state) {
+    return containers.getContainerIDsByState(state);
+  }
+
+  @Override
+  public ContainerInfo getContainer(final ContainerID containerID)
+      throws ContainerNotFoundException {
+    return containers.getContainerInfo(containerID);
+  }
+
+  @Override
+  public Set<ContainerReplica> getContainerReplicas(
+      final ContainerID containerID) throws ContainerNotFoundException {
+    return containers.getContainerReplicas(containerID);
+  }
+
+  @Override
+  public void addContainer(final ContainerInfoProto containerInfo)
+      throws IOException {
+
+    // Change the exception thrown to PipelineNotFound and
+    // ClosedPipelineException once ClosedPipelineException is introduced
+    // in PipelineManager.
+
+    Preconditions.checkNotNull(containerInfo);
+    final ContainerInfo container = ContainerInfo.fromProtobuf(containerInfo);
+    if (!containers.getAllContainerIDs().contains(container.containerID())) {
+      Preconditions.checkArgument(nextContainerID.get()
+              == container.containerID().getId(),
+          "ContainerID mismatch.");
+
+      pipelineManager.addContainerToPipeline(
+          container.getPipelineID(), container.containerID());
+      containers.addContainer(container);
+      nextContainerID.incrementAndGet();
+    }
+  }
+
+  void updateContainerState(final ContainerID containerID,
+                            final LifeCycleEvent event)
+      throws IOException {
+    throw new UnsupportedOperationException("Not yet implemented!");
+  }
+
+
+  void updateContainerReplica(final ContainerID containerID,
+                              final ContainerReplica replica)
+      throws ContainerNotFoundException {
+    containers.updateContainerReplica(containerID, replica);
+  }
+
+
+  void updateDeleteTransactionId(
+      final Map<ContainerID, Long> deleteTransactionMap) {
+    throw new UnsupportedOperationException("Not yet implemented!");
+  }
+
+  ContainerInfo getMatchingContainer(final long size, String owner,
+      PipelineID pipelineID, NavigableSet<ContainerID> containerIDs) {
+    throw new UnsupportedOperationException("Not yet implemented!");
+  }
+
+
+  NavigableSet<ContainerID> getMatchingContainerIDs(final String owner,
+      final ReplicationType type, final ReplicationFactor factor,
+      final LifeCycleState state) {
+    throw new UnsupportedOperationException("Not yet implemented!");
+  }
+
+  void removeContainerReplica(final ContainerID containerID,
+                              final ContainerReplica replica)
+      throws ContainerNotFoundException, ContainerReplicaNotFoundException {
+    throw new UnsupportedOperationException("Not yet implemented!");
+  }
+
+
+  void removeContainer(final ContainerID containerID)
+      throws ContainerNotFoundException {
+    throw new UnsupportedOperationException("Not yet implemented!");
+  }
+
+  void close() throws IOException {
+  }
+
+  public static Builder newBuilder() {
+    return new Builder();
+  }
+
+  /**
+   * Builder for ContainerStateManager.
+   */
+  public static class Builder {
+    private Configuration conf;
+    private PipelineManager pipelineMgr;
+    private SCMRatisServer scmRatisServer;
+    private Table<ContainerID, ContainerInfo> table;
+
+    public Builder setConfiguration(final Configuration config) {
+      conf = config;
+      return this;
+    }
+
+    public Builder setPipelineManager(final PipelineManager pipelineManager) {
+      pipelineMgr = pipelineManager;
+      return this;
+    }
+
+    public Builder setRatisServer(final SCMRatisServer ratisServer) {
+      scmRatisServer = ratisServer;
+      return this;
+    }
+
+    public Builder setContainerStore(
+        final Table<ContainerID, ContainerInfo> containerStore) {
+      table = containerStore;
+      return this;
+    }
+
+    public ContainerStateManagerV2 build() throws IOException {
+      Preconditions.checkNotNull(conf);
+      Preconditions.checkNotNull(pipelineMgr);
+      Preconditions.checkNotNull(scmRatisServer);
+      Preconditions.checkNotNull(table);
+
+      final ContainerStateManagerV2 csm = new ContainerStateManagerImpl(
+          conf, pipelineMgr, table);
+      scmRatisServer.registerStateMachineHandler(RequestType.CONTAINER, csm);
+
+      final SCMHAInvocationHandler invocationHandler =
+          new SCMHAInvocationHandler(RequestType.CONTAINER, csm,
+              scmRatisServer);
+
+      return (ContainerStateManagerV2) Proxy.newProxyInstance(
+          SCMHAInvocationHandler.class.getClassLoader(),
+          new Class<?>[]{ContainerStateManagerV2.class}, invocationHandler);
+    }
+
+  }
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerV2.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerV2.java
new file mode 100644
index 0000000..9960354
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManagerV2.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container;
+
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ContainerInfoProto;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
+import org.apache.hadoop.hdds.scm.metadata.Replicate;
+
+import java.io.IOException;
+import java.util.Set;
+
+/**
+ *
+ * TODO: Add proper javadoc.
+ *
+ * Implementations of methods marked with the {@code @Replicate} annotation
+ * should:
+ *
+ * 1. Be idempotent.
+ * 2. Take protobuf objects as arguments.
+ * 3. Return a protobuf object.
+ * 4. Declare RaftException in their throws clause.
+ *
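+ * Example of a conforming declaration ({@code addContainer} below is the
+ * in-tree instance; calls to it are dispatched to the Ratis server by
+ * SCMHAInvocationHandler instead of being executed locally):
+ * <pre>
+ *   {@literal @}Replicate
+ *   void addContainer(ContainerInfoProto containerInfo)
+ *       throws IOException;
+ * </pre>
+ *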
+ */
+public interface ContainerStateManagerV2 {
+
+  /**
+   * Returns the ID to be used for the next container allocation.
+   */
+  ContainerID getNextContainerID();
+
+  /**
+   * Returns the IDs of all the containers managed by this instance.
+   */
+  Set<ContainerID> getContainerIDs();
+
+  /**
+   * Returns the IDs of all the containers in the given state.
+   */
+  Set<ContainerID> getContainerIDs(LifeCycleState state);
+
+  /**
+   * Returns the ContainerInfo for the given container ID.
+   */
+  ContainerInfo getContainer(ContainerID containerID)
+      throws ContainerNotFoundException;
+
+  /**
+   * Returns the replicas of the given container.
+   */
+  Set<ContainerReplica> getContainerReplicas(ContainerID containerID)
+      throws ContainerNotFoundException;
+
+  /**
+   * Adds the given container; the call is replicated across SCM instances.
+   */
+  @Replicate
+  void addContainer(ContainerInfoProto containerInfo)
+      throws IOException;
+
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/RatisUtil.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/RatisUtil.java
new file mode 100644
index 0000000..1bc1697
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/RatisUtil.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.ha;
+
+import com.google.common.base.Strings;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.server.ServerUtils;
+import org.apache.ratis.RaftConfigKeys;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.rpc.RpcType;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.util.SizeInBytes;
+import org.apache.ratis.util.TimeDuration;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.ratis.server.RaftServerConfigKeys.Log;
+import static org.apache.ratis.server.RaftServerConfigKeys.RetryCache;
+import static org.apache.ratis.server.RaftServerConfigKeys.Rpc;
+import static org.apache.ratis.server.RaftServerConfigKeys.Snapshot;
+
+/**
+ * Ratis Util for SCM HA.
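+ *
+ * Typical call site (illustrative sketch; {@code haConf} and {@code conf}
+ * are assumed to come from SCM startup wiring):
+ * <pre>
+ *   RaftProperties properties =
+ *       RatisUtil.newRaftProperties(haConf, conf);
+ * </pre>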
+ */
+public final class RatisUtil {
+
+  private RatisUtil() {
+  }
+
+
+  /**
+   * Constructs new Raft Properties instance using {@link SCMHAConfiguration}.
+   * @param haConf SCMHAConfiguration
+   * @param conf ConfigurationSource
+   */
+  public static RaftProperties newRaftProperties(
+      final SCMHAConfiguration haConf, final ConfigurationSource conf) {
+    //TODO: Remove ConfigurationSource!
+    // TODO: Check the default values.
+    final RaftProperties properties = new RaftProperties();
+    setRaftStorageDir(properties, haConf, conf);
+    setRaftRpcProperties(properties, haConf);
+    setRaftLogProperties(properties, haConf);
+    setRaftRetryCacheProperties(properties, haConf);
+    setRaftSnapshotProperties(properties, haConf);
+    return properties;
+  }
+
+  /**
+   * Set the local directory where ratis logs will be stored.
+   *
+   * @param properties RaftProperties instance which will be updated
+   * @param haConf SCMHAConfiguration
+   * @param conf ConfigurationSource
+   */
+  public static void setRaftStorageDir(final RaftProperties properties,
+                                       final SCMHAConfiguration haConf,
+                                       final ConfigurationSource conf) {
+    String storageDir = haConf.getRatisStorageDir();
+    if (Strings.isNullOrEmpty(storageDir)) {
+      storageDir = ServerUtils.getDefaultRatisDirectory(conf);
+    }
+    RaftServerConfigKeys.setStorageDir(properties,
+        Collections.singletonList(new File(storageDir)));
+  }
+
+  /**
+   * Set properties related to Raft RPC.
+   *
+   * @param properties RaftProperties instance which will be updated
+   * @param conf SCMHAConfiguration
+   */
+  private static void setRaftRpcProperties(final RaftProperties properties,
+                                           final SCMHAConfiguration conf) {
+    RaftConfigKeys.Rpc.setType(properties,
+        RpcType.valueOf(conf.getRatisRpcType()));
+    GrpcConfigKeys.Server.setPort(properties,
+        conf.getRatisBindAddress().getPort());
+    GrpcConfigKeys.setMessageSizeMax(properties,
+        SizeInBytes.valueOf("32m"));
+
+    Rpc.setRequestTimeout(properties, TimeDuration.valueOf(
+        conf.getRatisRequestTimeout(), TimeUnit.MILLISECONDS));
+    Rpc.setTimeoutMin(properties, TimeDuration.valueOf(
+        conf.getRatisRequestMinTimeout(), TimeUnit.MILLISECONDS));
+    Rpc.setTimeoutMax(properties, TimeDuration.valueOf(
+        conf.getRatisRequestMaxTimeout(), TimeUnit.MILLISECONDS));
+    Rpc.setSlownessTimeout(properties, TimeDuration.valueOf(
+        conf.getRatisNodeFailureTimeout(), TimeUnit.MILLISECONDS));
+  }
+
+  /**
+   * Set properties related to Raft Log.
+   *
+   * @param properties RaftProperties instance which will be updated
+   * @param conf SCMHAConfiguration
+   */
+  private static void setRaftLogProperties(final RaftProperties properties,
+                                           final SCMHAConfiguration conf) {
+    Log.setSegmentSizeMax(properties,
+        SizeInBytes.valueOf(conf.getRaftSegmentSize()));
+    Log.Appender.setBufferElementLimit(properties,
+        conf.getRaftLogAppenderQueueNum());
+    Log.Appender.setBufferByteLimit(properties,
+        SizeInBytes.valueOf(conf.getRaftLogAppenderQueueByteLimit()));
+    Log.setPreallocatedSize(properties,
+        SizeInBytes.valueOf(conf.getRaftSegmentPreAllocatedSize()));
+    Log.Appender.setInstallSnapshotEnabled(properties, false);
+    Log.setPurgeGap(properties, conf.getRaftLogPurgeGap());
+    Log.setSegmentCacheNumMax(properties, 2);
+  }
+
+  /**
+   * Set properties related to Raft Retry Cache.
+   *
+   * @param properties RaftProperties instance which will be updated
+   * @param conf SCMHAConfiguration
+   */
+  private static void setRaftRetryCacheProperties(
+      final RaftProperties properties, final SCMHAConfiguration conf) {
+    RetryCache.setExpiryTime(properties, TimeDuration.valueOf(
+        conf.getRatisRetryCacheTimeout(), TimeUnit.MILLISECONDS));
+  }
+
+  /**
+   * Set properties related to Raft Snapshot.
+   *
+   * @param properties RaftProperties instance which will be updated
+   * @param conf SCMHAConfiguration
+   */
+  private static void setRaftSnapshotProperties(
+      final RaftProperties properties, final SCMHAConfiguration conf) {
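+    // Assumption: the 400000 threshold is the number of applied log entries
+    // after which Ratis auto-triggers a snapshot; auto-trigger is always on.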
+    Snapshot.setAutoTriggerEnabled(properties, true);
+    Snapshot.setAutoTriggerThreshold(properties, 400000);
+  }
+
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/ReflectionUtil.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/ReflectionUtil.java
new file mode 100644
index 0000000..7c54723
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/ReflectionUtil.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.ha;
+
+import java.lang.reflect.Method;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Reflection util for SCM HA.
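+ *
+ * Illustrative usage (class and method names are arbitrary examples):
+ * <pre>
+ *   Class&lt;?&gt; clazz = ReflectionUtil.getClass("java.lang.String");
+ *   Method method = ReflectionUtil.getMethod(clazz, "substring", int.class);
+ * </pre>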
+ */
+public final class ReflectionUtil {
+
+  private static final Map<String, Class<?>> classCache =
+      new ConcurrentHashMap<>();
+
+  private ReflectionUtil() {
+  }
+
+  /**
+   * Returns the {@code Class} object associated with the given string name.
+   *
+   * @param className the fully qualified name of the desired class.
+   * @return the {@code Class} object for the class with the
+   *         specified name.
+   * @throws ClassNotFoundException if the class cannot be located
+   */
+  public static Class<?> getClass(String className)
+      throws ClassNotFoundException {
+    if (!classCache.containsKey(className)) {
+      classCache.put(className, Class.forName(className));
+    }
+    return classCache.get(className);
+  }
+
+  /**
+   * Returns a {@code Method} object that reflects the specified public
+   * member method of the given {@code Class} object.
+   *
+   * @param clazz the class object which has the method
+   * @param methodName the name of the method
+   * @param arg the parameter types of the method
+   * @return the {@code Method} object that matches the specified
+   *         {@code name} and {@code parameterTypes}
+   * @throws NoSuchMethodException if a matching method is not found
+   *         or if the name is "&lt;init&gt;" or "&lt;clinit&gt;".
+   */
+  public static Method getMethod(
+      final Class<?> clazz, final String methodName, final Class<?>... arg)
+      throws NoSuchMethodException {
+    return clazz.getMethod(methodName, arg);
+  }
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAConfiguration.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAConfiguration.java
new file mode 100644
index 0000000..1cb8c65
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAConfiguration.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * <p>Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.ha;
+
+import java.net.InetSocketAddress;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.conf.Config;
+import org.apache.hadoop.hdds.conf.ConfigGroup;
+import org.apache.hadoop.hdds.conf.ConfigType;
+import org.apache.hadoop.net.NetUtils;
+
+import static org.apache.hadoop.hdds.conf.ConfigTag.HA;
+import static org.apache.hadoop.hdds.conf.ConfigTag.OZONE;
+import static org.apache.hadoop.hdds.conf.ConfigTag.RATIS;
+import static org.apache.hadoop.hdds.conf.ConfigTag.SCM;
+
+/**
+ * Configuration used by SCM HA.
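+ *
+ * Keys are resolved under the {@code ozone.scm.ha} prefix; for example, in
+ * ozone-site.xml (values shown are the defaults):
+ * <pre>
+ *   ozone.scm.ha.ratis.rpc.type = GRPC
+ *   ozone.scm.ha.ratis.bind.port = 9865
+ * </pre>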
+ */
+@ConfigGroup(prefix = "ozone.scm.ha")
+public class SCMHAConfiguration {
+
+  @Config(key = "ratis.storage.dir",
+      type = ConfigType.STRING,
+      defaultValue = "",
+      tags = {OZONE, SCM, HA, RATIS},
+      description = "Storage directory used by SCM to write Ratis logs."
+  )
+  private String ratisStorageDir;
+
+  @Config(key = "ratis.bind.host",
+      type = ConfigType.STRING,
+      defaultValue = "0.0.0.0",
+      tags = {OZONE, SCM, HA, RATIS},
+      description = "Host used by SCM for binding Ratis Server."
+  )
+  private String ratisBindHost = "0.0.0.0";
+
+  @Config(key = "ratis.bind.port",
+      type = ConfigType.INT,
+      defaultValue = "9865",
+      tags = {OZONE, SCM, HA, RATIS},
+      description = "Port used by SCM for Ratis Server."
+  )
+  private int ratisBindPort = 9865;
+
+
+  @Config(key = "ratis.rpc.type",
+      type = ConfigType.STRING,
+      defaultValue = "GRPC",
+      tags = {SCM, OZONE, HA, RATIS},
+      description = "Ratis supports different kinds of transports like" +
+          " netty, GRPC, Hadoop RPC etc. This picks one of those for" +
+          " this cluster."
+  )
+  private String ratisRpcType;
+
+  @Config(key = "ratis.segment.size",
+      type = ConfigType.SIZE,
+      defaultValue = "16KB",
+      tags = {SCM, OZONE, HA, RATIS},
+      description = "The size of the raft segment used by Apache Ratis on" +
+          " SCM. (16 KB by default)"
+  )
+  private long raftSegmentSize = 16L * 1024L;
+
+  @Config(key = "ratis.segment.preallocated.size",
+      type = ConfigType.SIZE,
+      defaultValue = "16KB",
+      tags = {SCM, OZONE, HA, RATIS},
+      description = "The size of the buffer which is preallocated for" +
+          " raft segment used by Apache Ratis on SCM. (16 KB by default)"
+  )
+  private long raftSegmentPreAllocatedSize = 16 * 1024;
+
+  @Config(key = "ratis.log.appender.queue.num-elements",
+      type = ConfigType.INT,
+      defaultValue = "1024",
+      tags = {SCM, OZONE, HA, RATIS},
+      description = "Number of operations pending with Raft's Log Worker."
+  )
+  private int raftLogAppenderQueueNum = 1024;
+
+  @Config(key = "ratis.log.appender.queue.byte-limit",
+      type = ConfigType.SIZE,
+      defaultValue = "32MB",
+      tags = {SCM, OZONE, HA, RATIS},
+      description = "Byte limit for Raft's Log Worker queue."
+  )
+  private int raftLogAppenderQueueByteLimit = 32 * 1024 * 1024;
+
+  @Config(key = "ratis.log.purge.gap",
+      type = ConfigType.INT,
+      defaultValue = "1000000",
+      tags = {SCM, OZONE, HA, RATIS},
+      description = "The minimum gap between log indices for the Raft" +
+          " server to purge its log segments after taking a snapshot."
+  )
+  private int raftLogPurgeGap = 1000000;
+
+  @Config(key = "ratis.request.timeout",
+      type = ConfigType.TIME,
+      defaultValue = "3000ms",
+      tags = {SCM, OZONE, HA, RATIS},
+      description = "The timeout duration for SCM's Ratis server RPC."
+  )
+  private long ratisRequestTimeout = 3000L;
+
+  @Config(key = "ratis.server.retry.cache.timeout",
+      type = ConfigType.TIME,
+      defaultValue = "60s",
+      tags = {SCM, OZONE, HA, RATIS},
+      description = "Retry Cache entry timeout for SCM's ratis server."
+  )
+  private long ratisRetryCacheTimeout = 60 * 1000L;
+
+
+  @Config(key = "ratis.leader.election.timeout",
+      type = ConfigType.TIME,
+      defaultValue = "1s",
+      tags = {SCM, OZONE, HA, RATIS},
+      description = "The minimum timeout duration for SCM ratis leader" +
+          " election. Default is 1s."
+  )
+  private long ratisLeaderElectionTimeout = 1 * 1000L;
+
+  @Config(key = "ratis.server.failure.timeout.duration",
+      type = ConfigType.TIME,
+      defaultValue = "120s",
+      tags = {SCM, OZONE, HA, RATIS},
+      description = "The timeout duration for ratis server failure" +
+          " detection. Once the threshold is reached, the ratis state" +
+          " machine is informed about the failure in the ratis ring."
+  )
+  private long ratisNodeFailureTimeout = 120 * 1000L;
+
+  @Config(key = "ratis.server.role.check.interval",
+      type = ConfigType.TIME,
+      defaultValue = "15s",
+      tags = {SCM, OZONE, HA, RATIS},
+      description = "The interval at which the SCM leader performs a role" +
+          " check on its ratis server. Ratis server informs SCM if it" +
+          " loses the leader role. The scheduled check is a secondary" +
+          " check to ensure that the leader role is updated periodically."
+  )
+  private long ratisRoleCheckerInterval = 15 * 1000L;
+
+  public String getRatisStorageDir() {
+    return ratisStorageDir;
+  }
+
+  public InetSocketAddress getRatisBindAddress() {
+    return NetUtils.createSocketAddr(ratisBindHost, ratisBindPort);
+  }
+
+  public String getRatisRpcType() {
+    return ratisRpcType;
+  }
+
+  public long getRaftSegmentSize() {
+    return raftSegmentSize;
+  }
+
+  public long getRaftSegmentPreAllocatedSize() {
+    return raftSegmentPreAllocatedSize;
+  }
+
+  public int getRaftLogAppenderQueueNum() {
+    return raftLogAppenderQueueNum;
+  }
+
+  public int getRaftLogAppenderQueueByteLimit() {
+    return raftLogAppenderQueueByteLimit;
+  }
+
+  public int getRaftLogPurgeGap() {
+    return raftLogPurgeGap;
+  }
+
+  public long getRatisRetryCacheTimeout() {
+    return ratisRetryCacheTimeout;
+  }
+
+  public long getRatisRequestTimeout() {
+    Preconditions.checkArgument(ratisRequestTimeout > 1000L,
+        "Ratis request timeout must be greater than 1000ms.");
+    return ratisRequestTimeout;
+  }
+
+  public long getRatisRequestMinTimeout() {
+    return ratisRequestTimeout - 1000L;
+  }
+
+  public long getRatisRequestMaxTimeout() {
+    return ratisRequestTimeout + 1000L;
+  }
+
+  public long getRatisLeaderElectionTimeout() {
+    return ratisLeaderElectionTimeout;
+  }
+
+  public long getRatisNodeFailureTimeout() {
+    return ratisNodeFailureTimeout;
+  }
+
+  public long getRatisRoleCheckerInterval() {
+    return ratisRoleCheckerInterval;
+  }
+}
\ No newline at end of file
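For illustration, a minimal sketch of how this configuration class is
consumed (mirroring the SCMHAManager constructor below); the conf variable
is assumed to be a ConfigurationSource implementation:

    // Sketch only: fetch the typed HA settings from the configuration source.
    SCMHAConfiguration haConf = conf.getObject(SCMHAConfiguration.class);
    InetSocketAddress ratisAddress = haConf.getRatisBindAddress();
    long requestTimeout = haConf.getRatisRequestTimeout(); // enforced > 1000ms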
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAInvocationHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAInvocationHandler.java
new file mode 100644
index 0000000..c78c616
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAInvocationHandler.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * <p>Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.ha;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType;
+import org.apache.hadoop.hdds.scm.metadata.Replicate;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * InvocationHandler which checks for {@link Replicate} annotation and
+ * dispatches the request to Ratis Server.
+ */
+public class SCMHAInvocationHandler implements InvocationHandler {
+
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(SCMHAInvocationHandler.class);
+
+  private final RequestType requestType;
+  private final Object localHandler;
+  private final SCMRatisServer ratisHandler;
+
+  /**
+   * Constructs an SCMHAInvocationHandler that dispatches {@link Replicate}
+   * annotated calls to the given Ratis server and all other calls to the
+   * local handler.
+   */
+  public SCMHAInvocationHandler(final RequestType requestType,
+                                final Object localHandler,
+                                final SCMRatisServer ratisHandler) {
+    this.requestType = requestType;
+    this.localHandler = localHandler;
+    this.ratisHandler = ratisHandler;
+  }
+
+  @Override
+  public Object invoke(final Object proxy, final Method method,
+                       final Object[] args) throws Throwable {
+    try {
+      long startTime = Time.monotonicNow();
+      final Object result = method.isAnnotationPresent(Replicate.class) ?
+          invokeRatis(method, args) : invokeLocal(method, args);
+      LOG.debug("Call: {} took {} ms", method, Time.monotonicNow() - 
startTime);
+      return result;
+    } catch(InvocationTargetException iEx) {
+      throw iEx.getCause();
+    }
+  }
+
+  /**
+   * Invokes the method directly on the local handler.
+   */
+  private Object invokeLocal(Method method, Object[] args)
+      throws InvocationTargetException, IllegalAccessException {
+    LOG.trace("Invoking method {} on target {}", method, localHandler);
+    return method.invoke(localHandler, args);
+  }
+
+  /**
+   * Submits the invocation as a replicated request to the Ratis server.
+   */
+  private Object invokeRatis(Method method, Object[] args)
+      throws Exception {
+    LOG.trace("Invoking method {} on target {}", method, ratisHandler);
+    final SCMRatisResponse response = ratisHandler.submitRequest(
+        SCMRatisRequest.of(requestType, method.getName(), args));
+    if (response.isSuccess()) {
+      return response.getResult();
+    }
+    // Should we unwrap and throw proper exception from here?
+    throw response.getException();
+  }
+
+}
\ No newline at end of file
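A hedged sketch of how this handler can back a dynamic proxy via
java.lang.reflect.Proxy; ExampleStateManager, localStateManager and
scmRatisServer are illustrative names, not part of this patch:

    // Calls on the proxy hit SCMHAInvocationHandler.invoke(): methods
    // annotated with @Replicate go through Ratis, the rest run locally.
    ExampleStateManager proxy = (ExampleStateManager) Proxy.newProxyInstance(
        SCMHAInvocationHandler.class.getClassLoader(),
        new Class<?>[]{ExampleStateManager.class},
        new SCMHAInvocationHandler(RequestType.CONTAINER,
            localStateManager, scmRatisServer));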
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java
new file mode 100644
index 0000000..b38fc43
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManager.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * <p>Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.ha;
+
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+
+import java.io.IOException;
+
+/**
+ * SCMHAManager provides HA service for SCM.
+ *
+ * It uses Apache Ratis for the HA implementation: a 2N+1 node Ratis
+ * ring with one leader node and 2N follower nodes.
+ *
+ * TODO
+ *
+ */
+public class SCMHAManager {
+
+  private static boolean isLeader = true;
+
+  private final SCMRatisServer ratisServer;
+
+  /**
+   * Creates SCMHAManager instance.
+   */
+  public SCMHAManager(final ConfigurationSource conf) throws IOException {
+    this.ratisServer = new SCMRatisServer(
+        conf.getObject(SCMHAConfiguration.class), conf);
+  }
+
+  /**
+   * Starts HA service.
+   */
+  public void start() throws IOException {
+    ratisServer.start();
+  }
+
+  /**
+   * Returns true if the current SCM is the leader.
+   */
+  public static boolean isLeader() {
+    return isLeader;
+  }
+
+  /**
+   * Returns RatisServer instance associated with the SCM instance.
+   */
+  public SCMRatisServer getRatisServer() {
+    return ratisServer;
+  }
+
+  /**
+   * Stops the HA service.
+   */
+  public void shutdown() throws IOException {
+    ratisServer.stop();
+  }
+
+}
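A minimal usage sketch, assuming conf is a ConfigurationSource and
containerHandler is some illustrative handler object for container requests:

    SCMHAManager haManager = new SCMHAManager(conf);
    haManager.getRatisServer()
        .registerStateMachineHandler(RequestType.CONTAINER, containerHandler);
    haManager.start();
    // ... SCM serves requests ...
    haManager.shutdown();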
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisRequest.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisRequest.java
new file mode 100644
index 0000000..d65c235
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisRequest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.ha;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.primitives.Ints;
+import com.google.protobuf.GeneratedMessage;
+import com.google.protobuf.InvalidProtocolBufferException;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.ProtocolMessageEnum;
+
+import org.apache.ratis.protocol.Message;
+
+import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.Method;
+import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.MethodArgument;
+import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType;
+import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.SCMRatisRequestProto;
+
+
+/**
+ * Represents the request that is sent to RatisServer.
+ */
+public final class SCMRatisRequest {
+
+  private final RequestType type;
+  private final String operation;
+  private final Object[] arguments;
+
+  private SCMRatisRequest(final RequestType type, final String operation,
+                         final Object... arguments) {
+    this.type = type;
+    this.operation = operation;
+    this.arguments = arguments;
+  }
+
+  public static SCMRatisRequest of(final RequestType type,
+                                   final String operation,
+                                   final Object... arguments) {
+    return new SCMRatisRequest(type, operation, arguments);
+  }
+
+  /**
+   * Returns the type of request.
+   */
+  public RequestType getType() {
+    return type;
+  }
+
+  /**
+   * Returns the operation that this request represents.
+   */
+  public String getOperation() {
+    return operation;
+  }
+
+  /**
+   * Returns the arguments encoded in the request.
+   */
+  public Object[] getArguments() {
+    return arguments.clone();
+  }
+
+  /**
+   * Encodes the request into Ratis Message.
+   */
+  public Message encode() throws InvalidProtocolBufferException {
+    final SCMRatisRequestProto.Builder requestProtoBuilder =
+        SCMRatisRequestProto.newBuilder();
+    requestProtoBuilder.setType(type);
+
+    final Method.Builder methodBuilder = Method.newBuilder();
+    methodBuilder.setName(operation);
+
+    final List<MethodArgument> args = new ArrayList<>();
+    for (Object argument : arguments) {
+      final MethodArgument.Builder argBuilder = MethodArgument.newBuilder();
+      argBuilder.setType(argument.getClass().getCanonicalName());
+      if (argument instanceof GeneratedMessage) {
+        argBuilder.setValue(((GeneratedMessage) argument).toByteString());
+      } else if (argument instanceof ProtocolMessageEnum) {
+        argBuilder.setValue(ByteString.copyFrom(Ints.toByteArray(
+            ((ProtocolMessageEnum) argument).getNumber())));
+      } else {
+        throw new InvalidProtocolBufferException(argument.getClass() +
+            " is not a protobuf object!");
+      }
+      args.add(argBuilder.build());
+    }
+    methodBuilder.addAllArgs(args);
+    // Attach the method to the request; it is a required proto field.
+    requestProtoBuilder.setMethod(methodBuilder.build());
+    return Message.valueOf(
+        org.apache.ratis.thirdparty.com.google.protobuf.ByteString.copyFrom(
+            requestProtoBuilder.build().toByteArray()));
+  }
+
+  /**
+   * Decodes the request from Ratis Message.
+   */
+  public static SCMRatisRequest decode(Message message)
+      throws InvalidProtocolBufferException {
+    final SCMRatisRequestProto requestProto =
+        SCMRatisRequestProto.parseFrom(message.getContent().toByteArray());
+    final Method method = requestProto.getMethod();
+    List<Object> args = new ArrayList<>();
+    for (MethodArgument argument : method.getArgsList()) {
+      try {
+        final Class<?> clazz = ReflectionUtil.getClass(argument.getType());
+        if (GeneratedMessage.class.isAssignableFrom(clazz)) {
+          args.add(ReflectionUtil.getMethod(clazz, "parseFrom", byte[].class)
+              .invoke(null, (Object) argument.getValue().toByteArray()));
+        } else if (Enum.class.isAssignableFrom(clazz)) {
+          args.add(ReflectionUtil.getMethod(clazz, "valueOf", int.class)
+              .invoke(null, Ints.fromByteArray(
+                  argument.getValue().toByteArray())));
+        } else {
+          throw new InvalidProtocolBufferException(argument.getType() +
+              " is not a protobuf object!");
+        }
+      } catch (ClassNotFoundException | NoSuchMethodException |
+          IllegalAccessException | InvocationTargetException ex) {
+        throw new InvalidProtocolBufferException(argument.getType() +
+            " cannot be decoded! " + ex.getMessage());
+      }
+    }
+    return new SCMRatisRequest(requestProto.getType(),
+        method.getName(), args.toArray());
+  }
+
+}
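A sketch of the encode/decode round trip; "allocateContainer" and
containerRequestProto are illustrative, the only real constraint being that
every argument is a protobuf GeneratedMessage or ProtocolMessageEnum:

    SCMRatisRequest request = SCMRatisRequest.of(
        RequestType.CONTAINER, "allocateContainer", containerRequestProto);
    Message message = request.encode();              // Ratis wire form
    SCMRatisRequest decoded = SCMRatisRequest.decode(message);
    assert decoded.getOperation().equals("allocateContainer");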
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisResponse.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisResponse.java
new file mode 100644
index 0000000..c4bedcc
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisResponse.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.ha;
+
+import java.lang.reflect.InvocationTargetException;
+import java.math.BigInteger;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.GeneratedMessage;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.ProtocolMessageEnum;
+import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.SCMRatisResponseProto;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientReply;
+
+/**
+ * Represents the response from RatisServer.
+ */
+public final class SCMRatisResponse {
+
+  private final boolean success;
+  private final Object result;
+  private final Exception exception;
+
+  private SCMRatisResponse(final Object result) {
+    this(true, result, null);
+  }
+
+  private SCMRatisResponse(final Exception exception) {
+    this(false, null, exception);
+  }
+
+  private SCMRatisResponse(final boolean success, final Object result,
+                           final Exception exception) {
+    this.success = success;
+    this.result = result;
+    this.exception = exception;
+  }
+
+  public boolean isSuccess() {
+    return success;
+  }
+
+  public Object getResult() {
+    return result;
+  }
+
+  public Exception getException() {
+    return exception;
+  }
+
+  public static Message encode(final Object result)
+      throws InvalidProtocolBufferException {
+
+    final ByteString value;
+    if (result instanceof GeneratedMessage) {
+      value = ((GeneratedMessage) result).toByteString();
+    } else if (result instanceof ProtocolMessageEnum) {
+      value = ByteString.copyFrom(BigInteger.valueOf(
+          ((ProtocolMessageEnum) result).getNumber()).toByteArray());
+    } else {
+      throw new InvalidProtocolBufferException(result.getClass() +
+          " is not a protobuf object!");
+    }
+
+    final SCMRatisResponseProto response =
+        SCMRatisResponseProto.newBuilder()
+            .setType(result.getClass().getCanonicalName())
+            .setValue(value)
+            .build();
+    return Message.valueOf(
+        org.apache.ratis.thirdparty.com.google.protobuf.ByteString.copyFrom(
+            response.toByteArray()));
+  }
+
+  public static SCMRatisResponse decode(RaftClientReply reply)
+      throws InvalidProtocolBufferException {
+    return reply.isSuccess() ?
+        new SCMRatisResponse(
+            deserializeResult(reply.getMessage().getContent().toByteArray())) :
+        new SCMRatisResponse(reply.getException());
+  }
+
+  private static Object deserializeResult(byte[] response)
+      throws InvalidProtocolBufferException {
+    final SCMRatisResponseProto responseProto =
+        SCMRatisResponseProto.parseFrom(response);
+    try {
+      final Class<?> clazz = ReflectionUtil.getClass(responseProto.getType());
+      if (GeneratedMessage.class.isAssignableFrom(clazz)) {
+        return ReflectionUtil.getMethod(clazz, "parseFrom", byte[].class)
+            .invoke(null, (Object) responseProto.getValue().toByteArray());
+      }
+
+      if (Enum.class.isAssignableFrom(clazz)) {
+        return ReflectionUtil.getMethod(clazz, "valueOf", int.class)
+            .invoke(null, new BigInteger(
+                responseProto.getValue().toByteArray()).intValue());
+      }
+
+      throw new InvalidProtocolBufferException(responseProto.getType() +
+          " is not a protobuf object!");
+
+    } catch (ClassNotFoundException | NoSuchMethodException |
+        IllegalAccessException | InvocationTargetException ex) {
+      throw new InvalidProtocolBufferException(responseProto.getType() +
+          " cannot be decoded! " + ex.getMessage());
+    }
+
+  }
+
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServer.java
new file mode 100644
index 0000000..209535d
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServer.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.ha;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.RaftServer;
+
+/**
+ * Ratis server for SCM HA. It hosts the {@link SCMStateMachine} and
+ * submits state-changing requests to the SCM Ratis ring for replication.
+ */
+public class SCMRatisServer {
+
+  private final InetSocketAddress address;
+  private final RaftServer server;
+  private final RaftGroupId raftGroupId;
+  private final RaftGroup raftGroup;
+  private final RaftPeerId raftPeerId;
+  private final SCMStateMachine scmStateMachine;
+  private final ClientId clientId = ClientId.randomId();
+  private final AtomicLong callId = new AtomicLong();
+
+
+  // TODO: Refactor and remove ConfigurationSource and use only
+  //  SCMHAConfiguration.
+  SCMRatisServer(final SCMHAConfiguration haConf,
+                 final ConfigurationSource conf)
+      throws IOException {
+    final String scmServiceId = "SCM-HA-Service";
+    final String scmNodeId = "localhost";
+    this.raftPeerId = RaftPeerId.getRaftPeerId(scmNodeId);
+    this.address = haConf.getRatisBindAddress();
+    final RaftPeer localRaftPeer = new RaftPeer(raftPeerId, address);
+    final List<RaftPeer> raftPeers = new ArrayList<>();
+    raftPeers.add(localRaftPeer);
+    final RaftProperties serverProperties = RatisUtil
+        .newRaftProperties(haConf, conf);
+    this.raftGroupId = RaftGroupId.valueOf(
+        UUID.nameUUIDFromBytes(scmServiceId.getBytes(StandardCharsets.UTF_8)));
+    this.raftGroup = RaftGroup.valueOf(raftGroupId, raftPeers);
+    this.scmStateMachine = new SCMStateMachine();
+    this.server = RaftServer.newBuilder()
+        .setServerId(raftPeerId)
+        .setGroup(raftGroup)
+        .setProperties(serverProperties)
+        .setStateMachine(scmStateMachine)
+        .build();
+  }
+
+  void start() throws IOException {
+    server.start();
+  }
+
+  public void registerStateMachineHandler(final RequestType handlerType,
+                                          final Object handler) {
+    scmStateMachine.registerHandler(handlerType, handler);
+  }
+
+  SCMRatisResponse submitRequest(SCMRatisRequest request)
+      throws IOException, ExecutionException, InterruptedException {
+    final RaftClientRequest raftClientRequest = new RaftClientRequest(
+        clientId, server.getId(), raftGroupId, nextCallId(), request.encode(),
+        RaftClientRequest.writeRequestType(), null);
+    final RaftClientReply raftClientReply =
+        server.submitClientRequestAsync(raftClientRequest).get();
+    return SCMRatisResponse.decode(raftClientReply);
+  }
+
+  private long nextCallId() {
+    return callId.getAndIncrement() & Long.MAX_VALUE;
+  }
+
+  void stop() throws IOException {
+    server.close();
+  }
+
+}
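A sketch of the expected wiring at SCM startup; containerStateHandler is an
illustrative handler object, not part of this patch:

    // Register a handler per RequestType before any request is submitted.
    scmRatisServer.registerStateMachineHandler(
        RequestType.CONTAINER, containerStateHandler);
    // State-changing calls then reach the package-private submitRequest(...)
    // indirectly, via SCMHAInvocationHandler in the same package.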
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
new file mode 100644
index 0000000..b10dd54
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * <p>Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.ha;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.EnumMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.statemachine.TransactionContext;
+import org.apache.ratis.statemachine.impl.BaseStateMachine;
+
+import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType;
+
+/**
+ * StateMachine for SCM HA. Applies replicated SCMRatisRequests by
+ * dispatching them to the handler registered for each RequestType.
+ */
+public class SCMStateMachine extends BaseStateMachine {
+
+  private final Map<RequestType, Object> handlers;
+
+  public SCMStateMachine() {
+    this.handlers = new EnumMap<>(RequestType.class);
+  }
+
+  public void registerHandler(RequestType type, Object handler) {
+    handlers.put(type, handler);
+  }
+
+  @Override
+  public CompletableFuture<Message> applyTransaction(
+      final TransactionContext trx) {
+    final CompletableFuture<Message> applyTransactionFuture =
+        new CompletableFuture<>();
+    try {
+      final SCMRatisRequest request = SCMRatisRequest.decode(
+          trx.getClientRequest().getMessage());
+      applyTransactionFuture.complete(process(request));
+    } catch (Exception ex) {
+      applyTransactionFuture.completeExceptionally(ex);
+    }
+    return applyTransactionFuture;
+  }
+
+  private Message process(final SCMRatisRequest request)
+      throws Exception {
+    try {
+      final Object handler = handlers.get(request.getType());
+
+      if (handler == null) {
+        throw new IOException("No handler found for request type " +
+            request.getType());
+      }
+
+      final List<Class<?>> argumentTypes = new ArrayList<>();
+      for (Object argument : request.getArguments()) {
+        argumentTypes.add(argument.getClass());
+      }
+      final Object result = handler.getClass().getMethod(
+          request.getOperation(), argumentTypes.toArray(new Class<?>[0]))
+          .invoke(handler, request.getArguments());
+
+      return SCMRatisResponse.encode(result);
+    } catch (NoSuchMethodException | SecurityException ex) {
+      throw new InvalidProtocolBufferException(ex.getMessage());
+    } catch (InvocationTargetException e) {
+      final Exception targetEx = (Exception) e.getTargetException();
+      throw targetEx != null ? targetEx : e;
+    }
+  }
+
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/Replicate.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/Replicate.java
new file mode 100644
index 0000000..aeed57c
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/Replicate.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * <p>Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.metadata;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Inherited;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Marks methods whose invocations should be replicated via the SCM Ratis
+ * server (see SCMHAInvocationHandler).
+ */
+@Inherited
+@Target(ElementType.METHOD)
+@Retention(RetentionPolicy.RUNTIME)
+public @interface Replicate {
+}
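A hypothetical sketch of the annotation in use; the interface and proto
types are illustrative. Note that, as written, SCMRatisRequest and
SCMRatisResponse require both the arguments and the return value to be
protobuf messages or enums:

    public interface ExampleContainerManager {
      @Replicate
      ContainerInfoProto allocateContainer(ContainerRequestProto request)
          throws IOException;
    }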
diff --git a/hadoop-hdds/server-scm/src/main/proto/SCMRatisProtocol.proto b/hadoop-hdds/server-scm/src/main/proto/SCMRatisProtocol.proto
new file mode 100644
index 0000000..1107016
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/proto/SCMRatisProtocol.proto
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.hadoop.hdds.protocol.proto";
+option java_outer_classname = "SCMRatisProtocol";
+option java_generate_equals_and_hash = true;
+
+enum RequestType {
+    PIPELINE = 1;
+    CONTAINER = 2;
+}
+
+message Method {
+    required string name = 1;
+    repeated MethodArgument args = 2;
+}
+
+message MethodArgument {
+    required string type = 1;
+    required bytes value = 2;
+}
+
+message SCMRatisRequestProto {
+    required RequestType type = 1;
+    required Method method = 2;
+}
+
+message SCMRatisResponseProto {
+    required string type = 2;
+    required bytes value = 3;
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: ozone-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: ozone-commits-h...@hadoop.apache.org
