This is an automated email from the ASF dual-hosted git repository.

ritesh pushed a commit to branch HDDS-10239-container-reconciliation
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-10239-container-reconciliation by this push:
     new 930d3f0552 HDDS-10372. SCM and Datanode communication for reconciliation (#6506)
930d3f0552 is described below

commit 930d3f0552b86d6605625cabe5e8961652d14fa2
Author: Ethan Rose <[email protected]>
AuthorDate: Wed May 29 11:05:42 2024 -0700

    HDDS-10372. SCM and Datanode communication for reconciliation (#6506)
---
 .../apache/hadoop/hdds/scm/client/ScmClient.java   |   8 +
 .../hdds/scm/container/ContainerReplicaInfo.java   |  26 +-
 .../hadoop/hdds/scm/exceptions/SCMException.java   |   3 +-
 .../protocol/StorageContainerLocationProtocol.java |   8 +
 .../container/common/helpers/ContainerUtils.java   |  10 +-
 .../ozone/container/common/impl/ContainerData.java |  33 ++-
 .../container/common/impl/ContainerDataYaml.java   |   4 +-
 .../ozone/container/common/interfaces/Handler.java |   9 +
 .../common/statemachine/DatanodeStateMachine.java  |   2 +
 .../ReconcileContainerCommandHandler.java          | 110 ++++++++
 .../states/endpoint/HeartbeatEndpointTask.java     |   6 +
 .../container/keyvalue/KeyValueContainer.java      |   3 +-
 .../container/keyvalue/KeyValueContainerCheck.java |   2 +-
 .../ozone/container/keyvalue/KeyValueHandler.java  |  20 ++
 .../keyvalue/helpers/KeyValueContainerUtil.java    |   2 +-
 .../container/ozoneimpl/ContainerController.java   |  11 +
 .../container/replication/ContainerImporter.java   |   2 +-
 .../commands/ReconcileContainerCommand.java        |  88 ++++++
 .../common/TestKeyValueContainerData.java          |   4 +
 .../TestSchemaOneBackwardsCompatibility.java       |   2 +-
 .../common/impl/TestContainerDataYaml.java         |  34 ++-
 .../common/statemachine/TestStateContext.java      |   4 +
 .../TestReconcileContainerCommandHandler.java      | 203 +++++++++++++
 .../states/endpoint/TestHeartbeatEndpointTask.java |  38 +++
 .../container/keyvalue/TestKeyValueHandler.java    |  37 +++
 .../replication/TestContainerImporter.java         |   4 +-
 ...inerLocationProtocolClientSideTranslatorPB.java |  10 +
 .../src/main/proto/ScmAdminProtocol.proto          |  10 +
 .../interface-client/src/main/proto/hdds.proto     |   1 +
 .../proto/ScmServerDatanodeHeartbeatProtocol.proto |  14 +-
 .../src/main/proto/ScmServerProtocol.proto         |   1 +
 .../container/AbstractContainerReportHandler.java  |   1 +
 .../hdds/scm/container/ContainerReplica.java       |  12 +
 .../ReconcileContainerEventHandler.java            |  94 ++++++
 .../ReconciliationEligibilityHandler.java          | 141 +++++++++
 .../scm/container/reconciliation/package-info.java |  21 ++
 .../apache/hadoop/hdds/scm/events/SCMEvents.java   |   7 +
 ...inerLocationProtocolServerSideTranslatorPB.java |  13 +
 .../hdds/scm/server/SCMClientProtocolServer.java   |  44 ++-
 .../hdds/scm/server/SCMDatanodeProtocolServer.java |   8 +
 .../hdds/scm/server/StorageContainerManager.java   |   5 +
 .../org/apache/hadoop/ozone/audit/SCMAction.java   |   3 +-
 .../scm/container/TestContainerReportHandler.java  | 152 +++++++++-
 .../TestIncrementalContainerReportHandler.java     | 143 +++++++++-
 .../TestReconcileContainerEventHandler.java        | 314 +++++++++++++++++++++
 .../hdds/scm/cli/ContainerOperationClient.java     |   4 +
 .../hdds/scm/cli/container/ContainerCommands.java  |   3 +-
 .../scm/cli/container/ReconcileSubcommand.java     |  50 ++++
 .../src/main/smoketest/admincli/container.robot    |  67 +++--
 49 files changed, 1730 insertions(+), 61 deletions(-)

diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java
index 14fb0a40cd..1bfdfd34c4 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/client/ScmClient.java
@@ -462,4 +462,12 @@ public interface ScmClient extends Closeable {
       String scmId) throws IOException;
 
   String getMetrics(String query) throws IOException;
+
+  /**
+   * Trigger a reconcile command to datanodes for a container ID.
+   *
+   * @param containerID The ID of the container to reconcile.
+   * @throws IOException On error
+   */
+  void reconcileContainer(long containerID) throws IOException;
 }
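
For context, a minimal sketch of how an admin tool might drive the new client API once an ScmClient implementation is in hand (the helper class and method below are illustrative, not part of this patch):

    import java.io.IOException;
    import org.apache.hadoop.hdds.scm.client.ScmClient;

    // Illustrative helper, not part of this change.
    public final class ReconcileTrigger {
      private ReconcileTrigger() { }

      // Asks SCM to send reconcile commands to the datanodes holding replicas of this container.
      public static void reconcile(ScmClient scmClient, long containerID) throws IOException {
        scmClient.reconcileContainer(containerID);
      }
    }
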
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplicaInfo.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplicaInfo.java
index 5a81f6bb47..a239cbfdba 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplicaInfo.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplicaInfo.java
@@ -17,9 +17,14 @@
  */
 package org.apache.hadoop.hdds.scm.container;
 
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 
+import java.io.IOException;
 import java.util.UUID;
 
 /**
@@ -35,6 +40,8 @@ public final class ContainerReplicaInfo {
   private long keyCount;
   private long bytesUsed;
   private int replicaIndex = -1;
+  @JsonSerialize(using = LongToHexJsonSerializer.class)
+  private long dataChecksum;
 
   public static ContainerReplicaInfo fromProto(
       HddsProtos.SCMContainerReplicaProto proto) {
@@ -48,7 +55,8 @@ public final class ContainerReplicaInfo {
         .setKeyCount(proto.getKeyCount())
         .setBytesUsed(proto.getBytesUsed())
         .setReplicaIndex(
-            proto.hasReplicaIndex() ? (int)proto.getReplicaIndex() : -1);
+            proto.hasReplicaIndex() ? (int)proto.getReplicaIndex() : -1)
+        .setDataChecksum(proto.getDataChecksum());
     return builder.build();
   }
 
@@ -87,6 +95,17 @@ public final class ContainerReplicaInfo {
     return replicaIndex;
   }
 
+  public long getDataChecksum() {
+    return dataChecksum;
+  }
+
+  private static class LongToHexJsonSerializer extends JsonSerializer<Long> {
+    @Override
+    public void serialize(Long value, JsonGenerator gen, SerializerProvider provider) throws IOException {
+      gen.writeString(Long.toHexString(value));
+    }
+  }
+
   /**
    * Builder for ContainerReplicaInfo class.
    */
@@ -134,6 +153,11 @@ public final class ContainerReplicaInfo {
       return this;
     }
 
+    public Builder setDataChecksum(long dataChecksum) {
+      subject.dataChecksum = dataChecksum;
+      return this;
+    }
+
     public ContainerReplicaInfo build() {
       return subject;
     }
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java
index 1cebd3296e..fad6feca0b 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java
@@ -139,6 +139,7 @@ public class SCMException extends IOException {
     CA_ROTATION_IN_PROGRESS,
     CA_ROTATION_IN_POST_PROGRESS,
     CONTAINER_ALREADY_CLOSED,
-    CONTAINER_ALREADY_CLOSING
+    CONTAINER_ALREADY_CLOSING,
+    UNSUPPORTED_OPERATION
   }
 }
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
index df8ed02cf7..e53bd73eb0 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
@@ -483,4 +483,12 @@ public interface StorageContainerLocationProtocol extends Closeable {
       String scmId) throws IOException;
 
   String getMetrics(String query) throws IOException;
+
+  /**
+   * Trigger a reconcile command to datanodes for the given container ID.
+   *
+   * @param containerID The ID of the container to reconcile.
+   * @throws IOException On error
+   */
+  void reconcileContainer(long containerID) throws IOException;
 }
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java
index b89ecff48c..22a7408642 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java
@@ -194,21 +194,21 @@ public final class ContainerUtils {
    * Verify that the checksum stored in containerData is equal to the
    * computed checksum.
    */
-  public static void verifyChecksum(ContainerData containerData,
+  public static void verifyContainerFileChecksum(ContainerData containerData,
       ConfigurationSource conf) throws IOException {
     boolean enabled = conf.getBoolean(
             HddsConfigKeys.HDDS_CONTAINER_CHECKSUM_VERIFICATION_ENABLED,
             HddsConfigKeys.
                     HDDS_CONTAINER_CHECKSUM_VERIFICATION_ENABLED_DEFAULT);
     if (enabled) {
-      String storedChecksum = containerData.getChecksum();
+      String storedChecksum = containerData.getContainerFileChecksum();
 
       Yaml yaml = ContainerDataYaml.getYamlForContainerType(
           containerData.getContainerType(),
           containerData instanceof KeyValueContainerData &&
               ((KeyValueContainerData)containerData).getReplicaIndex() > 0);
-      containerData.computeAndSetChecksum(yaml);
-      String computedChecksum = containerData.getChecksum();
+      containerData.computeAndSetContainerFileChecksum(yaml);
+      String computedChecksum = containerData.getContainerFileChecksum();
 
       if (storedChecksum == null || !storedChecksum.equals(computedChecksum)) {
         throw new StorageContainerException("Container checksum error for " +
@@ -225,7 +225,7 @@ public final class ContainerUtils {
    * @param containerDataYamlStr ContainerData as a Yaml String
    * @return Checksum of the container data
    */
-  public static String getChecksum(String containerDataYamlStr)
+  public static String getContainerFileChecksum(String containerDataYamlStr)
       throws StorageContainerException {
     MessageDigest sha;
     try {
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java
index 3c202ba60a..4e3f2a7d53 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java
@@ -99,8 +99,12 @@ public abstract class ContainerData {
 
   private HddsVolume volume;
 
+  // Checksum of just the container file.
   private String checksum;
 
+  // Checksum of the data within the container.
+  private long dataChecksum;
+
   private boolean isEmpty;
 
   private int replicaIndex;
@@ -112,7 +116,7 @@ public abstract class ContainerData {
   private transient Optional<Instant> lastDataScanTime = Optional.empty();
 
   public static final Charset CHARSET_ENCODING = StandardCharsets.UTF_8;
-  private static final String DUMMY_CHECKSUM = new String(new byte[64],
+  private static final String ZERO_CHECKSUM = new String(new byte[64],
       CHARSET_ENCODING);
 
   // Common Fields need to be stored in .container file.
@@ -159,7 +163,8 @@ public abstract class ContainerData {
     this.originPipelineId = originPipelineId;
     this.originNodeId = originNodeId;
     this.isEmpty = false;
-    setChecksumTo0ByteArray();
+    this.checksum = ZERO_CHECKSUM;
+    this.dataChecksum = 0;
   }
 
   protected ContainerData(ContainerData source) {
@@ -571,15 +576,11 @@ public abstract class ContainerData {
     this.blockCount.set(count);
   }
 
-  public void setChecksumTo0ByteArray() {
-    this.checksum = DUMMY_CHECKSUM;
-  }
-
-  public void setChecksum(String checkSum) {
+  public void setContainerFileChecksum(String checkSum) {
     this.checksum = checkSum;
   }
 
-  public String getChecksum() {
+  public String getContainerFileChecksum() {
     return this.checksum;
   }
 
@@ -630,21 +631,29 @@ public abstract class ContainerData {
    *
    * Checksum of ContainerData is calculated by setting the
    * {@link ContainerData#checksum} field to a 64-byte array with all 0's -
-   * {@link ContainerData#DUMMY_CHECKSUM}. After the checksum is calculated,
+   * {@link ContainerData#ZERO_CHECKSUM}. After the checksum is calculated,
    * the checksum field is updated with this value.
    *
    * @param yaml Yaml for ContainerType to get the ContainerData as Yaml String
    * @throws IOException
    */
-  public void computeAndSetChecksum(Yaml yaml) throws IOException {
+  public void computeAndSetContainerFileChecksum(Yaml yaml) throws IOException {
     // Set checksum to dummy value - 0 byte array, to calculate the checksum
     // of rest of the data.
-    setChecksumTo0ByteArray();
+    this.checksum = ZERO_CHECKSUM;
 
     // Dump yaml data into a string to compute its checksum
     String containerDataYamlStr = yaml.dump(this);
 
-    this.checksum = ContainerUtils.getChecksum(containerDataYamlStr);
+    this.checksum = ContainerUtils.getContainerFileChecksum(containerDataYamlStr);
+  }
+
+  public void setDataChecksum(long dataChecksum) {
+    this.dataChecksum = dataChecksum;
+  }
+
+  public long getDataChecksum() {
+    return dataChecksum;
   }
 
   /**
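
ContainerData now carries two independent checksums: the renamed container file checksum (a digest of the .container YAML computed with the checksum field zeroed, as the javadoc above describes) and the new long-valued dataChecksum covering the container's data. A rough, self-contained illustration of the file-checksum pattern; SHA-256 and the hex encoding here are simplifying assumptions, not the exact Ozone implementation:

    import java.nio.charset.StandardCharsets;
    import java.security.MessageDigest;
    import java.security.NoSuchAlgorithmException;

    public final class ContainerFileChecksumSketch {
      private ContainerFileChecksumSketch() { }

      // Hash the YAML dump of the container metadata after the checksum field has been
      // reset to a fixed zero value, mirroring the compute-and-set pattern described above.
      public static String checksumOf(String yamlWithZeroedChecksumField) throws NoSuchAlgorithmException {
        MessageDigest sha = MessageDigest.getInstance("SHA-256");
        byte[] digest = sha.digest(yamlWithZeroedChecksumField.getBytes(StandardCharsets.UTF_8));
        StringBuilder hex = new StringBuilder();
        for (byte b : digest) {
          hex.append(String.format("%02x", b));
        }
        return hex.toString();
      }
    }
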
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerDataYaml.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerDataYaml.java
index a4750b5fae..140a462676 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerDataYaml.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerDataYaml.java
@@ -98,7 +98,7 @@ public final class ContainerDataYaml {
       // Create Yaml for given container type
       Yaml yaml = getYamlForContainerType(containerType, withReplicaIndex);
       // Compute Checksum and update ContainerData
-      containerData.computeAndSetChecksum(yaml);
+      containerData.computeAndSetContainerFileChecksum(yaml);
 
       // Write the ContainerData with checksum to Yaml file.
       out = new FileOutputStream(
@@ -312,7 +312,7 @@ public final class ContainerDataYaml {
         kvData.setChunksPath((String) nodes.get(OzoneConsts.CHUNKS_PATH));
         Map<String, String> meta = (Map) nodes.get(OzoneConsts.METADATA);
         kvData.setMetadata(meta);
-        kvData.setChecksum((String) nodes.get(OzoneConsts.CHECKSUM));
+        kvData.setContainerFileChecksum((String) nodes.get(OzoneConsts.CHECKSUM));
         Long timestamp = (Long) nodes.get(OzoneConsts.DATA_SCAN_TIMESTAMP);
         kvData.setDataScanTimestamp(timestamp);
         String state = (String) nodes.get(OzoneConsts.STATE);
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
index 2ffb9d30d1..179274f2c0 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
@@ -21,8 +21,10 @@ package org.apache.hadoop.ozone.container.common.interfaces;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.List;
 
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
@@ -192,6 +194,13 @@ public abstract class Handler {
   public abstract void deleteContainer(Container container, boolean force)
       throws IOException;
 
+  /**
+   * Triggers reconciliation of this container replica's data with its peers.
+   * @param container container to be reconciled.
+   * @param peers The other datanodes with a copy of this container whose data should be checked.
+   */
+  public abstract void reconcileContainer(Container<?> container, List<DatanodeDetails> peers) throws IOException;
+
   /**
    * Deletes the given files associated with a block of the container.
    *
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index 9677144054..9292dba5fd 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -56,6 +56,7 @@ import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.Crea
 import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.DeleteBlocksCommandHandler;
 import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.DeleteContainerCommandHandler;
 import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.FinalizeNewLayoutVersionCommandHandler;
+import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.ReconcileContainerCommandHandler;
 import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.ReconstructECContainersCommandHandler;
 import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.RefreshVolumeUsageCommandHandler;
 import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.ReplicateContainerCommandHandler;
@@ -258,6 +259,7 @@ public class DatanodeStateMachine implements Closeable {
             supervisor::nodeStateUpdated))
         .addHandler(new FinalizeNewLayoutVersionCommandHandler())
         .addHandler(new RefreshVolumeUsageCommandHandler())
+        .addHandler(new ReconcileContainerCommandHandler(threadNamePrefix))
         .setConnectionManager(connectionManager)
         .setContainer(container)
         .setContext(context)
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconcileContainerCommandHandler.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconcileContainerCommandHandler.java
new file mode 100644
index 0000000000..9a4110c7df
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconcileContainerCommandHandler.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.protocol.commands.ReconcileContainerCommand;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Handles commands from SCM to reconcile a container replica on this datanode with the replicas on its peers.
+ */
+public class ReconcileContainerCommandHandler implements CommandHandler {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ReconcileContainerCommandHandler.class);
+
+  private final AtomicLong invocationCount;
+  private final AtomicInteger queuedCount;
+  private final ExecutorService executor;
+  private long totalTime;
+
+  public ReconcileContainerCommandHandler(String threadNamePrefix) {
+    invocationCount = new AtomicLong(0);
+    queuedCount = new AtomicInteger(0);
+    // TODO Allow configurable thread pool size with a default value when the implementation is ready.
+    executor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
+        .setNameFormat(threadNamePrefix + "ReconcileContainerThread-%d")
+        .build());
+    totalTime = 0;
+  }
+
+  @Override
+  public void handle(SCMCommand command, OzoneContainer container, StateContext context,
+      SCMConnectionManager connectionManager) {
+    queuedCount.incrementAndGet();
+    CompletableFuture.runAsync(() -> {
+      invocationCount.incrementAndGet();
+      long startTime = Time.monotonicNow();
+      ReconcileContainerCommand reconcileCommand = (ReconcileContainerCommand) command;
+      LOG.info("Processing reconcile container command for container {} with peers {}",
+          reconcileCommand.getContainerID(), reconcileCommand.getPeerDatanodes());
+      try {
+        container.getController().reconcileContainer(reconcileCommand.getContainerID(),
+            reconcileCommand.getPeerDatanodes());
+      } catch (IOException ex) {
+        LOG.error("Failed to reconcile container {}.", reconcileCommand.getContainerID(), ex);
+      } finally {
+        long endTime = Time.monotonicNow();
+        totalTime += endTime - startTime;
+      }
+    }, executor).whenComplete((v, e) -> queuedCount.decrementAndGet());
+  }
+
+  @Override
+  public SCMCommandProto.Type getCommandType() {
+    return SCMCommandProto.Type.reconcileContainerCommand;
+  }
+
+  @Override
+  public int getInvocationCount() {
+    return (int)invocationCount.get();
+  }
+
+  @Override
+  public long getAverageRunTime() {
+    if (invocationCount.get() > 0) {
+      return totalTime / invocationCount.get();
+    }
+    return 0;
+  }
+
+  @Override
+  public long getTotalRunTime() {
+    return totalTime;
+  }
+
+  @Override
+  public int getQueuedCount() {
+    return queuedCount.get();
+  }
+}
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
index 44f0eae49e..b6ab4748fe 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
@@ -55,6 +55,7 @@ import org.apache.hadoop.ozone.protocol.commands.CreatePipelineCommand;
 import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
 import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.FinalizeNewLayoutVersionCommand;
+import org.apache.hadoop.ozone.protocol.commands.ReconcileContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.ReconstructECContainersCommand;
 import org.apache.hadoop.ozone.protocol.commands.RefreshVolumeUsageCommand;
 import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
@@ -416,6 +417,11 @@ public class HeartbeatEndpointTask
             commandResponseProto.getRefreshVolumeUsageCommandProto());
         processCommonCommand(commandResponseProto, refreshVolumeUsageCommand);
         break;
+      case reconcileContainerCommand:
+        ReconcileContainerCommand reconcileContainerCommand =
+            ReconcileContainerCommand.getFromProtobuf(commandResponseProto.getReconcileContainerCommandProto());
+        processCommonCommand(commandResponseProto, reconcileContainerCommand);
+        break;
       default:
         throw new IllegalArgumentException("Unknown response : "
             + commandResponseProto.getCommandType().name());
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
index 8388182667..de43a29d9f 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
@@ -885,7 +885,8 @@ public class KeyValueContainer implements Container<KeyValueContainerData> {
         .setDeleteTransactionId(containerData.getDeleteTransactionId())
         .setBlockCommitSequenceId(containerData.getBlockCommitSequenceId())
         .setOriginNodeId(containerData.getOriginNodeId())
-        .setIsEmpty(containerData.isEmpty());
+        .setIsEmpty(containerData.isEmpty())
+        .setDataChecksum(containerData.getDataChecksum());
     return ciBuilder.build();
   }
 
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerCheck.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerCheck.java
index 70539111fb..f462510bf7 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerCheck.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerCheck.java
@@ -177,7 +177,7 @@ public class KeyValueContainerCheck {
         .checkState(onDiskContainerData != null, "Container File not loaded");
 
     try {
-      ContainerUtils.verifyChecksum(onDiskContainerData, checkConfig);
+      ContainerUtils.verifyContainerFileChecksum(onDiskContainerData, checkConfig);
     } catch (IOException ex) {
       return ScanResult.unhealthy(ScanResult.FailureType.CORRUPT_CONTAINER_FILE,
           containerFile, ex);
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index e575a93de2..2843fe3bcf 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hdds.HddsUtils;
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.conf.StorageUnit;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
@@ -56,6 +57,8 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerExcep
 import org.apache.hadoop.hdds.utils.HddsServerUtil;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.common.Checksum;
+import org.apache.hadoop.ozone.common.ChecksumByteBuffer;
+import org.apache.hadoop.ozone.common.ChecksumByteBufferFactory;
 import org.apache.hadoop.ozone.common.ChunkBuffer;
 import org.apache.hadoop.ozone.common.OzoneChecksumException;
 import org.apache.hadoop.ozone.common.utils.BufferUtils;
@@ -1160,6 +1163,23 @@ public class KeyValueHandler extends Handler {
     deleteInternal(container, force);
   }
 
+  @Override
+  public void reconcileContainer(Container<?> container, List<DatanodeDetails> peers) throws IOException {
+    // TODO Just a deterministic placeholder hash for testing until actual implementation is finished.
+    ContainerData data = container.getContainerData();
+    long id = data.getContainerID();
+    ByteBuffer byteBuffer = ByteBuffer.allocate(Long.BYTES)
+        .putLong(id)
+        .asReadOnlyBuffer();
+    byteBuffer.rewind();
+    ChecksumByteBuffer checksumImpl = ChecksumByteBufferFactory.crc32Impl();
+    checksumImpl.update(byteBuffer);
+    long dataChecksum = checksumImpl.getValue();
+    LOG.info("Generated data checksum of container {} for testing: {}", id, 
dataChecksum);
+    data.setDataChecksum(dataChecksum);
+    sendICR(container);
+  }
+
   /**
    * Called by BlockDeletingService to delete all the chunks in a block
    * before proceeding to delete the block info from DB.
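
The reconcileContainer placeholder above only needs a deterministic, non-zero value per container so that the resulting container reports can be exercised in tests. The same idea expressed with JDK-only classes (java.util.zip.CRC32 stands in for Ozone's ChecksumByteBufferFactory here, so the exact value may differ from what the datanode reports):

    import java.nio.ByteBuffer;
    import java.util.zip.CRC32;

    public final class PlaceholderDataChecksum {
      private PlaceholderDataChecksum() { }

      // Deterministic stand-in checksum derived from the 8 big-endian bytes of the container ID.
      public static long forContainer(long containerID) {
        byte[] idBytes = ByteBuffer.allocate(Long.BYTES).putLong(containerID).array();
        CRC32 crc = new CRC32();
        crc.update(idBytes);
        return crc.getValue();
      }
    }
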
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java
index 90ee356ab5..53def25860 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java
@@ -210,7 +210,7 @@ public final class KeyValueContainerUtil {
     long containerID = kvContainerData.getContainerID();
 
     // Verify Checksum
-    ContainerUtils.verifyChecksum(kvContainerData, config);
+    ContainerUtils.verifyContainerFileChecksum(kvContainerData, config);
 
     if (kvContainerData.getSchemaVersion() == null) {
       // If this container has not specified a schema version, it is in the old
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java
index feb5805387..feb86f3519 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.ozone.container.ozoneimpl;
 
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto
     .ContainerProtos.ContainerType;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
@@ -38,6 +39,7 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.time.Instant;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -188,6 +190,15 @@ public class ContainerController {
     }
   }
 
+  public void reconcileContainer(long containerID, List<DatanodeDetails> peers) throws IOException {
+    Container<?> container = containerSet.getContainer(containerID);
+    if (container == null) {
+      LOG.warn("Container {} to reconcile not found on this datanode.", containerID);
+    } else {
+      getHandler(container).reconcileContainer(container, peers);
+    }
+  }
+
   /**
    * Given a container, returns its handler instance.
    *
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java
index f20094079c..9e5b5dbdab 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java
@@ -122,7 +122,7 @@ public class ContainerImporter {
             packer.unpackContainerDescriptor(input);
         containerData = getKeyValueContainerData(containerDescriptorYaml);
       }
-      ContainerUtils.verifyChecksum(containerData, conf);
+      ContainerUtils.verifyContainerFileChecksum(containerData, conf);
       containerData.setVolume(targetVolume);
 
       try (FileInputStream input = new FileInputStream(tarFilePath.toFile())) {
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReconcileContainerCommand.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReconcileContainerCommand.java
new file mode 100644
index 0000000000..cdd4522cc6
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReconcileContainerCommand.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.protocol.commands;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ReconcileContainerCommandProto;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static java.util.Collections.emptyList;
+
+/**
+ * Asks datanodes to reconcile the specified container with other container replicas.
+ */
+public class ReconcileContainerCommand extends SCMCommand<ReconcileContainerCommandProto> {
+
+  private final List<DatanodeDetails> peerDatanodes;
+
+  public ReconcileContainerCommand(long containerID, List<DatanodeDetails> peerDatanodes) {
+    // Container ID serves as command ID, since only one reconciliation should be in progress at a time.
+    super(containerID);
+    this.peerDatanodes = peerDatanodes;
+  }
+
+
+  @Override
+  public SCMCommandProto.Type getType() {
+    return SCMCommandProto.Type.reconcileContainerCommand;
+  }
+
+  @Override
+  public ReconcileContainerCommandProto getProto() {
+    ReconcileContainerCommandProto.Builder builder = ReconcileContainerCommandProto.newBuilder()
+        .setContainerID(getId());
+    for (DatanodeDetails dd : peerDatanodes) {
+      builder.addPeers(dd.getProtoBufMessage());
+    }
+    return builder.build();
+  }
+
+  public List<DatanodeDetails> getPeerDatanodes() {
+    return peerDatanodes;
+  }
+
+  public long getContainerID() {
+    return getId();
+  }
+
+  public static ReconcileContainerCommand getFromProtobuf(ReconcileContainerCommandProto protoMessage) {
+    Preconditions.checkNotNull(protoMessage);
+
+    List<HddsProtos.DatanodeDetailsProto> peers = protoMessage.getPeersList();
+    List<DatanodeDetails> peerNodes = !peers.isEmpty()
+        ? peers.stream()
+        .map(DatanodeDetails::getFromProtoBuf)
+        .collect(Collectors.toList())
+        : emptyList();
+
+    return new ReconcileContainerCommand(protoMessage.getContainerID(), peerNodes);
+  }
+
+  @Override
+  public String toString() {
+    return getType() +
+        ": containerId=" + getContainerID() +
+        ", peerNodes=" + peerDatanodes;
+  }
+}
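
A small sketch of the command's proto round trip, which is what the SCM-to-datanode path below relies on; MockDatanodeDetails is a test-scope helper (it also appears in the tests later in this patch), so the snippet is illustrative rather than production code:

    import java.util.Arrays;
    import java.util.List;
    import org.apache.hadoop.hdds.protocol.DatanodeDetails;
    import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
    import org.apache.hadoop.ozone.protocol.commands.ReconcileContainerCommand;

    public final class ReconcileCommandRoundTrip {
      public static void main(String[] args) {
        List<DatanodeDetails> peers = Arrays.asList(
            MockDatanodeDetails.randomDatanodeDetails(),
            MockDatanodeDetails.randomDatanodeDetails());
        // The container ID doubles as the command ID, per the constructor comment above.
        ReconcileContainerCommand original = new ReconcileContainerCommand(1L, peers);
        // Serialize to ReconcileContainerCommandProto and parse it back, as the heartbeat
        // endpoint does when SCM delivers this command to a datanode.
        ReconcileContainerCommand parsed =
            ReconcileContainerCommand.getFromProtobuf(original.getProto());
        System.out.println(parsed);
      }
    }
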
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestKeyValueContainerData.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestKeyValueContainerData.java
index b3b0f5b437..4360243f0b 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestKeyValueContainerData.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestKeyValueContainerData.java
@@ -84,6 +84,7 @@ public class TestKeyValueContainerData {
     assertEquals(val.get(), kvData.getBlockCount());
     assertEquals(val.get(), kvData.getNumPendingDeletionBlocks());
     assertEquals(MAXSIZE, kvData.getMaxSize());
+    assertEquals(0, kvData.getDataChecksum());
 
     kvData.setState(state);
     kvData.setContainerDBType(containerDBType);
@@ -98,6 +99,8 @@ public class TestKeyValueContainerData {
     kvData.incrPendingDeletionBlocks(1);
     kvData.setSchemaVersion(
         VersionedDatanodeFeatures.SchemaV3.chooseSchemaVersion(conf));
+    long expectedDataHash =  1234L;
+    kvData.setDataChecksum(expectedDataHash);
 
     assertEquals(state, kvData.getState());
     assertEquals(containerDBType, kvData.getContainerDBType());
@@ -114,6 +117,7 @@ public class TestKeyValueContainerData {
     assertEquals(datanodeId.toString(), kvData.getOriginNodeId());
     assertEquals(VersionedDatanodeFeatures.SchemaV3.chooseSchemaVersion(conf),
         kvData.getSchemaVersion());
+    assertEquals(expectedDataHash, kvData.getDataChecksum());
 
     KeyValueContainerData newKvData = new KeyValueContainerData(kvData);
     assertEquals(kvData.getReplicaIndex(), newKvData.getReplicaIndex());
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestSchemaOneBackwardsCompatibility.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestSchemaOneBackwardsCompatibility.java
index 2235b23ce8..ad5ca48218 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestSchemaOneBackwardsCompatibility.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestSchemaOneBackwardsCompatibility.java
@@ -614,7 +614,7 @@ public class TestSchemaOneBackwardsCompatibility {
     Yaml yaml = ContainerDataYaml.getYamlForContainerType(
             kvData.getContainerType(),
         kvData.getReplicaIndex() > 0);
-    kvData.computeAndSetChecksum(yaml);
+    kvData.computeAndSetContainerFileChecksum(yaml);
 
     KeyValueContainerUtil.parseKVContainerData(kvData, conf);
 
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerDataYaml.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerDataYaml.java
index ec78398824..2057c4400a 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerDataYaml.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerDataYaml.java
@@ -87,6 +87,7 @@ public class TestContainerDataYaml {
     keyValueContainerData.setSchemaVersion(
         VersionedDatanodeFeatures.SchemaV2.chooseSchemaVersion());
     keyValueContainerData.setReplicaIndex(replicaIndex);
+    keyValueContainerData.setDataChecksum(12345);
 
     File containerFile = new File(testRoot, containerPath);
 
@@ -218,7 +219,7 @@ public class TestContainerDataYaml {
     File file = new File(classLoader.getResource(containerFile).getFile());
     KeyValueContainerData kvData = (KeyValueContainerData) ContainerDataYaml
         .readContainerFile(file);
-    ContainerUtils.verifyChecksum(kvData, conf);
+    ContainerUtils.verifyContainerFileChecksum(kvData, conf);
 
     //Checking the Container file data is consistent or not
     assertEquals(ContainerProtos.ContainerDataProto.State.CLOSED, kvData
@@ -236,7 +237,7 @@ public class TestContainerDataYaml {
   }
 
   /**
-   * Test to verify {@link ContainerUtils#verifyChecksum(ContainerData,ConfigurationSource)}.
+   * Test to verify {@link ContainerUtils#verifyContainerFileChecksum(ContainerData,ConfigurationSource)}.
    */
   @ContainerLayoutTestInfo.ContainerTest
   public void testChecksumInContainerFile(ContainerLayoutVersion layout) throws IOException {
@@ -247,13 +248,32 @@ public class TestContainerDataYaml {
 
     // Read from .container file, and verify data.
     KeyValueContainerData kvData = (KeyValueContainerData) ContainerDataYaml.readContainerFile(containerFile);
-    ContainerUtils.verifyChecksum(kvData, conf);
+    ContainerUtils.verifyContainerFileChecksum(kvData, conf);
 
     cleanup();
   }
 
   /**
-   * Test to verify {@link ContainerUtils#verifyChecksum(ContainerData,ConfigurationSource)}.
+   * The container's data checksum is stored in a separate file with its Merkle hash tree. It should not be persisted
+   * to the .container file.
+   */
+  @ContainerLayoutTestInfo.ContainerTest
+  public void testDataChecksumNotInContainerFile(ContainerLayoutVersion layout) throws IOException {
+    setLayoutVersion(layout);
+    long containerID = testContainerID++;
+
+    File containerFile = createContainerFile(containerID, 0);
+
+    // Read from .container file. The kvData object should not have a data hash because it was not persisted in this
+    // file.
+    KeyValueContainerData kvData = (KeyValueContainerData) ContainerDataYaml.readContainerFile(containerFile);
+    assertEquals(0, kvData.getDataChecksum());
+
+    cleanup();
+  }
+
+  /**
+   * Test to verify {@link ContainerUtils#verifyContainerFileChecksum(ContainerData,ConfigurationSource)}.
    */
   @ContainerLayoutTestInfo.ContainerTest
   public void testChecksumInContainerFileWithReplicaIndex(
@@ -266,7 +286,7 @@ public class TestContainerDataYaml {
     // Read from .container file, and verify data.
     KeyValueContainerData kvData = (KeyValueContainerData) ContainerDataYaml
         .readContainerFile(containerFile);
-    ContainerUtils.verifyChecksum(kvData, conf);
+    ContainerUtils.verifyContainerFileChecksum(kvData, conf);
 
     cleanup();
   }
@@ -287,7 +307,7 @@ public class TestContainerDataYaml {
     setLayoutVersion(layout);
     Exception ex = assertThrows(Exception.class, () -> {
       KeyValueContainerData kvData = getKeyValueContainerData();
-      ContainerUtils.verifyChecksum(kvData, conf);
+      ContainerUtils.verifyContainerFileChecksum(kvData, conf);
     });
 
     assertThat(ex).hasMessageStartingWith("Container checksum error for ContainerID:");
@@ -303,6 +323,6 @@ public class TestContainerDataYaml {
     KeyValueContainerData kvData = getKeyValueContainerData();
     conf.setBoolean(HddsConfigKeys.
         HDDS_CONTAINER_CHECKSUM_VERIFICATION_ENABLED, false);
-    ContainerUtils.verifyChecksum(kvData, conf);
+    ContainerUtils.verifyContainerFileChecksum(kvData, conf);
   }
 }
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java
index 7f2cdcc6e5..d337b3a5f2 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java
@@ -65,6 +65,7 @@ import org.apache.hadoop.ozone.container.common.states.DatanodeState;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
 import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.ClosePipelineCommand;
+import org.apache.hadoop.ozone.protocol.commands.ReconcileContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.apache.ozone.test.LambdaTestUtils;
@@ -707,6 +708,7 @@ public class TestStateContext {
     ctx.addCommand(ReplicateContainerCommand.forTest(3));
     ctx.addCommand(new ClosePipelineCommand(PipelineID.randomId()));
     ctx.addCommand(new CloseContainerCommand(1, PipelineID.randomId()));
+    ctx.addCommand(new ReconcileContainerCommand(4, Collections.emptyList()));
 
     Map<SCMCommandProto.Type, Integer> summary = ctx.getCommandQueueSummary();
     assertEquals(3,
@@ -715,6 +717,8 @@ public class TestStateContext {
         summary.get(SCMCommandProto.Type.closePipelineCommand).intValue());
     assertEquals(1,
         summary.get(SCMCommandProto.Type.closeContainerCommand).intValue());
+    assertEquals(1,
+        summary.get(SCMCommandProto.Type.reconcileContainerCommand).intValue());
   }
 
   @Test
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReconcileContainerCommandHandler.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReconcileContainerCommandHandler.java
new file mode 100644
index 0000000000..d6be667f41
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestReconcileContainerCommandHandler.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.ozone.container.common.ContainerTestUtils;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
+import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
+import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.hadoop.ozone.container.common.interfaces.Handler;
+import org.apache.hadoop.ozone.container.common.report.IncrementalReportSender;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
+import org.apache.hadoop.ozone.container.keyvalue.ContainerLayoutTestInfo;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
+import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.protocol.commands.ReconcileContainerCommand;
+import org.apache.ozone.test.GenericTestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static java.util.Collections.singletonMap;
+import static org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails;
+import static org.apache.hadoop.ozone.OzoneConsts.GB;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests Datanode handling of reconcile container commands.
+ */
+public class TestReconcileContainerCommandHandler {
+  public static final Logger LOG = LoggerFactory.getLogger(TestReconcileContainerCommandHandler.class);
+
+  private static final int NUM_CONTAINERS = 3;
+
+  private ContainerSet containerSet;
+  private OzoneContainer ozoneContainer;
+  private StateContext context;
+  private ReconcileContainerCommandHandler subject;
+
+  public void init(ContainerLayoutVersion layout, IncrementalReportSender<Container> icrSender)
+      throws Exception {
+
+    OzoneConfiguration conf = new OzoneConfiguration();
+    DatanodeDetails dnDetails = randomDatanodeDetails();
+    subject = new ReconcileContainerCommandHandler("");
+    context = ContainerTestUtils.getMockContext(dnDetails, conf);
+
+    containerSet = new ContainerSet(1000);
+    for (int id = 1; id <= NUM_CONTAINERS; id++) {
+      KeyValueContainerData data = new KeyValueContainerData(id, layout, GB,
+          PipelineID.randomId().toString(), randomDatanodeDetails().getUuidString());
+      containerSet.addContainer(new KeyValueContainer(data, conf));
+    }
+
+    assertEquals(NUM_CONTAINERS, containerSet.containerCount());
+
+    Handler containerHandler = new KeyValueHandler(new OzoneConfiguration(), dnDetails.getUuidString(), containerSet,
+        mock(VolumeSet.class), mock(ContainerMetrics.class), icrSender);
+    ContainerController controller = new ContainerController(containerSet,
+        singletonMap(ContainerProtos.ContainerType.KeyValueContainer, containerHandler));
+    ozoneContainer = mock(OzoneContainer.class);
+    when(ozoneContainer.getController()).thenReturn(controller);
+    when(ozoneContainer.getContainerSet()).thenReturn(containerSet);
+  }
+
+  @ContainerLayoutTestInfo.ContainerTest
+  public void testReconcileContainerCommandReports(ContainerLayoutVersion layout) throws Exception {
+    Map<ContainerID, ContainerReplicaProto> containerReportsSent = new HashMap<>();
+    IncrementalReportSender<Container> icrSender = c -> {
+      try {
+        ContainerID id = ContainerID.valueOf(c.getContainerData().getContainerID());
+        containerReportsSent.put(id, c.getContainerReport());
+        LOG.info("Added container report for container {}", id);
+      } catch (Exception ex) {
+        LOG.error("ICR sender failed", ex);
+      }
+    };
+    init(layout, icrSender);
+
+    for (int id = 1; id <= NUM_CONTAINERS; id++) {
+      ReconcileContainerCommand cmd = new ReconcileContainerCommand(id, Collections.emptyList());
+      subject.handle(cmd, ozoneContainer, context, null);
+    }
+
+    // An unknown container should not trigger a container report being sent.
+    ReconcileContainerCommand unknownContainerCmd = new ReconcileContainerCommand(NUM_CONTAINERS + 1,
+        Collections.emptyList());
+    subject.handle(unknownContainerCmd, ozoneContainer, context, null);
+
+    waitForAllCommandsToFinish();
+    verifyAllContainerReports(containerReportsSent);
+  }
+
+  @ContainerLayoutTestInfo.ContainerTest
+  public void testReconcileContainerCommandMetrics(ContainerLayoutVersion layout) throws Exception {
+    // Used to block ICR sending so that queue metrics can be checked before the reconcile task completes.
+    CountDownLatch icrLatch = new CountDownLatch(1);
+    // Wait this long before completing the task.
+    // This provides a lower bound on execution time.
+    final long minExecTimeMillis = 500;
+    // This is the lower bound on execution time of all the commands combined.
+    final long expectedTotalMinExecTimeMillis = minExecTimeMillis * NUM_CONTAINERS;
+
+    IncrementalReportSender<Container> icrSender = c -> {
+      try {
+        // Block the caller until the latch is counted down.
+        // Caller can check queue metrics in the meantime.
+        LOG.info("ICR sender waiting for latch");
+        assertTrue(icrLatch.await(30, TimeUnit.SECONDS));
+        LOG.info("ICR sender proceeding after latch");
+
+        Thread.sleep(minExecTimeMillis);
+      } catch (Exception ex) {
+        LOG.error("ICR sender failed", ex);
+      }
+    };
+
+    init(layout, icrSender);
+
+    // All commands submitted will be blocked until the latch is counted down.
+    for (int id = 1; id <= NUM_CONTAINERS; id++) {
+      ReconcileContainerCommand cmd = new ReconcileContainerCommand(id, Collections.emptyList());
+      subject.handle(cmd, ozoneContainer, context, null);
+    }
+    assertEquals(NUM_CONTAINERS, subject.getQueuedCount());
+    assertEquals(0, subject.getTotalRunTime());
+    assertEquals(0, subject.getAverageRunTime());
+
+    // This will resume handling of the tasks.
+    icrLatch.countDown();
+    waitForAllCommandsToFinish();
+
+    assertEquals(NUM_CONTAINERS, subject.getInvocationCount());
+    long totalRunTime = subject.getTotalRunTime();
+    assertTrue(totalRunTime >= expectedTotalMinExecTimeMillis,
+        "Total run time " + totalRunTime + "ms was not larger than the minimum 
total exec time " +
+            expectedTotalMinExecTimeMillis + "ms");
+    long avgRunTime = subject.getAverageRunTime();
+    assertTrue(avgRunTime >= minExecTimeMillis,
+        "Average run time " + avgRunTime + "ms was not larger than the minimum 
per task exec time " +
+            minExecTimeMillis + "ms");
+  }
+
+  private void waitForAllCommandsToFinish() throws Exception {
+    // Queue count should be decremented only after the task completes, so the other metrics should be consistent when
+    // it reaches zero.
+    GenericTestUtils.waitFor(() -> {
+      int qCount = subject.getQueuedCount();
+      LOG.info("Waiting for queued command count to reach 0. Currently at " + qCount);
+      return qCount == 0;
+    }, 500, 3000);
+  }
+
+  private void verifyAllContainerReports(Map<ContainerID, ContainerReplicaProto> reportsSent) throws Exception {
+    assertEquals(NUM_CONTAINERS, reportsSent.size());
+
+    for (Map.Entry<ContainerID, ContainerReplicaProto> entry: reportsSent.entrySet()) {
+      ContainerID id = entry.getKey();
+      assertNotNull(containerSet.getContainer(id.getId()));
+
+      long sentDataChecksum = entry.getValue().getDataChecksum();
+      // Current implementation is incomplete, and uses a mocked checksum.
+      assertNotEquals(0, sentDataChecksum, "Report of container " + id +
+          " should have a non-zero checksum");
+    }
+  }
+}
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
index 7d8b94e57d..6245489f13 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.ozone.container.common.states.endpoint;
 
 import static java.util.Collections.emptyList;
+import static 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.reconcileContainerCommand;
 import static 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.reconstructECContainersCommand;
 import static 
org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager.maxLayoutVersion;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -57,6 +58,7 @@ import 
org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachin
 import 
org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine.DatanodeStates;
 import 
org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.protocol.commands.ReconcileContainerCommand;
 import 
org.apache.hadoop.ozone.protocol.commands.ReconstructECContainersCommand;
 import 
org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolClientSideTranslatorPB;
 
@@ -111,6 +113,42 @@ public class TestHeartbeatEndpointTask {
         .get(reconstructECContainersCommand).intValue());
   }
 
+  @Test
+  public void testHandlesReconcileContainerCommand() throws Exception {
+    StorageContainerDatanodeProtocolClientSideTranslatorPB scm =
+        mock(StorageContainerDatanodeProtocolClientSideTranslatorPB.class);
+
+    List<DatanodeDetails> peerDNs = new ArrayList<>();
+    peerDNs.add(MockDatanodeDetails.randomDatanodeDetails());
+    peerDNs.add(MockDatanodeDetails.randomDatanodeDetails());
+    ReconcileContainerCommand cmd = new ReconcileContainerCommand(1, peerDNs);
+
+    when(scm.sendHeartbeat(any()))
+        .thenAnswer(invocation ->
+            SCMHeartbeatResponseProto.newBuilder()
+                .setDatanodeUUID(
+                    ((SCMHeartbeatRequestProto)invocation.getArgument(0))
+                        .getDatanodeDetails().getUuid())
+                .addCommands(SCMCommandProto.newBuilder()
+                    .setCommandType(reconcileContainerCommand)
+                    .setReconcileContainerCommandProto(cmd.getProto())
+                    .build())
+                .build());
+
+    OzoneConfiguration conf = new OzoneConfiguration();
+    DatanodeStateMachine datanodeStateMachine = 
mock(DatanodeStateMachine.class);
+    StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
+        datanodeStateMachine, "");
+
+    // WHEN
+    HeartbeatEndpointTask task = getHeartbeatEndpointTask(conf, context, scm);
+    task.call();
+
+    // THEN
+    assertEquals(1, context.getCommandQueueSummary()
+        .get(reconcileContainerCommand).intValue());
+  }
+
   @Test
   public void testheartbeatWithoutReports() throws Exception {
     final long termInSCM = 42;
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
index b9c8feae16..2ce6eabe39 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
@@ -37,7 +37,9 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerType;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
 import 
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.security.token.TokenVerifier;
 import org.apache.hadoop.ozone.container.common.ContainerTestUtils;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
@@ -56,7 +58,9 @@ import org.apache.ozone.test.GenericTestUtils;
 
 import static 
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DATANODE_VOLUME_CHOOSING_POLICY;
 import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS;
+import static 
org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_DATANODE_DIR_KEY;
+import static org.apache.hadoop.ozone.OzoneConsts.GB;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -64,6 +68,7 @@ import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.mockito.Mockito.any;
 
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
@@ -433,6 +438,38 @@ public class TestKeyValueHandler {
     }
   }
 
+  @ContainerLayoutTestInfo.ContainerTest
+  public void testReconcileContainer(ContainerLayoutVersion layoutVersion) 
throws Exception {
+    OzoneConfiguration conf = new OzoneConfiguration();
+
+    KeyValueContainerData data = new KeyValueContainerData(123L, 
layoutVersion, GB,
+        PipelineID.randomId().toString(), 
randomDatanodeDetails().getUuidString());
+
+    Container container = new KeyValueContainer(data, conf);
+    ContainerSet containerSet = new ContainerSet(1000);
+    containerSet.addContainer(container);
+
+    // Allows checking the invocation count of the lambda.
+    AtomicInteger icrCount = new AtomicInteger(0);
+    KeyValueHandler keyValueHandler = new KeyValueHandler(conf, 
randomDatanodeDetails().getUuidString(), containerSet,
+        mock(MutableVolumeSet.class), mock(ContainerMetrics.class), c -> {
+      // Check that the ICR contains expected info about the container.
+      ContainerReplicaProto report = c.getContainerReport();
+      long reportedID = report.getContainerID();
+      Assertions.assertEquals(container.getContainerData().getContainerID(), 
reportedID);
+
+      long reportDataChecksum = report.getDataChecksum();
+      Assertions.assertNotEquals(0, reportDataChecksum,
+          "Container report should have populated the checksum field with a 
non-zero value.");
+      icrCount.incrementAndGet();
+    });
+
+    Assertions.assertEquals(0, icrCount.get());
+    // This should trigger container report validation in the ICR handler 
above.
+    keyValueHandler.reconcileContainer(container, Collections.emptyList());
+    Assertions.assertEquals(1, icrCount.get());
+  }
+
   private static ContainerCommandRequestProto createContainerRequest(
       String datanodeId, long containerID) {
     return ContainerCommandRequestProto.newBuilder()
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerImporter.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerImporter.java
index 1b989e6bc7..6680a467b1 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerImporter.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerImporter.java
@@ -56,7 +56,6 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
@@ -152,8 +151,7 @@ class TestContainerImporter {
     KeyValueContainerData containerData = spy(new 
KeyValueContainerData(containerId,
         ContainerLayoutVersion.FILE_PER_BLOCK, 100, "test", "test"));
     // mock to return different checksum
-    when(containerData.getChecksum()).thenReturn("checksum1", "checksum2");
-    doNothing().when(containerData).setChecksumTo0ByteArray();
+    when(containerData.getContainerFileChecksum()).thenReturn("checksum1", 
"checksum2");
     // create containerImporter object
     ContainerController controllerMock = mock(ContainerController.class);
     ContainerSet containerSet = new ContainerSet(0);
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
index 3570257b58..745790abc8 100644
--- 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
@@ -103,6 +103,7 @@ import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolPro
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StartContainerBalancerResponseProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StopContainerBalancerRequestProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ResetDeletedBlockRetryCountRequestProto;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ReconcileContainerRequestProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.Type;
 import org.apache.hadoop.hdds.scm.DatanodeAdminError;
 import org.apache.hadoop.hdds.scm.ScmInfo;
@@ -1198,4 +1199,13 @@ public final class 
StorageContainerLocationProtocolClientSideTranslatorPB
     String metricsJsonStr = response.getMetricsJson();
     return metricsJsonStr;
   }
+
+  @Override
+  public void reconcileContainer(long containerID) throws IOException {
+    ReconcileContainerRequestProto request = 
ReconcileContainerRequestProto.newBuilder()
+        .setContainerID(containerID)
+        .build();
+    // TODO check error handling.
+    submitRequest(Type.ReconcileContainer, builder -> 
builder.setReconcileContainerRequest(request));
+  }
 }
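
A minimal sketch of how a caller might drive this new RPC through the client-side translator above. The sketch class name is illustrative, obtaining the StorageContainerLocationProtocol instance is assumed, and error handling is elided.

import java.io.IOException;

import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;

public final class ReconcileClientSketch {
  private ReconcileClientSketch() { }

  /**
   * Asks SCM to reconcile the given container. The call returns once SCM has
   * accepted the request; reconciliation itself runs asynchronously on the
   * datanodes that hold replicas of the container.
   */
  public static void requestReconciliation(
      StorageContainerLocationProtocol scmClient, long containerID)
      throws IOException {
    scmClient.reconcileContainer(containerID);
  }
}
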
diff --git a/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto 
b/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto
index c190dc3f45..ea63b82c8c 100644
--- a/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto
+++ b/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto
@@ -84,6 +84,7 @@ message ScmContainerLocationRequest {
   optional SingleNodeQueryRequestProto singleNodeQueryRequest = 45;
   optional GetContainersOnDecomNodeRequestProto 
getContainersOnDecomNodeRequest = 46;
   optional GetMetricsRequestProto getMetricsRequest = 47;
+  optional ReconcileContainerRequestProto reconcileContainerRequest = 48;
 }
 
 message ScmContainerLocationResponse {
@@ -139,6 +140,7 @@ message ScmContainerLocationResponse {
   optional SingleNodeQueryResponseProto singleNodeQueryResponse = 45;
   optional GetContainersOnDecomNodeResponseProto 
getContainersOnDecomNodeResponse = 46;
   optional GetMetricsResponseProto getMetricsResponse = 47;
+  optional ReconcileContainerResponseProto reconcileContainerResponse = 48;
 
   enum Status {
     OK = 1;
@@ -193,6 +195,7 @@ enum Type {
   SingleNodeQuery = 41;
   GetContainersOnDecomNode = 42;
   GetMetrics = 43;
+  ReconcileContainer = 44;
 }
 
 /**
@@ -637,6 +640,13 @@ message GetMetricsResponseProto {
   optional string metricsJson = 1;
 }
 
+message ReconcileContainerRequestProto {
+  required int64 containerID = 1;
+}
+
+message ReconcileContainerResponseProto {
+}
+
 /**
  * Protocol used from an HDFS node to StorageContainerManager.  See the request
  * and response messages for details of the RPC calls.
diff --git a/hadoop-hdds/interface-client/src/main/proto/hdds.proto 
b/hadoop-hdds/interface-client/src/main/proto/hdds.proto
index 4058453123..93cdbf0462 100644
--- a/hadoop-hdds/interface-client/src/main/proto/hdds.proto
+++ b/hadoop-hdds/interface-client/src/main/proto/hdds.proto
@@ -432,6 +432,7 @@ message SCMContainerReplicaProto {
     required int64 keyCount = 6;
     required int64 bytesUsed = 7;
     optional int64 replicaIndex = 8;
+    optional int64 dataChecksum = 9;
 }
 
 message KeyContainerIDList {
diff --git 
a/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
 
b/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
index 2994073c02..d5600880e6 100644
--- 
a/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
+++ 
b/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
@@ -230,12 +230,13 @@ message ContainerReplicaProto {
   optional int64 writeCount = 7;
   optional int64 readBytes = 8;
   optional int64 writeBytes = 9;
-  optional string finalhash = 10;
+  optional string finalhash = 10 [ deprecated = true ];
   optional int64 deleteTransactionId = 11;
   optional uint64 blockCommitSequenceId = 12;
   optional string originNodeId = 13;
   optional int32 replicaIndex = 14;
   optional bool isEmpty = 15 [default = false];
+  optional int64 dataChecksum = 16;
 }
 
 message CommandStatusReportsProto {
@@ -328,6 +329,7 @@ message SCMCommandProto {
     finalizeNewLayoutVersionCommand = 9;
     refreshVolumeUsageInfo = 10;
     reconstructECContainersCommand = 11;
+    reconcileContainerCommand = 12;
   }
   // TODO: once we start using protoc 3.x, refactor this message using "oneof"
   required Type commandType = 1;
@@ -343,6 +345,7 @@ message SCMCommandProto {
   finalizeNewLayoutVersionCommandProto = 10;
   optional RefreshVolumeUsageCommandProto refreshVolumeUsageCommandProto = 11;
   optional ReconstructECContainersCommandProto 
reconstructECContainersCommandProto = 12;
+  optional ReconcileContainerCommandProto reconcileContainerCommandProto = 13;
 
 
   // If running upon Ratis, holds term of underlying RaftServer iff current
@@ -499,6 +502,15 @@ message FinalizeNewLayoutVersionCommandProto {
   required int64 cmdId = 3;
 }
 
+/**
+This command asks the datanode to reconcile its copy of a container with its 
peer datanodes that also have a copy of
+the container.
+*/
+message ReconcileContainerCommandProto {
+  required int64 containerID = 1;
+  repeated DatanodeDetailsProto peers = 2;
+}
+
 /**
  * Protocol used from a datanode to StorageContainerManager.
  *
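
A small sketch of populating the new command message from Java, assuming the classes generated from this proto file; the sketch class name is illustrative, and the peer list is whatever set of replica holders the sender wants the datanode to reconcile against.

import java.util.List;

import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ReconcileContainerCommandProto;

public final class ReconcileCommandProtoSketch {
  private ReconcileCommandProtoSketch() { }

  public static ReconcileContainerCommandProto buildCommand(
      long containerID, List<DatanodeDetailsProto> peers) {
    // containerID identifies the replica to reconcile; peers are the other
    // datanodes that also hold a replica of that container.
    return ReconcileContainerCommandProto.newBuilder()
        .setContainerID(containerID)
        .addAllPeers(peers)
        .build();
  }
}
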
diff --git 
a/hadoop-hdds/interface-server/src/main/proto/ScmServerProtocol.proto 
b/hadoop-hdds/interface-server/src/main/proto/ScmServerProtocol.proto
index 3d281975f2..9739f06b4b 100644
--- a/hadoop-hdds/interface-server/src/main/proto/ScmServerProtocol.proto
+++ b/hadoop-hdds/interface-server/src/main/proto/ScmServerProtocol.proto
@@ -139,6 +139,7 @@ enum Status {
   CA_ROTATION_IN_POST_PROGRESS = 44;
   CONTAINER_ALREADY_CLOSED = 45;
   CONTAINER_ALREADY_CLOSING = 46;
+  UNSUPPORTED_OPERATION = 47;
 }
 
 /**
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java
index 7e163ac306..db00d6842d 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java
@@ -381,6 +381,7 @@ public class AbstractContainerReportHandler {
         .setReplicaIndex(replicaProto.getReplicaIndex())
         .setBytesUsed(replicaProto.getUsed())
         .setEmpty(replicaProto.getIsEmpty())
+        .setDataChecksum(replicaProto.getDataChecksum())
         .build();
 
     if (replica.getState().equals(State.DELETED)) {
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplica.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplica.java
index 78ebfd311d..d008e24f1c 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplica.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplica.java
@@ -43,6 +43,7 @@ public final class ContainerReplica implements 
Comparable<ContainerReplica> {
   private final long keyCount;
   private final long bytesUsed;
   private final boolean isEmpty;
+  private final long dataChecksum;
 
   private ContainerReplica(ContainerReplicaBuilder b) {
     containerID = b.containerID;
@@ -54,6 +55,7 @@ public final class ContainerReplica implements 
Comparable<ContainerReplica> {
     replicaIndex = b.replicaIndex;
     isEmpty = b.isEmpty;
     sequenceId = b.sequenceId;
+    dataChecksum = b.dataChecksum;
   }
 
   /**
@@ -114,6 +116,10 @@ public final class ContainerReplica implements 
Comparable<ContainerReplica> {
     return isEmpty;
   }
 
+  public long getDataChecksum() {
+    return dataChecksum;
+  }
+
   @Override
   public int hashCode() {
     return new HashCodeBuilder(61, 71)
@@ -201,6 +207,7 @@ public final class ContainerReplica implements 
Comparable<ContainerReplica> {
     private long keyCount;
     private int replicaIndex;
     private boolean isEmpty;
+    private long dataChecksum;
 
     /**
      * Set Container Id.
@@ -275,6 +282,11 @@ public final class ContainerReplica implements 
Comparable<ContainerReplica> {
       return this;
     }
 
+    public ContainerReplicaBuilder setDataChecksum(long dataChecksum) {
+      this.dataChecksum = dataChecksum;
+      return this;
+    }
+
     /**
      * Constructs new ContainerReplicaBuilder.
      *
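
To illustrate what the new field enables, a minimal sketch of an SCM-side check that all replicas of a container currently report the same non-zero data checksum. It relies only on the getDataChecksum getter added above and on ContainerManager#getContainerReplicas; the sketch class and method names are illustrative and not part of this change.

import java.util.Set;

import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;

public final class ReplicaChecksumSketch {
  private ReplicaChecksumSketch() { }

  /**
   * Returns true if every known replica of the container reports the same
   * non-zero data checksum, i.e. the replicas agree on their data.
   */
  public static boolean replicasAgree(ContainerManager containerManager,
      ContainerID containerID) throws ContainerNotFoundException {
    Set<ContainerReplica> replicas =
        containerManager.getContainerReplicas(containerID);
    if (replicas.isEmpty()) {
      return false;
    }
    long firstChecksum = replicas.iterator().next().getDataChecksum();
    return firstChecksum != 0 && replicas.stream()
        .allMatch(r -> r.getDataChecksum() == firstChecksum);
  }
}
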
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/reconciliation/ReconcileContainerEventHandler.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/reconciliation/ReconcileContainerEventHandler.java
new file mode 100644
index 0000000000..f13b37f3ee
--- /dev/null
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/reconciliation/ReconcileContainerEventHandler.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container.reconciliation;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
+import 
org.apache.hadoop.hdds.scm.container.reconciliation.ReconciliationEligibilityHandler.EligibilityResult;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.server.events.EventHandler;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
+import org.apache.hadoop.ozone.protocol.commands.ReconcileContainerCommand;
+import org.apache.ratis.protocol.exceptions.NotLeaderException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.hadoop.hdds.scm.events.SCMEvents.DATANODE_COMMAND;
+
+/**
+ * When a reconcile container event is fired, this class will check if the 
container is eligible for reconciliation,
+ * and if so, send the reconcile request to all datanodes with a replica of 
that container.
+ */
+public class ReconcileContainerEventHandler implements 
EventHandler<ContainerID> {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(ReconcileContainerEventHandler.class);
+
+  private final ContainerManager containerManager;
+  private final SCMContext scmContext;
+
+  public ReconcileContainerEventHandler(ContainerManager containerManager, 
SCMContext scmContext) {
+    this.containerManager = containerManager;
+    this.scmContext = scmContext;
+  }
+
+  @Override
+  public void onMessage(ContainerID containerID, EventPublisher publisher) {
+    if (!scmContext.isLeader()) {
+      LOG.info("Skip reconciling container {} since current SCM is not 
leader.", containerID);
+      return;
+    }
+
+    EligibilityResult result = 
ReconciliationEligibilityHandler.isEligibleForReconciliation(containerID,
+        containerManager);
+    if (!result.isOk()) {
+      LOG.error("{}", result);
+      return;
+    }
+
+    try {
+      // TODO HDDS-10714: restrict peer and target nodes based on node status.
+      Set<DatanodeDetails> allReplicaNodes = 
containerManager.getContainerReplicas(containerID)
+          .stream()
+          .map(ContainerReplica::getDatanodeDetails)
+          .collect(Collectors.toSet());
+
+      LOG.info("Reconcile container event triggered for container {} with 
peers {}", containerID, allReplicaNodes);
+
+      for (DatanodeDetails replica : allReplicaNodes) {
+        List<DatanodeDetails> otherReplicas = allReplicaNodes.stream()
+            .filter(other -> !other.equals(replica))
+            .collect(Collectors.toList());
+        ReconcileContainerCommand command = new 
ReconcileContainerCommand(containerID.getId(), otherReplicas);
+        command.setTerm(scmContext.getTermOfLeader());
+        publisher.fireEvent(DATANODE_COMMAND, new 
CommandForDatanode<>(replica.getUuid(), command));
+      }
+    } catch (ContainerNotFoundException ex) {
+      LOG.error("Failed to start reconciliation for container {}. Container 
not found.", containerID);
+    } catch (NotLeaderException nle) {
+      LOG.info("Skip reconciling container {} since current SCM is not 
leader.", containerID);
+    }
+  }
+}
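
For orientation, a compact sketch of how this handler is meant to be used, mirroring the SCM wiring and event firing added later in this patch. The sketch class name is illustrative, and the EventQueue, ContainerManager and SCMContext instances are assumed to already exist.

import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
import org.apache.hadoop.hdds.scm.container.reconciliation.ReconcileContainerEventHandler;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.server.events.EventQueue;

public final class ReconcileWiringSketch {
  private ReconcileWiringSketch() { }

  public static void wireAndFire(EventQueue eventQueue,
      ContainerManager containerManager, SCMContext scmContext,
      long containerID) {
    // Register the handler so that RECONCILE_CONTAINER events reach it.
    eventQueue.addHandler(SCMEvents.RECONCILE_CONTAINER,
        new ReconcileContainerEventHandler(containerManager, scmContext));
    // Firing the event sends a ReconcileContainerCommand to every datanode
    // holding a replica, provided the container is eligible and this SCM is
    // the leader.
    eventQueue.fireEvent(SCMEvents.RECONCILE_CONTAINER,
        ContainerID.valueOf(containerID));
  }
}
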
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/reconciliation/ReconciliationEligibilityHandler.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/reconciliation/ReconciliationEligibilityHandler.java
new file mode 100644
index 0000000000..cdc08556d2
--- /dev/null
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/reconciliation/ReconciliationEligibilityHandler.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container.reconciliation;
+
+
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+
+import java.util.EnumSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Determines whether a container is eligible for reconciliation based on its 
state, replica states, replication
+ * type, and replication factor.
+ */
+public final class ReconciliationEligibilityHandler {
+  public static final Set<HddsProtos.LifeCycleState> ELIGIBLE_CONTAINER_STATES 
=
+      EnumSet.of(HddsProtos.LifeCycleState.CLOSED, 
HddsProtos.LifeCycleState.QUASI_CLOSED);
+  public static final Set<State> ELIGIBLE_REPLICA_STATES =
+      EnumSet.of(State.CLOSED, State.QUASI_CLOSED, State.UNHEALTHY);
+
+  /**
+   * Utility class only.
+   */
+  private ReconciliationEligibilityHandler() { }
+
+  public static EligibilityResult isEligibleForReconciliation(
+      ContainerID containerID, ContainerManager containerManager) {
+    ContainerInfo container;
+    Set<ContainerReplica> replicas;
+    try {
+      container = containerManager.getContainer(containerID);
+      replicas = containerManager.getContainerReplicas(containerID);
+    } catch (ContainerNotFoundException ex) {
+      return new EligibilityResult(Result.CONTAINER_NOT_FOUND,
+          String.format("Container %s not found for reconciliation.", 
containerID));
+    }
+
+    if (!ELIGIBLE_CONTAINER_STATES.contains(container.getState())) {
+      return new EligibilityResult(Result.INELIGIBLE_CONTAINER_STATE,
+          String.format("Cannot reconcile container %d in state %s.", 
container.getContainerID(),
+              container.getState()));
+    }
+
+    if (replicas.isEmpty()) {
+      return new EligibilityResult(Result.NO_REPLICAS_FOUND,
+          String.format("Cannot reconcile container %d because no replicas 
could be found.",
+              container.getContainerID()));
+    }
+
+    boolean replicasValid = replicas.stream()
+        .map(ContainerReplica::getState)
+        .allMatch(ELIGIBLE_REPLICA_STATES::contains);
+    if (!replicasValid) {
+      return new EligibilityResult(Result.INELIGIBLE_REPLICA_STATES,
+          String.format("Cannot reconcile container %s in state %s with 
replica states: %s", containerID,
+              container.getState(), replicas.stream()
+                  .map(r -> r.getState().toString())
+                  .collect(Collectors.joining(", "))));
+    }
+
+    // Reconciliation of EC containers is not yet implemented.
+    ReplicationConfig repConfig = container.getReplicationConfig();
+    if (repConfig.getReplicationType() != HddsProtos.ReplicationType.RATIS) {
+      return new EligibilityResult(Result.INELIGIBLE_REPLICATION_TYPE,
+          String.format("Cannot reconcile container %s with replication type 
%s. Reconciliation is currently only " +
+              "supported for Ratis containers.", containerID, 
repConfig.getReplicationType()));
+    }
+
+    // Reconciliation requires multiple replicas to reconcile.
+    int requiredNodes = repConfig.getRequiredNodes();
+    if (requiredNodes <= 1) {
+      return new EligibilityResult(Result.NOT_ENOUGH_REQUIRED_NODES,
+          String.format("Cannot reconcile container %s with %d required nodes. 
Reconciliation is only supported for " +
+              "containers with more than 1 required node.", containerID, 
repConfig.getRequiredNodes()));
+    }
+
+    return new EligibilityResult(Result.OK, String.format("Container %s is eligible for reconciliation.", containerID));
+  }
+
+  /**
+   * Defines the possible outcomes of a reconciliation eligibility check.
+   */
+  public enum Result {
+    OK,
+    CONTAINER_NOT_FOUND,
+    INELIGIBLE_CONTAINER_STATE,
+    INELIGIBLE_REPLICA_STATES,
+    INELIGIBLE_REPLICATION_TYPE,
+    NOT_ENOUGH_REQUIRED_NODES,
+    NO_REPLICAS_FOUND
+  }
+
+  /**
+   * Provides a status and message indicating whether a container is eligible 
for reconciliation.
+   */
+  public static final class EligibilityResult {
+    private final Result result;
+    private final String message;
+
+    private EligibilityResult(Result result, String message) {
+      this.result = result;
+      this.message = message;
+    }
+
+    public Result getResult() {
+      return result;
+    }
+
+    public boolean isOk() {
+      return result == Result.OK;
+    }
+
+    @Override
+    public String toString() {
+      return message;
+    }
+  }
+}
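
A minimal sketch of calling the eligibility check, assuming a ContainerManager instance is available; it mirrors how the SCM client protocol server consumes the result further down in this patch, and the sketch class name is illustrative.

import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
import org.apache.hadoop.hdds.scm.container.reconciliation.ReconciliationEligibilityHandler;
import org.apache.hadoop.hdds.scm.container.reconciliation.ReconciliationEligibilityHandler.EligibilityResult;
import org.apache.hadoop.hdds.scm.container.reconciliation.ReconciliationEligibilityHandler.Result;

public final class EligibilityCheckSketch {
  private EligibilityCheckSketch() { }

  public static boolean canReconcile(ContainerManager containerManager,
      long containerID) {
    EligibilityResult result =
        ReconciliationEligibilityHandler.isEligibleForReconciliation(
            ContainerID.valueOf(containerID), containerManager);
    if (result.getResult() == Result.CONTAINER_NOT_FOUND) {
      // The container is not known to SCM at all.
      return false;
    }
    // For any other non-OK result, toString() carries the human readable
    // explanation of why reconciliation cannot run.
    return result.isOk();
  }
}
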
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/reconciliation/package-info.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/reconciliation/package-info.java
new file mode 100644
index 0000000000..fa1e355fd1
--- /dev/null
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/reconciliation/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container.reconciliation;
+/**
+ * This package contains classes related to container reconciliation.
+ */
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
index 0cc205b2ff..3b2a84f4f3 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
@@ -217,6 +217,13 @@ public final class SCMEvents {
       new TypedEvent<>(CRLStatusReportFromDatanode.class,
           "Crl_Status_Report");
 
+  /**
+   * This event will be triggered whenever a datanode needs to reconcile its 
replica of a container with other
+   * replicas in the cluster.
+   */
+  public static final TypedEvent<ContainerID>
+      RECONCILE_CONTAINER = new TypedEvent<>(ContainerID.class, 
"Reconcile_Container");
+
   /**
    * Private Ctor. Never Constructed.
    */
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
index 3d7cff358f..b88f2c2a7d 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
@@ -30,6 +30,8 @@ import 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.TransferLeadershipReques
 import 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.TransferLeadershipResponseProto;
 import 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.UpgradeFinalizationStatus;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ReconcileContainerRequestProto;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ReconcileContainerResponseProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ActivatePipelineRequestProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ActivatePipelineResponseProto;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ClosePipelineRequestProto;
@@ -722,6 +724,12 @@ public final class 
StorageContainerLocationProtocolServerSideTranslatorPB
             .setStatus(Status.OK)
             .setGetMetricsResponse(getMetrics(request.getGetMetricsRequest()))
             .build();
+      case ReconcileContainer:
+        return ScmContainerLocationResponse.newBuilder()
+            .setCmdType(request.getCmdType())
+            .setStatus(Status.OK)
+            
.setReconcileContainerResponse(reconcileContainer(request.getReconcileContainerRequest()))
+            .build();
       default:
         throw new IllegalArgumentException(
             "Unknown command type: " + request.getCmdType());
@@ -1333,4 +1341,9 @@ public final class 
StorageContainerLocationProtocolServerSideTranslatorPB
   public GetMetricsResponseProto getMetrics(GetMetricsRequestProto request) 
throws IOException {
     return 
GetMetricsResponseProto.newBuilder().setMetricsJson(impl.getMetrics(request.getQuery())).build();
   }
+
+  public ReconcileContainerResponseProto 
reconcileContainer(ReconcileContainerRequestProto request) throws IOException {
+    impl.reconcileContainer(request.getContainerID());
+    return ReconcileContainerResponseProto.getDefaultInstance();
+  }
 }
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
index 47bc66d833..9cf44677eb 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
@@ -55,6 +55,8 @@ import 
org.apache.hadoop.hdds.scm.container.balancer.ContainerBalancerConfigurat
 import 
org.apache.hadoop.hdds.scm.container.balancer.IllegalContainerBalancerStateException;
 import 
org.apache.hadoop.hdds.scm.container.balancer.InvalidContainerBalancerConfigurationException;
 import 
org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
+import 
org.apache.hadoop.hdds.scm.container.reconciliation.ReconciliationEligibilityHandler;
+import 
org.apache.hadoop.hdds.scm.container.reconciliation.ReconciliationEligibilityHandler.EligibilityResult;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes;
@@ -336,7 +338,9 @@ public class SCMClientProtocolServer implements
               .setPlaceOfBirth(r.getOriginDatanodeId().toString())
               .setKeyCount(r.getKeyCount())
               .setSequenceID(r.getSequenceId())
-              .setReplicaIndex(r.getReplicaIndex()).build()
+              .setReplicaIndex(r.getReplicaIndex())
+              .setDataChecksum(r.getDataChecksum())
+              .build()
       );
     }
     return results;
@@ -1438,4 +1442,42 @@ public class SCMClientProtocolServer implements
     FetchMetrics fetchMetrics = new FetchMetrics();
     return fetchMetrics.getMetrics(query);
   }
+
+  @Override
+  public void reconcileContainer(long longContainerID) throws IOException {
+    ContainerID containerID = ContainerID.valueOf(longContainerID);
+    getScm().checkAdminAccess(getRemoteUser(), false);
+    final UserGroupInformation remoteUser = getRemoteUser();
+    final Map<String, String> auditMap = Maps.newHashMap();
+    auditMap.put("containerID", containerID.toString());
+    auditMap.put("remoteUser", remoteUser.getUserName());
+
+    try {
+      EligibilityResult result = 
ReconciliationEligibilityHandler.isEligibleForReconciliation(containerID,
+          getScm().getContainerManager());
+      if (!result.isOk()) {
+        switch (result.getResult()) {
+        case OK:
+          break;
+        case CONTAINER_NOT_FOUND:
+          throw new ContainerNotFoundException(result.toString());
+        case INELIGIBLE_CONTAINER_STATE:
+          throw new SCMException(result.toString(), 
ResultCodes.UNEXPECTED_CONTAINER_STATE);
+        case INELIGIBLE_REPLICA_STATES:
+        case INELIGIBLE_REPLICATION_TYPE:
+        case NOT_ENOUGH_REQUIRED_NODES:
+        case NO_REPLICAS_FOUND:
+          throw new SCMException(result.toString(), 
ResultCodes.UNSUPPORTED_OPERATION);
+        default:
+          throw new SCMException("Unknown reconciliation eligibility result " 
+ result, ResultCodes.INTERNAL_ERROR);
+        }
+      }
+
+      scm.getEventQueue().fireEvent(SCMEvents.RECONCILE_CONTAINER, 
containerID);
+      
AUDIT.logWriteSuccess(buildAuditMessageForSuccess(SCMAction.RECONCILE_CONTAINER,
 auditMap));
+    } catch (SCMException ex) {
+      
AUDIT.logWriteFailure(buildAuditMessageForFailure(SCMAction.RECONCILE_CONTAINER,
 auditMap, ex));
+      throw ex;
+    }
+  }
 }
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
index 3d864d4ea2..98a7aa22f3 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
@@ -73,6 +73,7 @@ import 
org.apache.hadoop.ozone.protocol.commands.CreatePipelineCommand;
 import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
 import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
 import 
org.apache.hadoop.ozone.protocol.commands.FinalizeNewLayoutVersionCommand;
+import org.apache.hadoop.ozone.protocol.commands.ReconcileContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.RefreshVolumeUsageCommand;
 import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
 import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
@@ -94,6 +95,7 @@ import static 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProt
 import static 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.deleteBlocksCommand;
 import static 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.deleteContainerCommand;
 import static 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.finalizeNewLayoutVersionCommand;
+import static 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.reconcileContainerCommand;
 import static 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.reconstructECContainersCommand;
 import static 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.refreshVolumeUsageInfo;
 import static 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.replicateContainerCommand;
@@ -407,6 +409,12 @@ public class SCMDatanodeProtocolServer implements
           .setRefreshVolumeUsageCommandProto(
               ((RefreshVolumeUsageCommand)cmd).getProto())
           .build();
+    case reconcileContainerCommand:
+      return builder
+          .setCommandType(reconcileContainerCommand)
+          .setReconcileContainerCommandProto(
+              ((ReconcileContainerCommand)cmd).getProto())
+          .build();
 
     default:
       throw new IllegalArgumentException("Scm command " +
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index e86dab5fd7..33fbf5fb76 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.hdds.scm.ScmUtils;
 import org.apache.hadoop.hdds.scm.container.ContainerManager;
 import org.apache.hadoop.hdds.scm.container.ContainerManagerImpl;
 import org.apache.hadoop.hdds.scm.PlacementPolicyValidateProxy;
+import 
org.apache.hadoop.hdds.scm.container.reconciliation.ReconcileContainerEventHandler;
 import org.apache.hadoop.hdds.scm.container.balancer.MoveManager;
 import 
org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaPendingOps;
 import 
org.apache.hadoop.hdds.scm.container.replication.DatanodeCommandCountUpdatedHandler;
@@ -506,6 +507,9 @@ public final class StorageContainerManager extends 
ServiceRuntimeInfoImpl
     CRLStatusReportHandler crlStatusReportHandler =
         new CRLStatusReportHandler(certificateStore, configuration);
 
+    ReconcileContainerEventHandler reconcileContainerEventHandler =
+        new ReconcileContainerEventHandler(containerManager, scmContext);
+
     eventQueue.addHandler(SCMEvents.DATANODE_COMMAND, scmNodeManager);
     eventQueue.addHandler(SCMEvents.RETRIABLE_DATANODE_COMMAND, 
scmNodeManager);
     eventQueue.addHandler(SCMEvents.NODE_REPORT, nodeReportHandler);
@@ -578,6 +582,7 @@ public final class StorageContainerManager extends 
ServiceRuntimeInfoImpl
     eventQueue.addHandler(SCMEvents.PIPELINE_ACTIONS, pipelineActionHandler);
     eventQueue.addHandler(SCMEvents.PIPELINE_REPORT, pipelineReportHandler);
     eventQueue.addHandler(SCMEvents.CRL_STATUS_REPORT, crlStatusReportHandler);
+    eventQueue.addHandler(SCMEvents.RECONCILE_CONTAINER, 
reconcileContainerEventHandler);
 
     scmNodeManager.registerSendCommandNotify(
         SCMCommandProto.Type.deleteBlocksCommand,
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java
index 4e1fe234ff..2c9df2afb4 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java
@@ -52,7 +52,8 @@ public enum SCMAction implements AuditAction {
   GET_REPLICATION_MANAGER_REPORT,
   RESET_DELETED_BLOCK_RETRY_COUNT,
   TRANSFER_LEADERSHIP,
-  GET_FAILED_DELETED_BLOCKS_TRANSACTION;
+  GET_FAILED_DELETED_BLOCKS_TRANSACTION,
+  RECONCILE_CONTAINER;
 
   @Override
   public String getAction() {
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java
index 695c88d11a..8b77e30013 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java
@@ -60,12 +60,14 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static 
org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.mockito.Mockito.doAnswer;
@@ -100,7 +102,6 @@ public class TestContainerReportHandler {
     dbStore = DBStoreBuilder.createDBStore(
         conf, new SCMDBDefinition());
     scmhaManager = SCMHAManagerStub.getInstance(true);
-    nodeManager = new MockNodeManager(true, 10);
     pipelineManager =
         new MockPipelineManager(dbStore, scmhaManager, nodeManager);
     containerStateManager = ContainerStateManagerImpl.newBuilder()
@@ -979,6 +980,153 @@ public class TestContainerReportHandler {
         containerOne.containerID()).size());
   }
 
+  @Test
+  public void testWithNoContainerDataChecksum() throws Exception {
+    final ContainerReportHandler reportHandler = new 
ContainerReportHandler(nodeManager, containerManager);
+
+    final int numNodes = 3;
+    List<DatanodeDetails> datanodes = 
nodeManager.getNodes(NodeStatus.inServiceHealthy()).stream()
+        .limit(numNodes)
+        .collect(Collectors.toList());
+
+    // Create a container and put one replica on each datanode.
+    final ContainerInfo container = getContainer(LifeCycleState.CLOSED);
+    ContainerID contID = container.containerID();
+    final Set<ContainerID> containerIDSet = 
Stream.of(contID).collect(Collectors.toSet());
+
+    for (DatanodeDetails dn: datanodes) {
+      nodeManager.setContainers(dn, containerIDSet);
+    }
+
+    containerStateManager.addContainer(container.getProtobuf());
+
+    getReplicas(contID, ContainerReplicaProto.State.CLOSED, 0, datanodes)
+        .forEach(r -> containerStateManager.updateContainerReplica(contID, r));
+
+    // The container manager should now be aware of 3 replicas of the container.
+    assertEquals(numNodes, 
containerManager.getContainerReplicas(contID).size());
+
+    // All replicas should start with an empty data checksum in SCM.
+    boolean contOneDataChecksumsEmpty = 
containerManager.getContainerReplicas(contID).stream()
+        .allMatch(r -> r.getDataChecksum() == 0);
+    assertTrue(contOneDataChecksumsEmpty, "Replicas of container one should 
not yet have any data checksums.");
+
+    // Send a report to SCM from each datanode; none of them has a data checksum yet.
+    int numReportsSent = 0;
+    for (DatanodeDetails dn: datanodes) {
+      final ContainerReportsProto dnReportProto = getContainerReportsProto(
+          contID, ContainerReplicaProto.State.CLOSED, dn.getUuidString());
+      final ContainerReportFromDatanode dnReport = new 
ContainerReportFromDatanode(dn, dnReportProto);
+      reportHandler.onMessage(dnReport, publisher);
+      numReportsSent++;
+    }
+    assertEquals(numNodes, numReportsSent);
+
+    // Regardless of which datanode sent the report, none of them have checksums, so all replicas' data checksums
+    // should remain empty.
+    boolean containerDataChecksumEmpty = 
containerManager.getContainerReplicas(contID).stream()
+        .allMatch(r -> r.getDataChecksum() == 0);
+    assertTrue(containerDataChecksumEmpty, "Replicas of the container should 
not have any data checksums.");
+  }
+
+  @Test
+  public void testWithContainerDataChecksum() throws Exception {
+    final ContainerReportHandler reportHandler = new 
ContainerReportHandler(nodeManager, containerManager);
+
+    final int numNodes = 3;
+    List<DatanodeDetails> datanodes = 
nodeManager.getNodes(NodeStatus.inServiceHealthy()).stream()
+        .limit(numNodes)
+        .collect(Collectors.toList());
+
+    // Create a container and put one replica on each datanode.
+    final ContainerInfo container = getContainer(LifeCycleState.CLOSED);
+    ContainerID contID = container.containerID();
+    final Set<ContainerID> containerIDSet = 
Stream.of(contID).collect(Collectors.toSet());
+
+    for (DatanodeDetails dn: datanodes) {
+      nodeManager.setContainers(dn, containerIDSet);
+    }
+
+    containerStateManager.addContainer(container.getProtobuf());
+
+    getReplicas(contID, ContainerReplicaProto.State.CLOSED, 0, datanodes)
+        .forEach(r -> containerStateManager.updateContainerReplica(contID, r));
+
+    // The container manager should now be aware of 3 replicas of the container.
+    assertEquals(numNodes, 
containerManager.getContainerReplicas(contID).size());
+
+    // All replicas should start with an empty data checksum in SCM.
+    boolean dataChecksumsEmpty = 
containerManager.getContainerReplicas(contID).stream()
+        .allMatch(r -> r.getDataChecksum() == 0);
+    assertTrue(dataChecksumsEmpty, "Replicas of container one should not yet 
have any data checksums.");
+
+    // For each datanode, send a container report with a mismatched checksum.
+    for (DatanodeDetails dn: datanodes) {
+      ContainerReportsProto dnReportProto = getContainerReportsProto(
+          contID, ContainerReplicaProto.State.CLOSED, dn.getUuidString());
+      ContainerReplicaProto replicaWithChecksum = 
dnReportProto.getReports(0).toBuilder()
+          .setDataChecksum(createUniqueDataChecksumForReplica(contID, 
dn.getUuidString()))
+          .build();
+      ContainerReportsProto reportWithChecksum = dnReportProto.toBuilder()
+          .clearReports()
+          .addReports(replicaWithChecksum)
+          .build();
+      final ContainerReportFromDatanode dnReport = new 
ContainerReportFromDatanode(dn, reportWithChecksum);
+      reportHandler.onMessage(dnReport, publisher);
+    }
+
+    // All the replicas should have different checksums.
+    // Since the containers don't have any data in this test, different 
checksums are based on container ID and
+    // datanode ID.
+    int numReplicasChecked = 0;
+    for (ContainerReplica replica: 
containerManager.getContainerReplicas(contID)) {
+      long expectedChecksum = createUniqueDataChecksumForReplica(contID, 
replica.getDatanodeDetails().getUuidString());
+      assertEquals(expectedChecksum, replica.getDataChecksum());
+      numReplicasChecked++;
+    }
+    assertEquals(numNodes, numReplicasChecked);
+
+    // For each datanode, send a container report with a matching checksum.
+    // This simulates reconciliation running.
+    for (DatanodeDetails dn: datanodes) {
+      ContainerReportsProto dnReportProto = getContainerReportsProto(
+          contID, ContainerReplicaProto.State.CLOSED, dn.getUuidString());
+      ContainerReplicaProto replicaWithChecksum = 
dnReportProto.getReports(0).toBuilder()
+          .setDataChecksum(createMatchingDataChecksumForReplica(contID))
+          .build();
+      ContainerReportsProto reportWithChecksum = dnReportProto.toBuilder()
+          .clearReports()
+          .addReports(replicaWithChecksum)
+          .build();
+      final ContainerReportFromDatanode dnReport = new 
ContainerReportFromDatanode(dn, reportWithChecksum);
+      reportHandler.onMessage(dnReport, publisher);
+    }
+
+    // All the replicas should now have matching checksums.
+    // Since the containers don't have any data in this test, the matching 
checksums are based on container ID only.
+    numReplicasChecked = 0;
+    for (ContainerReplica replica: 
containerManager.getContainerReplicas(contID)) {
+      long expectedChecksum = createMatchingDataChecksumForReplica(contID);
+      assertEquals(expectedChecksum, replica.getDataChecksum());
+      numReplicasChecked++;
+    }
+    assertEquals(numNodes, numReplicasChecked);
+  }
+
+  /**
+   * Generates a placeholder data checksum for testing that is specific to a 
container replica.
+   */
+  protected static long createUniqueDataChecksumForReplica(ContainerID 
containerID, String datanodeID) {
+    return (datanodeID + containerID).hashCode();
+  }
+
+  /**
+   * Generates a placeholder data checksum for testing that is the same for 
all container replicas.
+   */
+  protected static long createMatchingDataChecksumForReplica(ContainerID 
containerID) {
+    return Objects.hashCode(containerID);
+  }
+
   private ContainerReportFromDatanode getContainerReportFromDatanode(
       ContainerID containerId, ContainerReplicaProto.State state,
       DatanodeDetails dn, long bytesUsed, long keyCount) {
@@ -1021,7 +1169,6 @@ public class TestContainerReportHandler {
             .setContainerID(containerId.getId())
             .setState(state)
             .setOriginNodeId(originNodeId)
-            .setFinalhash("e16cc9d6024365750ed8dbd194ea46d2")
             .setSize(5368709120L)
             .setUsed(usedBytes)
             .setKeyCount(keyCount)
@@ -1035,5 +1182,4 @@ public class TestContainerReportHandler {
             .build();
     return crBuilder.addReports(replicaProto).build();
   }
-
 }
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestIncrementalContainerReportHandler.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestIncrementalContainerReportHandler.java
index dbcccce598..9abbda8193 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestIncrementalContainerReportHandler.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestIncrementalContainerReportHandler.java
@@ -76,6 +76,7 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
+import java.util.stream.Stream;
 
 import static 
org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails;
 import static 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.CLOSED;
@@ -83,6 +84,9 @@ import static 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProt
 import static org.apache.hadoop.hdds.scm.HddsTestUtils.getContainer;
 import static org.apache.hadoop.hdds.scm.HddsTestUtils.getECContainer;
 import static org.apache.hadoop.hdds.scm.HddsTestUtils.getReplicas;
+import static 
org.apache.hadoop.hdds.scm.container.TestContainerReportHandler.createMatchingDataChecksumForReplica;
+import static 
org.apache.hadoop.hdds.scm.container.TestContainerReportHandler.createUniqueDataChecksumForReplica;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.mockito.Mockito.doAnswer;
@@ -576,6 +580,144 @@ public class TestIncrementalContainerReportHandler {
     }
   }
 
+  @Test
+  public void testWithNoContainerDataChecksum() throws Exception {
+    final IncrementalContainerReportHandler reportHandler = new 
IncrementalContainerReportHandler(nodeManager,
+        containerManager, scmContext);
+
+    final int numNodes = 3;
+
+    // Create a container which will have one replica on each datanode.
+    final ContainerInfo container = getContainer(LifeCycleState.CLOSED);
+    ContainerID contID = container.containerID();
+    final Set<ContainerID> containerIDSet = 
Stream.of(contID).collect(Collectors.toSet());
+
+    List<DatanodeDetails> datanodes = new ArrayList<>();
+    for (int i = 0; i < numNodes; i++) {
+      DatanodeDetails dn = randomDatanodeDetails();
+      nodeManager.register(dn, null, null);
+      nodeManager.setContainers(dn, containerIDSet);
+      datanodes.add(dn);
+    }
+
+    containerStateManager.addContainer(container.getProtobuf());
+
+    getReplicas(contID, ContainerReplicaProto.State.CLOSED, 0, datanodes)
+        .forEach(r -> containerStateManager.updateContainerReplica(contID, r));
+
+    // Container manager should now be aware of 3 replicas of the container.
+    assertEquals(numNodes, 
containerManager.getContainerReplicas(contID).size());
+
+    // All replicas should start with an empty data checksum in SCM.
+    boolean contOneDataChecksumsEmpty = 
containerManager.getContainerReplicas(contID).stream()
+        .allMatch(r -> r.getDataChecksum() == 0);
+    assertTrue(contOneDataChecksumsEmpty, "Replicas of the container should not yet have any data checksums.");
+
+    // Send a report to SCM from each datanode; none of the reports include a data checksum.
+    for (DatanodeDetails dn: datanodes) {
+      final IncrementalContainerReportProto dnReportProto = 
getIncrementalContainerReportProto(
+          contID, ContainerReplicaProto.State.CLOSED, dn.getUuidString());
+      final IncrementalContainerReportFromDatanode dnReport = new 
IncrementalContainerReportFromDatanode(dn,
+          dnReportProto);
+      reportHandler.onMessage(dnReport, publisher);
+    }
+
+    // Regardless of which datanode sent the report, none of them have checksums, so all replicas' data checksums
+    // should remain empty.
+    boolean containerDataChecksumEmpty = 
containerManager.getContainerReplicas(contID).stream()
+        .allMatch(r -> r.getDataChecksum() == 0);
+    assertTrue(containerDataChecksumEmpty, "Replicas of the container should 
not have any data checksums.");
+  }
+
+  @Test
+  public void testWithContainerDataChecksum() throws Exception {
+    final IncrementalContainerReportHandler reportHandler = new 
IncrementalContainerReportHandler(nodeManager,
+        containerManager, scmContext);
+
+    final int numNodes = 3;
+
+    // Create a container which will have one replica on each datanode.
+    final ContainerInfo container = getContainer(LifeCycleState.CLOSED);
+    ContainerID contID = container.containerID();
+    final Set<ContainerID> containerIDSet = 
Stream.of(contID).collect(Collectors.toSet());
+
+    List<DatanodeDetails> datanodes = new ArrayList<>();
+    for (int i = 0; i < numNodes; i++) {
+      DatanodeDetails dn = randomDatanodeDetails();
+      nodeManager.register(dn, null, null);
+      nodeManager.setContainers(dn, containerIDSet);
+      datanodes.add(dn);
+    }
+
+    containerStateManager.addContainer(container.getProtobuf());
+
+    getReplicas(contID, ContainerReplicaProto.State.CLOSED, 0, datanodes)
+        .forEach(r -> containerStateManager.updateContainerReplica(contID, r));
+
+    // Container manager should now be aware of 3 replicas of the container.
+    assertEquals(3, containerManager.getContainerReplicas(contID).size());
+
+    // All replicas should start with a zero data checksum in SCM.
+    boolean dataChecksumsEmpty = 
containerManager.getContainerReplicas(contID).stream()
+        .allMatch(r -> r.getDataChecksum() == 0);
+    assertTrue(dataChecksumsEmpty, "Replicas of the container should not yet have any data checksums.");
+
+    // For each datanode, send a container report with a mismatched checksum.
+    for (DatanodeDetails dn: datanodes) {
+      IncrementalContainerReportProto dnReportProto = 
getIncrementalContainerReportProto(
+          contID, ContainerReplicaProto.State.CLOSED, dn.getUuidString());
+      ContainerReplicaProto replicaWithChecksum = 
dnReportProto.getReport(0).toBuilder()
+          .setDataChecksum(createUniqueDataChecksumForReplica(contID, 
dn.getUuidString()))
+          .build();
+      IncrementalContainerReportProto reportWithChecksum = 
dnReportProto.toBuilder()
+          .clearReport()
+          .addReport(replicaWithChecksum)
+          .build();
+      final IncrementalContainerReportFromDatanode dnReport = new 
IncrementalContainerReportFromDatanode(dn,
+          reportWithChecksum);
+      reportHandler.onMessage(dnReport, publisher);
+    }
+
+    // All the replicas should have different checksums.
+    // Since the containers don't have any data in this test, different 
checksums are based on container ID and
+    // datanode ID.
+    int numReplicasChecked = 0;
+    for (ContainerReplica replica: 
containerManager.getContainerReplicas(contID)) {
+      long expectedChecksum = createUniqueDataChecksumForReplica(
+          contID, replica.getDatanodeDetails().getUuidString());
+      assertEquals(expectedChecksum, replica.getDataChecksum());
+      numReplicasChecked++;
+    }
+    assertEquals(numNodes, numReplicasChecked);
+
+    // For each datanode, send a container report with a matching checksum.
+    // This simulates reconciliation running.
+    for (DatanodeDetails dn: datanodes) {
+      IncrementalContainerReportProto dnReportProto = 
getIncrementalContainerReportProto(
+          contID, ContainerReplicaProto.State.CLOSED, dn.getUuidString());
+      ContainerReplicaProto replicaWithChecksum = 
dnReportProto.getReport(0).toBuilder()
+          .setDataChecksum(createMatchingDataChecksumForReplica(contID))
+          .build();
+      IncrementalContainerReportProto reportWithChecksum = 
dnReportProto.toBuilder()
+          .clearReport()
+          .addReport(replicaWithChecksum)
+          .build();
+      IncrementalContainerReportFromDatanode dnReport = new 
IncrementalContainerReportFromDatanode(dn,
+          reportWithChecksum);
+      reportHandler.onMessage(dnReport, publisher);
+    }
+
+    // All the replicas should now have matching checksums.
+    // Since the containers don't have any data in this test, the matching 
checksums are based on container ID only.
+    numReplicasChecked = 0;
+    for (ContainerReplica replica: 
containerManager.getContainerReplicas(contID)) {
+      long expectedChecksum = createMatchingDataChecksumForReplica(contID);
+      assertEquals(expectedChecksum, replica.getDataChecksum());
+      numReplicasChecked++;
+    }
+    assertEquals(numNodes, numReplicasChecked);
+  }
+
   private static IncrementalContainerReportProto
       getIncrementalContainerReportProto(ContainerReplicaProto replicaProto) {
     final IncrementalContainerReportProto.Builder crBuilder =
@@ -595,7 +737,6 @@ public class TestIncrementalContainerReportHandler {
                     .setContainerID(containerId.getId())
                     .setState(state)
                     .setOriginNodeId(originNodeId)
-                    .setFinalhash("e16cc9d6024365750ed8dbd194ea46d2")
                     .setSize(5368709120L)
                     .setUsed(2000000000L)
                     .setKeyCount(100000000L)
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/reconciliation/TestReconcileContainerEventHandler.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/reconciliation/TestReconcileContainerEventHandler.java
new file mode 100644
index 0000000000..ffc96217b4
--- /dev/null
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/reconciliation/TestReconcileContainerEventHandler.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container.reconciliation;
+
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.RatisReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ReconcileContainerCommandProto;
+import org.apache.hadoop.hdds.scm.HddsTestUtils;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.container.MockNodeManager;
+import 
org.apache.hadoop.hdds.scm.container.reconciliation.ReconciliationEligibilityHandler.EligibilityResult;
+import 
org.apache.hadoop.hdds.scm.container.reconciliation.ReconciliationEligibilityHandler.Result;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.mockito.ArgumentCaptor;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.ONE;
+import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.THREE;
+import static org.apache.hadoop.hdds.scm.events.SCMEvents.DATANODE_COMMAND;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests that the ReconcileContainerEventHandler properly accepts and rejects 
reconciliation events based on
+ * container state, and dispatches commands to datanodes accordingly.
+ */
+public class TestReconcileContainerEventHandler {
+  private ContainerManager containerManager;
+  private EventPublisher eventPublisher;
+  private ReconcileContainerEventHandler eventHandler;
+  private SCMContext scmContext;
+
+  private static final ContainerID CONTAINER_ID = ContainerID.valueOf(123L);
+  private static final long LEADER_TERM = 3L;
+
+  private static final ReplicationConfig RATIS_THREE_REP = 
RatisReplicationConfig.getInstance(THREE);
+  private static final ReplicationConfig RATIS_ONE_REP = 
RatisReplicationConfig.getInstance(ONE);
+  private static final ReplicationConfig EC_REP = new ECReplicationConfig(3, 
2);
+
+  private ArgumentCaptor<CommandForDatanode<ReconcileContainerCommandProto>> 
commandCaptor;
+
+  @BeforeEach
+  public void setup() throws Exception {
+    commandCaptor = ArgumentCaptor.forClass(CommandForDatanode.class);
+    containerManager = mock(ContainerManager.class);
+    scmContext = mock(SCMContext.class);
+    when(scmContext.isLeader()).thenReturn(true);
+    when(scmContext.getTermOfLeader()).thenReturn(LEADER_TERM);
+    eventPublisher = mock(EventPublisher.class);
+    eventHandler = new ReconcileContainerEventHandler(containerManager, 
scmContext);
+  }
+
+  /**
+   * EC containers are not yet supported for reconciliation.
+   */
+  @Test
+  public void testReconcileECContainer() throws Exception {
+    addContainer(EC_REP, LifeCycleState.CLOSED);
+    addReplicasToContainer(5);
+
+    EligibilityResult result =
+        
ReconciliationEligibilityHandler.isEligibleForReconciliation(CONTAINER_ID, 
containerManager);
+    assertFalse(result.isOk());
+    assertEquals(Result.INELIGIBLE_REPLICATION_TYPE, result.getResult());
+
+    eventHandler.onMessage(CONTAINER_ID, eventPublisher);
+    verify(eventPublisher, never()).fireEvent(eq(DATANODE_COMMAND), any());
+  }
+
+  /**
+   * Ratis 1 containers are not currently supported for reconciliation.
+   */
+  @Test
+  public void testReconcileRatisOneContainer() throws Exception {
+    addContainer(RATIS_ONE_REP, LifeCycleState.CLOSED);
+    addReplicasToContainer(1);
+
+    EligibilityResult result =
+        
ReconciliationEligibilityHandler.isEligibleForReconciliation(CONTAINER_ID, 
containerManager);
+    assertFalse(result.isOk());
+    assertEquals(Result.NOT_ENOUGH_REQUIRED_NODES, result.getResult());
+
+    eventHandler.onMessage(CONTAINER_ID, eventPublisher);
+    verify(eventPublisher, never()).fireEvent(eq(DATANODE_COMMAND), any());
+  }
+
+  @Test
+  public void testReconcileWhenNotLeader() throws Exception {
+    addContainer(RATIS_THREE_REP, LifeCycleState.CLOSED);
+    addReplicasToContainer(3);
+    when(scmContext.isLeader()).thenReturn(false);
+
+    // Container is eligible for reconciliation, but the request will not go 
through because this SCM is not the leader.
+    EligibilityResult result =
+        
ReconciliationEligibilityHandler.isEligibleForReconciliation(CONTAINER_ID, 
containerManager);
+    assertTrue(result.isOk());
+    assertEquals(Result.OK, result.getResult());
+
+    eventHandler.onMessage(CONTAINER_ID, eventPublisher);
+    verify(eventPublisher, never()).fireEvent(eq(DATANODE_COMMAND), any());
+  }
+
+  @Test
+  public void testReconcileNonexistentContainer() throws Exception {
+    // The step of adding the container to the mocked ContainerManager is 
intentionally skipped to simulate a
+    // nonexistent container.
+    // No exceptions should be thrown out of this test method when this 
happens. If they are, they will be propagated
+    // and the test will fail.
+    when(containerManager.getContainer(any())).thenThrow(new 
ContainerNotFoundException());
+
+    EligibilityResult result =
+        
ReconciliationEligibilityHandler.isEligibleForReconciliation(CONTAINER_ID, 
containerManager);
+    assertFalse(result.isOk());
+    assertEquals(Result.CONTAINER_NOT_FOUND, result.getResult());
+
+    eventHandler.onMessage(CONTAINER_ID, eventPublisher);
+    verify(eventPublisher, never()).fireEvent(eq(DATANODE_COMMAND), any());
+  }
+
+  @Test
+  public void testReconcileMissingContainer() throws Exception {
+    addContainer(RATIS_THREE_REP, LifeCycleState.CLOSED);
+    assertTrue(containerManager.getContainerReplicas(CONTAINER_ID).isEmpty(),
+        "Expected no replicas for this container");
+
+    EligibilityResult result =
+        
ReconciliationEligibilityHandler.isEligibleForReconciliation(CONTAINER_ID, 
containerManager);
+    assertFalse(result.isOk());
+    assertEquals(Result.NO_REPLICAS_FOUND, result.getResult());
+
+    eventHandler.onMessage(CONTAINER_ID, eventPublisher);
+    verify(eventPublisher, never()).fireEvent(eq(DATANODE_COMMAND), any());
+  }
+
+  @ParameterizedTest
+  @EnumSource(LifeCycleState.class)
+  public void testReconcileWithContainerStates(LifeCycleState state) throws 
Exception {
+    addContainer(RATIS_THREE_REP, state);
+    addReplicasToContainer(3);
+    EligibilityResult result =
+        
ReconciliationEligibilityHandler.isEligibleForReconciliation(CONTAINER_ID, 
containerManager);
+    eventHandler.onMessage(CONTAINER_ID, eventPublisher);
+    switch (state) {
+    case OPEN:
+    case CLOSING:
+    case DELETING:
+    case DELETED:
+    case RECOVERING:
+      assertFalse(result.isOk());
+      assertEquals(Result.INELIGIBLE_CONTAINER_STATE, result.getResult());
+      verify(eventPublisher, never()).fireEvent(eq(DATANODE_COMMAND), 
commandCaptor.capture());
+      break;
+    default:
+      assertTrue(result.isOk());
+      assertEquals(Result.OK, result.getResult());
+      verify(eventPublisher, times(3)).fireEvent(eq(DATANODE_COMMAND), 
commandCaptor.capture());
+      break;
+    }
+  }
+
+  // TODO HDDS-10714 will change which datanodes are eligible to participate 
in reconciliation.
+  @Test
+  public void testReconcileSentToAllPeers() throws Exception {
+    addContainer(RATIS_THREE_REP, LifeCycleState.CLOSED);
+    Set<ContainerReplica> replicas = addReplicasToContainer(3);
+    Set<UUID> allNodeIDs = replicas.stream()
+        .map(r -> r.getDatanodeDetails().getUuid())
+        .collect(Collectors.toSet());
+
+    EligibilityResult result =
+        
ReconciliationEligibilityHandler.isEligibleForReconciliation(CONTAINER_ID, 
containerManager);
+    assertTrue(result.isOk());
+    assertEquals(Result.OK, result.getResult());
+
+    eventHandler.onMessage(CONTAINER_ID, eventPublisher);
+    assertEquals(3, replicas.size());
+    assertEquals(allNodeIDs.size(), replicas.size());
+    verify(eventPublisher, 
times(replicas.size())).fireEvent(eq(DATANODE_COMMAND), 
commandCaptor.capture());
+
+    // Check each reconcile command sent for correctness.
+    Set<UUID> nodesReceivingCommands = new HashSet<>();
+    for (CommandForDatanode<ReconcileContainerCommandProto> dnCommand: 
commandCaptor.getAllValues()) {
+      SCMCommand<ReconcileContainerCommandProto> reconcileCommand = 
dnCommand.getCommand();
+      ReconcileContainerCommandProto reconcileProto = 
reconcileCommand.getProto();
+      // All commands should use the latest term of SCM so the datanode does 
not drop them.
+      assertEquals(LEADER_TERM, reconcileCommand.getTerm());
+      // All commands should have the same container ID.
+      assertEquals(CONTAINER_ID, 
ContainerID.valueOf(reconcileProto.getContainerID()));
+      // Container ID is also used as the command's identifier.
+      assertEquals(CONTAINER_ID, 
ContainerID.valueOf(reconcileCommand.getId()));
+
+      // Every node should receive exactly one reconcile command.
+      UUID targetNodeID = dnCommand.getDatanodeId();
+      assertTrue(nodesReceivingCommands.add(targetNodeID), "Duplicate 
reconcile command sent to datanode.");
+      // All commands should have correctly constructed peer lists that 
exclude the node receiving the command.
+      Set<UUID> expectedPeerIDs = allNodeIDs.stream()
+          .filter(id -> !id.equals(targetNodeID))
+          .collect(Collectors.toSet());
+      Set<UUID> actualPeerIDs = reconcileProto.getPeersList().stream()
+              .map(dn -> UUID.fromString(dn.getUuid()))
+              .collect(Collectors.toSet());
+      assertEquals(replicas.size() - 1, actualPeerIDs.size());
+      assertEquals(expectedPeerIDs, actualPeerIDs);
+    }
+
+    assertEquals(allNodeIDs, nodesReceivingCommands);
+  }
+
+  @ParameterizedTest
+  @EnumSource(State.class)
+  public void testReconcileFailsWithIneligibleReplicas(State replicaState) 
throws Exception {
+    // Overall container state is eligible for reconciliation, but some 
replicas may not be.
+    // In that case, the container will not be considered eligible.
+    addContainer(RATIS_THREE_REP, LifeCycleState.CLOSED);
+    // Only one replica is in a different state.
+    addReplicasToContainer(replicaState, State.CLOSED, State.CLOSED);
+
+    EligibilityResult result =
+        
ReconciliationEligibilityHandler.isEligibleForReconciliation(CONTAINER_ID, 
containerManager);
+
+    eventHandler.onMessage(CONTAINER_ID, eventPublisher);
+    switch (replicaState) {
+    case OPEN:
+    case INVALID:
+    case DELETED:
+    case CLOSING:
+      assertFalse(result.isOk());
+      assertEquals(Result.INELIGIBLE_REPLICA_STATES, result.getResult());
+      verify(eventPublisher, never()).fireEvent(eq(DATANODE_COMMAND), 
commandCaptor.capture());
+      break;
+    default:
+      assertTrue(result.isOk());
+      assertEquals(Result.OK, result.getResult());
+      verify(eventPublisher, times(3)).fireEvent(eq(DATANODE_COMMAND), 
commandCaptor.capture());
+      break;
+    }
+  }
+
+  private ContainerInfo addContainer(ReplicationConfig repConfig, 
LifeCycleState state) throws Exception {
+    ContainerInfo container = new ContainerInfo.Builder()
+        .setContainerID(CONTAINER_ID.getId())
+        .setReplicationConfig(repConfig)
+        .setState(state)
+        .build();
+    when(containerManager.getContainer(CONTAINER_ID)).thenReturn(container);
+    return container;
+  }
+
+  private Set<ContainerReplica> addReplicasToContainer(int count) throws 
Exception {
+    State[] replicaStates = new State[count];
+    Arrays.fill(replicaStates, State.CLOSED);
+    return addReplicasToContainer(replicaStates);
+  }
+
+  private Set<ContainerReplica> addReplicasToContainer(State... replicaStates) 
throws Exception {
+    // Add one container replica for each replica state specified.
+    // If no states are specified, replica list will be empty.
+    Set<ContainerReplica> replicas = new HashSet<>();
+    try (MockNodeManager nodeManager = new MockNodeManager(true, 
replicaStates.length)) {
+      List<DatanodeDetails> nodes = nodeManager.getAllNodes();
+      for (int i = 0; i < replicaStates.length; i++) {
+        replicas.addAll(HddsTestUtils.getReplicas(CONTAINER_ID, 
replicaStates[i], nodes.get(i)));
+      }
+    }
+    
when(containerManager.getContainerReplicas(CONTAINER_ID)).thenReturn(replicas);
+
+    return replicas;
+  }
+}
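
The new test file above pins down the SCM-side flow: ReconciliationEligibilityHandler decides whether a container may be reconciled (Ratis THREE only, enough replicas, and eligible container and replica states), and only when that check passes does ReconcileContainerEventHandler fire one DATANODE_COMMAND per replica, stamped with the current leader term and carrying a peer list that excludes the receiving datanode. Below is a minimal sketch of the eligibility pre-check as these tests exercise it; the guard-method shape is an assumption of this sketch, not a copy of the handler.

import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
import org.apache.hadoop.hdds.scm.container.reconciliation.ReconciliationEligibilityHandler;
import org.apache.hadoop.hdds.scm.container.reconciliation.ReconciliationEligibilityHandler.EligibilityResult;

/** Illustrative only; not part of this commit. */
final class ReconcileEligibilityCheckSketch {
  private ReconcileEligibilityCheckSketch() { }

  // Returns true only when SCM considers the container safe to reconcile.
  static boolean canReconcile(ContainerID id, ContainerManager containerManager) {
    EligibilityResult result =
        ReconciliationEligibilityHandler.isEligibleForReconciliation(id, containerManager);
    // On failure, result.getResult() names the reason, e.g. INELIGIBLE_REPLICATION_TYPE,
    // NOT_ENOUGH_REQUIRED_NODES, NO_REPLICAS_FOUND, CONTAINER_NOT_FOUND,
    // INELIGIBLE_CONTAINER_STATE or INELIGIBLE_REPLICA_STATES.
    return result.isOk();
  }
}
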
diff --git 
a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java
 
b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java
index 0dd52cd291..f46ee62fd8 100644
--- 
a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java
+++ 
b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java
@@ -575,4 +575,8 @@ public class ContainerOperationClient implements ScmClient {
     return storageContainerLocationClient.getMetrics(query);
   }
 
+  @Override
+  public void reconcileContainer(long id) throws IOException {
+    storageContainerLocationClient.reconcileContainer(id);
+  }
 }
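
The change above keeps the admin client thin: ScmClient.reconcileContainer(long) simply forwards the container ID to the StorageContainerLocationProtocol client, and SCM performs the eligibility checks and command dispatch. Below is a minimal caller sketch that assumes only that interface method; how the ScmClient instance is constructed and closed is left out and would follow the usual client setup.

import java.io.IOException;
import org.apache.hadoop.hdds.scm.client.ScmClient;

/** Illustrative only; not part of this commit. */
final class TriggerReconciliationSketch {
  private final ScmClient scmClient;

  TriggerReconciliationSketch(ScmClient scmClient) {
    this.scmClient = scmClient;
  }

  // Asks SCM to send reconcile commands to the datanodes holding this container's replicas.
  // Progress can then be followed through each replica's dataChecksum, as the new CLI output suggests.
  void reconcile(long containerId) throws IOException {
    scmClient.reconcileContainer(containerId);
  }
}
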
diff --git 
a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/container/ContainerCommands.java
 
b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/container/ContainerCommands.java
index 54c69273f0..9f93c56f2d 100644
--- 
a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/container/ContainerCommands.java
+++ 
b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/container/ContainerCommands.java
@@ -43,7 +43,8 @@ import picocli.CommandLine.Spec;
         CreateSubcommand.class,
         CloseSubcommand.class,
         ReportSubcommand.class,
-        UpgradeSubcommand.class
+        UpgradeSubcommand.class,
+        ReconcileSubcommand.class
     })
 @MetaInfServices(SubcommandWithParent.class)
 public class ContainerCommands implements Callable<Void>, SubcommandWithParent 
{
diff --git 
a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/container/ReconcileSubcommand.java
 
b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/container/ReconcileSubcommand.java
new file mode 100644
index 0000000000..e747455a88
--- /dev/null
+++ 
b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/container/ReconcileSubcommand.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.cli.container;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hdds.cli.HddsVersionProvider;
+import org.apache.hadoop.hdds.scm.cli.ScmSubcommand;
+import org.apache.hadoop.hdds.scm.client.ScmClient;
+
+import picocli.CommandLine;
+import picocli.CommandLine.Command;
+
+/**
+ * This is the handler that processes the container reconcile command.
+ */
+@Command(
+    name = "reconcile",
+    description = "Reconcile container replicas",
+    mixinStandardHelpOptions = true,
+    versionProvider = HddsVersionProvider.class)
+public class ReconcileSubcommand extends ScmSubcommand {
+
+  @CommandLine.Parameters(description = "ID of the container to reconcile")
+  private long containerId;
+
+  @Override
+  public void execute(ScmClient scmClient) throws IOException {
+    scmClient.reconcileContainer(containerId);
+    System.out.println("Reconciliation has been triggered for container " + 
containerId);
+    // TODO: A better way to check reconciliation status may be added later.
+    System.out.println("Use \"ozone admin container info --json " + 
containerId + "\" to see the checksums of each " +
+        "container replica");
+  }
+}
diff --git a/hadoop-ozone/dist/src/main/smoketest/admincli/container.robot 
b/hadoop-ozone/dist/src/main/smoketest/admincli/container.robot
index c50daa724d..fae0899178 100644
--- a/hadoop-ozone/dist/src/main/smoketest/admincli/container.robot
+++ b/hadoop-ozone/dist/src/main/smoketest/admincli/container.robot
@@ -34,6 +34,12 @@ Container is closed
     ${output} =         Execute          ozone admin container info 
"${container}"
                         Should contain   ${output}   CLOSED
 
+Reconciliation complete
+    [arguments]    ${container}
+    ${data_checksum} =  Execute          ozone admin container info 
"${container}" --json | jq -r '.replicas[].dataChecksum' | head -n1
+                        Should not be empty    ${data_checksum}
+                        Should not be equal as strings    0    ${data_checksum}
+
 *** Test Cases ***
 Create container
     ${output} =         Execute          ozone admin container create
@@ -71,6 +77,17 @@ Verbose container info
     ${output} =         Execute          ozone admin --verbose container info 
"${CONTAINER}"
                         Should contain   ${output}   Pipeline Info
 
+Incomplete command
+    ${output} =         Execute And Ignore Error     ozone admin container
+                        Should contain   ${output}   Incomplete command
+                        Should contain   ${output}   list
+                        Should contain   ${output}   info
+                        Should contain   ${output}   create
+                        Should contain   ${output}   close
+                        Should contain   ${output}   reconcile
+                        Should contain   ${output}   report
+                        Should contain   ${output}   upgrade
+
 List containers as JSON
     ${output} =         Execute          ozone admin container info 
"${CONTAINER}" --json | jq -r '.'
                         Should contain   ${output}    containerInfo
@@ -84,23 +101,6 @@ Report containers as JSON
                          Should contain   ${output}   stats
                          Should contain   ${output}   samples
 
-Close container
-    ${container} =      Execute          ozone admin container list --state 
OPEN | jq -r 'select(.replicationConfig.replicationFactor == "THREE") | 
.containerID' | head -1
-                        Execute          ozone admin container close 
"${container}"
-    ${output} =         Execute          ozone admin container info 
"${container}"
-                        Should contain   ${output}   CLOS
-    Wait until keyword succeeds    1min    10sec    Container is closed    
${container}
-
-Incomplete command
-    ${output} =         Execute And Ignore Error     ozone admin container
-                        Should contain   ${output}   Incomplete command
-                        Should contain   ${output}   list
-                        Should contain   ${output}   info
-                        Should contain   ${output}   create
-                        Should contain   ${output}   close
-                        Should contain   ${output}   report
-                        Should contain   ${output}   upgrade
-
 #List containers on unknown host
 #    ${output} =         Execute And Ignore Error     ozone admin --verbose 
container list --scm unknown-host
 #                        Should contain   ${output}   Invalid host name
@@ -111,5 +111,38 @@ Cannot close container without admin privilege
 Cannot create container without admin privilege
     Requires admin privilege    ozone admin container create
 
+Cannot reconcile container without admin privilege
+    Requires admin privilege    ozone admin container reconcile "${CONTAINER}"
+
 Reset user
     Run Keyword if      '${SECURITY_ENABLED}' == 'true'     Kinit test user    
 testuser     testuser.keytab
+
+Cannot reconcile open container
+    # At this point we should have an open Ratis Three container.
+    ${container} =      Execute          ozone admin container list --state 
OPEN | jq -r 'select(.replicationConfig.replicationFactor == "THREE") | 
.containerID' | head -n1
+    Execute and check rc    ozone admin container reconcile "${container}"    
255
+    # The container should not yet have any replica checksums.
+    # TODO When the scanner is computing checksums automatically, this test 
may need to be updated.
+    ${data_checksum} =  Execute          ozone admin container info 
"${container}" --json | jq -r '.replicas[].dataChecksum' | head -n1
+    # 0 is the hex value of an empty checksum.
+    Should Be Equal As Strings    0    ${data_checksum}
+
+Close container
+    ${container} =      Execute          ozone admin container list --state 
OPEN | jq -r 'select(.replicationConfig.replicationFactor == "THREE") | 
.containerID' | head -1
+                        Execute          ozone admin container close 
"${container}"
+    # The container may either be in CLOSED or CLOSING state at this point. 
Once we have verified this, we will wait
+    # for it to progress to CLOSED.
+    ${output} =         Execute          ozone admin container info 
"${container}"
+                        Should contain   ${output}   CLOS
+    Wait until keyword succeeds    1min    10sec    Container is closed    
${container}
+
+Reconcile closed container
+    # Check that info does not show replica checksums, since manual 
reconciliation has not yet been triggered.
+    # TODO When the scanner is computing checksums automatically, this test 
may need to be updated.
+    ${container} =      Execute          ozone admin container list --state 
CLOSED | jq -r 'select(.replicationConfig.replicationFactor == "THREE") | 
.containerID' | head -1
+    ${data_checksum} =  Execute          ozone admin container info 
"${container}" --json | jq -r '.replicas[].dataChecksum' | head -n1
+    # 0 is the hex value of an empty checksum.
+    Should Be Equal As Strings    0    ${data_checksum}
+    # When reconciliation finishes, replica checksums should be shown.
+    Execute    ozone admin container reconcile ${container}
+    Wait until keyword succeeds    1min    5sec    Reconciliation complete    
${container}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

