errose28 commented on code in PR #7474:
URL: https://github.com/apache/ozone/pull/7474#discussion_r1924148029


##########
hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java:
##########
@@ -70,6 +70,7 @@
 import static 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.UNHEALTHY;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_DATANODE_DIR_KEY;
 import static org.apache.hadoop.ozone.OzoneConsts.GB;
+import static 
org.apache.hadoop.ozone.container.checksum.ContainerMerkleTreeTestUtils.createBlockMetaData;

Review Comment:
   We need unit tests for `KeyValueContainerHandler#reconcileContainer` here, 
using a mocked `DNContainerOperationClient` to exercise various replica sets.



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java:
##########
@@ -1427,21 +1435,132 @@ public void deleteContainer(Container container, 
boolean force)
   @Override
   public void reconcileContainer(DNContainerOperationClient dnClient, 
Container<?> container,
                                  Set<DatanodeDetails> peers) throws 
IOException {
-    // TODO Just a deterministic placeholder hash for testing until actual 
implementation is finished.
-    ContainerData data = container.getContainerData();
-    long id = data.getContainerID();
-    ByteBuffer byteBuffer = ByteBuffer.allocate(Long.BYTES)
-        .putLong(id)
-        .asReadOnlyBuffer();
-    byteBuffer.rewind();
-    ChecksumByteBuffer checksumImpl = ChecksumByteBufferFactory.crc32Impl();
-    checksumImpl.update(byteBuffer);
-    long dataChecksum = checksumImpl.getValue();
-    LOG.info("Generated data checksum of container {} for testing: {}", id, 
dataChecksum);
-    data.setDataChecksum(dataChecksum);
+    KeyValueContainer kvContainer = (KeyValueContainer) container;
+    KeyValueContainerData containerData = (KeyValueContainerData) 
container.getContainerData();
+
+    for (DatanodeDetails peer : peers) {
+      ContainerProtos.ContainerChecksumInfo peerChecksumInfo = 
dnClient.getContainerChecksumInfo(
+          containerData.getContainerID(), peer);
+      if (peerChecksumInfo == null) {
+        LOG.warn("Checksum not yet generated for peer: {}", peer);
+        return;
+      }
+
+      long scmBlockSize = (long) conf.getStorageSize(OZONE_SCM_BLOCK_SIZE, 
OZONE_SCM_BLOCK_SIZE_DEFAULT,
+          StorageUnit.BYTES);
+      ContainerDiffReport diffReport = checksumManager.diff(containerData, 
peerChecksumInfo);
+      TokenHelper tokenHelper = dnClient.getTokenHelper();
+      XceiverClientSpi xceiverClient = dnClient.getXceiverClientManager()
+          .acquireClient(createSingleNodePipeline(peer));
+
+      try {
+        // Handle missing blocks
+        for (ContainerProtos.BlockMerkleTree missingBlock : 
diffReport.getMissingBlocks()) {
+          handleMissingBlock(kvContainer, containerData, tokenHelper, 
scmBlockSize, xceiverClient, missingBlock);
+        }
+
+        // Handle missing chunks
+        for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry : 
diffReport.getMissingChunks().entrySet()) {
+          reconcileChunk(kvContainer, containerData, tokenHelper, 
scmBlockSize, xceiverClient, entry);
+        }
+
+        // Handle corrupt chunks
+        for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry : 
diffReport.getCorruptChunks().entrySet()) {
+          reconcileChunk(kvContainer, containerData, tokenHelper, 
scmBlockSize, xceiverClient, entry);
+        }
+      } finally {
+        dnClient.getXceiverClientManager().releaseClient(xceiverClient, false);
+      }
+    }
+
+    updateContainerChecksum(containerData);

Review Comment:
   Sending an ICR out immediately with the checksum built on the metadata, like 
we do here, is probably good for quick feedback. However, I think we should also 
queue the container for on-demand scanning to really check our work.



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java:
##########
@@ -1427,21 +1436,136 @@ public void deleteContainer(Container container, 
boolean force)
   @Override
   public void reconcileContainer(DNContainerOperationClient dnClient, 
Container<?> container,
                                  Set<DatanodeDetails> peers) throws 
IOException {
-    // TODO Just a deterministic placeholder hash for testing until actual 
implementation is finished.
-    ContainerData data = container.getContainerData();
-    long id = data.getContainerID();
-    ByteBuffer byteBuffer = ByteBuffer.allocate(Long.BYTES)
-        .putLong(id)
-        .asReadOnlyBuffer();
-    byteBuffer.rewind();
-    ChecksumByteBuffer checksumImpl = ChecksumByteBufferFactory.crc32Impl();
-    checksumImpl.update(byteBuffer);
-    long dataChecksum = checksumImpl.getValue();
-    LOG.info("Generated data checksum of container {} for testing: {}", id, 
dataChecksum);
-    data.setDataChecksum(dataChecksum);
+    KeyValueContainer kvContainer = (KeyValueContainer) container;
+    KeyValueContainerData containerData = (KeyValueContainerData) 
container.getContainerData();
+
+    for (DatanodeDetails peer : peers) {
+      ContainerProtos.ContainerChecksumInfo peerChecksumInfo = 
dnClient.getContainerChecksumInfo(
+          containerData.getContainerID(), peer);
+      if (peerChecksumInfo == null) {
+        LOG.warn("Checksum not yet generated for peer: {}", peer);

Review Comment:
   ```suggestion
        LOG.warn("Cannot reconcile container {} with peer {} which has not 
yet generated a checksum", containerData.getContainerID(), peer);
   ```



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java:
##########
@@ -1427,21 +1436,136 @@ public void deleteContainer(Container container, 
boolean force)
   @Override
   public void reconcileContainer(DNContainerOperationClient dnClient, 
Container<?> container,
                                  Set<DatanodeDetails> peers) throws 
IOException {
-    // TODO Just a deterministic placeholder hash for testing until actual 
implementation is finished.
-    ContainerData data = container.getContainerData();
-    long id = data.getContainerID();
-    ByteBuffer byteBuffer = ByteBuffer.allocate(Long.BYTES)
-        .putLong(id)
-        .asReadOnlyBuffer();
-    byteBuffer.rewind();
-    ChecksumByteBuffer checksumImpl = ChecksumByteBufferFactory.crc32Impl();
-    checksumImpl.update(byteBuffer);
-    long dataChecksum = checksumImpl.getValue();
-    LOG.info("Generated data checksum of container {} for testing: {}", id, 
dataChecksum);
-    data.setDataChecksum(dataChecksum);
+    KeyValueContainer kvContainer = (KeyValueContainer) container;
+    KeyValueContainerData containerData = (KeyValueContainerData) 
container.getContainerData();
+
+    for (DatanodeDetails peer : peers) {
+      ContainerProtos.ContainerChecksumInfo peerChecksumInfo = 
dnClient.getContainerChecksumInfo(
+          containerData.getContainerID(), peer);
+      if (peerChecksumInfo == null) {
+        LOG.warn("Checksum not yet generated for peer: {}", peer);
+        return;
+      }
+
+      long scmBlockSize = (long) conf.getStorageSize(OZONE_SCM_BLOCK_SIZE, 
OZONE_SCM_BLOCK_SIZE_DEFAULT,
+          StorageUnit.BYTES);
+      ContainerDiffReport diffReport = checksumManager.diff(containerData, 
peerChecksumInfo);
+      TokenHelper tokenHelper = dnClient.getTokenHelper();
+      XceiverClientSpi xceiverClient = dnClient.getXceiverClientManager()
+          .acquireClient(createSingleNodePipeline(peer));
+
+      try {
+        // Handle missing blocks
+        for (ContainerProtos.BlockMerkleTree missingBlock : 
diffReport.getMissingBlocks()) {
+          handleMissingBlock(kvContainer, containerData, tokenHelper, 
scmBlockSize, xceiverClient, missingBlock);
+        }
+
+        // Handle missing chunks
+        for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry : 
diffReport.getMissingChunks().entrySet()) {
+          reconcileChunk(kvContainer, containerData, tokenHelper, 
scmBlockSize, xceiverClient, entry);
+        }
+
+        // Handle corrupt chunks
+        for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry : 
diffReport.getCorruptChunks().entrySet()) {
+          reconcileChunk(kvContainer, containerData, tokenHelper, 
scmBlockSize, xceiverClient, entry);
+        }
+        updateContainerChecksum(containerData);
+      } finally {
+        dnClient.getXceiverClientManager().releaseClient(xceiverClient, false);
+      }
+    }
+
+    long dataChecksum = updateContainerChecksum(containerData);
+    LOG.info("Checksum data for container {} is updated to {}", 
containerData.getContainerID(), dataChecksum);
+    containerData.setDataChecksum(dataChecksum);
     sendICR(container);
   }
 
+  private long updateContainerChecksum(KeyValueContainerData containerData) 
throws IOException {
+    ContainerMerkleTree merkleTree = new ContainerMerkleTree();
+    try (DBHandle dbHandle = BlockUtils.getDB(containerData, conf);
+         BlockIterator<BlockData> blockIterator = dbHandle.getStore().
+             getBlockIterator(containerData.getContainerID())) {
+      while (blockIterator.hasNext()) {
+        BlockData blockData = blockIterator.nextBlock();
+        List<ContainerProtos.ChunkInfo> chunkInfos = blockData.getChunks();
+        merkleTree.addChunks(blockData.getLocalID(), chunkInfos);
+      }
+    }
+    checksumManager.writeContainerDataTree(containerData, merkleTree);
+    return merkleTree.toProto().getDataChecksum();
+  }
+
+  private void handleMissingBlock(KeyValueContainer container, ContainerData 
containerData, TokenHelper tokenHelper,
+                                  long scmBlockSize, XceiverClientSpi 
xceiverClient,
+                                  ContainerProtos.BlockMerkleTree 
missingBlock) throws IOException {
+    BlockID blockID = new BlockID(containerData.getContainerID(), 
missingBlock.getBlockID());
+    Token<OzoneBlockTokenIdentifier> blockToken = 
tokenHelper.getBlockToken(blockID, scmBlockSize);
+    // TODO: Cache the blockResponse to reuse it again.
+    ContainerProtos.GetBlockResponseProto blockResponse = 
ContainerProtocolCalls.getBlock(xceiverClient, blockID,
+        blockToken, new HashMap<>());
+    // TODO: Add BcsId in BlockMerkleTree to avoid this call

Review Comment:
   I would leave this extra call in here instead of adding BCSID to the merkle 
tree. If it is not being used in the hash it does not make sense to add it to 
the tree. We do need to update our container's BCSID after reconciliation for 
compatibility with the current replication manager. This would be the max of 
our current container BCSID or any BCSID of a block we touched during the 
reconciliation process.



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java:
##########
@@ -121,7 +121,7 @@ public long persistPutBlock(KeyValueContainer container,
       // default blockCommitSequenceId for any block is 0. It the putBlock
       // request is not coming via Ratis(for test scenarios), it will be 0.
       // In such cases, we should overwrite the block as well
-      if ((bcsId != 0) && (bcsId <= containerBCSId)) {
+      if ((bcsId != 0) && (bcsId < containerBCSId)) {

Review Comment:
   Looks like this will still fail the write if we try to add a block whose 
BCSID is less than that in the container. Can you check the tests to see if 
this is being handled?



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java:
##########
@@ -1427,21 +1435,132 @@ public void deleteContainer(Container container, 
boolean force)
   @Override
   public void reconcileContainer(DNContainerOperationClient dnClient, 
Container<?> container,
                                  Set<DatanodeDetails> peers) throws 
IOException {
-    // TODO Just a deterministic placeholder hash for testing until actual 
implementation is finished.
-    ContainerData data = container.getContainerData();
-    long id = data.getContainerID();
-    ByteBuffer byteBuffer = ByteBuffer.allocate(Long.BYTES)
-        .putLong(id)
-        .asReadOnlyBuffer();
-    byteBuffer.rewind();
-    ChecksumByteBuffer checksumImpl = ChecksumByteBufferFactory.crc32Impl();
-    checksumImpl.update(byteBuffer);
-    long dataChecksum = checksumImpl.getValue();
-    LOG.info("Generated data checksum of container {} for testing: {}", id, 
dataChecksum);
-    data.setDataChecksum(dataChecksum);
+    KeyValueContainer kvContainer = (KeyValueContainer) container;
+    KeyValueContainerData containerData = (KeyValueContainerData) 
container.getContainerData();
+
+    for (DatanodeDetails peer : peers) {
+      ContainerProtos.ContainerChecksumInfo peerChecksumInfo = 
dnClient.getContainerChecksumInfo(
+          containerData.getContainerID(), peer);
+      if (peerChecksumInfo == null) {
+        LOG.warn("Checksum not yet generated for peer: {}", peer);
+        return;
+      }
+
+      long scmBlockSize = (long) conf.getStorageSize(OZONE_SCM_BLOCK_SIZE, 
OZONE_SCM_BLOCK_SIZE_DEFAULT,
+          StorageUnit.BYTES);
+      ContainerDiffReport diffReport = checksumManager.diff(containerData, 
peerChecksumInfo);
+      TokenHelper tokenHelper = dnClient.getTokenHelper();
+      XceiverClientSpi xceiverClient = dnClient.getXceiverClientManager()
+          .acquireClient(createSingleNodePipeline(peer));
+
+      try {
+        // Handle missing blocks
+        for (ContainerProtos.BlockMerkleTree missingBlock : 
diffReport.getMissingBlocks()) {
+          handleMissingBlock(kvContainer, containerData, tokenHelper, 
scmBlockSize, xceiverClient, missingBlock);
+        }
+
+        // Handle missing chunks
+        for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry : 
diffReport.getMissingChunks().entrySet()) {
+          reconcileChunk(kvContainer, containerData, tokenHelper, 
scmBlockSize, xceiverClient, entry);
+        }
+
+        // Handle corrupt chunks
+        for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry : 
diffReport.getCorruptChunks().entrySet()) {
+          reconcileChunk(kvContainer, containerData, tokenHelper, 
scmBlockSize, xceiverClient, entry);
+        }
+      } finally {
+        dnClient.getXceiverClientManager().releaseClient(xceiverClient, false);
+      }
+    }
+
+    updateContainerChecksum(containerData);
+    long dataChecksum = updateContainerChecksum(containerData);
+    LOG.info("Checksum data for container {} is updated to {}", 
containerData.getContainerID(), dataChecksum);
+    containerData.setDataChecksum(dataChecksum);
     sendICR(container);
   }
 
+  private long updateContainerChecksum(KeyValueContainerData containerData) 
throws IOException {

Review Comment:
   We should re-use/combine this with 
`KeyValueHandler#createContainerMerkleTree`



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java:
##########
@@ -1427,21 +1436,136 @@ public void deleteContainer(Container container, 
boolean force)
   @Override
   public void reconcileContainer(DNContainerOperationClient dnClient, 
Container<?> container,
                                  Set<DatanodeDetails> peers) throws 
IOException {
-    // TODO Just a deterministic placeholder hash for testing until actual 
implementation is finished.
-    ContainerData data = container.getContainerData();
-    long id = data.getContainerID();
-    ByteBuffer byteBuffer = ByteBuffer.allocate(Long.BYTES)
-        .putLong(id)
-        .asReadOnlyBuffer();
-    byteBuffer.rewind();
-    ChecksumByteBuffer checksumImpl = ChecksumByteBufferFactory.crc32Impl();
-    checksumImpl.update(byteBuffer);
-    long dataChecksum = checksumImpl.getValue();
-    LOG.info("Generated data checksum of container {} for testing: {}", id, 
dataChecksum);
-    data.setDataChecksum(dataChecksum);
+    KeyValueContainer kvContainer = (KeyValueContainer) container;
+    KeyValueContainerData containerData = (KeyValueContainerData) 
container.getContainerData();
+
+    for (DatanodeDetails peer : peers) {
+      ContainerProtos.ContainerChecksumInfo peerChecksumInfo = 
dnClient.getContainerChecksumInfo(
+          containerData.getContainerID(), peer);
+      if (peerChecksumInfo == null) {
+        LOG.warn("Checksum not yet generated for peer: {}", peer);
+        return;
+      }
+
+      long scmBlockSize = (long) conf.getStorageSize(OZONE_SCM_BLOCK_SIZE, 
OZONE_SCM_BLOCK_SIZE_DEFAULT,
+          StorageUnit.BYTES);
+      ContainerDiffReport diffReport = checksumManager.diff(containerData, 
peerChecksumInfo);
+      TokenHelper tokenHelper = dnClient.getTokenHelper();
+      XceiverClientSpi xceiverClient = dnClient.getXceiverClientManager()
+          .acquireClient(createSingleNodePipeline(peer));
+
+      try {
+        // Handle missing blocks
+        for (ContainerProtos.BlockMerkleTree missingBlock : 
diffReport.getMissingBlocks()) {
+          handleMissingBlock(kvContainer, containerData, tokenHelper, 
scmBlockSize, xceiverClient, missingBlock);
+        }
+
+        // Handle missing chunks
+        for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry : 
diffReport.getMissingChunks().entrySet()) {
+          reconcileChunk(kvContainer, containerData, tokenHelper, 
scmBlockSize, xceiverClient, entry);
+        }
+
+        // Handle corrupt chunks
+        for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry : 
diffReport.getCorruptChunks().entrySet()) {
+          reconcileChunk(kvContainer, containerData, tokenHelper, 
scmBlockSize, xceiverClient, entry);
+        }
+        updateContainerChecksum(containerData);
+      } finally {
+        dnClient.getXceiverClientManager().releaseClient(xceiverClient, false);
+      }
+    }
+
+    long dataChecksum = updateContainerChecksum(containerData);
+    LOG.info("Checksum data for container {} is updated to {}", 
containerData.getContainerID(), dataChecksum);
+    containerData.setDataChecksum(dataChecksum);

Review Comment:
   Let's move this inside the `updateContainerChecksum` method



##########
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/checksum/TestContainerCommandReconciliation.java:
##########
@@ -252,17 +272,155 @@ public void testGetChecksumInfoSuccess() throws 
Exception {
     }
   }
 
-  private long writeDataAndGetContainer(boolean close) throws Exception {
+  @Test
+  public void testContainerChecksumWithBlockMissing() throws Exception {
+    // 1. Write data to a container.
+    long containerID = writeDataAndGetContainer(true, 20 * 1024 * 1024);
+    Set<DatanodeDetails> peerNodes = cluster.getHddsDatanodes().stream().map(
+        HddsDatanodeService::getDatanodeDetails).collect(Collectors.toSet());
+    HddsDatanodeService hddsDatanodeService = 
cluster.getHddsDatanodes().get(0);
+    DatanodeStateMachine datanodeStateMachine = 
hddsDatanodeService.getDatanodeStateMachine();
+    Container<?> container = 
datanodeStateMachine.getContainer().getContainerSet().getContainer(containerID);
+    KeyValueContainerData containerData = (KeyValueContainerData) 
container.getContainerData();
+    ContainerProtos.ContainerChecksumInfo oldContainerChecksumInfo = 
readChecksumFile(container.getContainerData());
+    KeyValueHandler kvHandler = (KeyValueHandler) 
datanodeStateMachine.getContainer().getDispatcher()
+        .getHandler(ContainerProtos.ContainerType.KeyValueContainer);
+
+    BlockManager blockManager = kvHandler.getBlockManager();
+    List<BlockData> blockDatas = blockManager.listBlock(container, -1, 100);
+    List<BlockData> deletedBlocks = new ArrayList<>();
+    String chunksPath = container.getContainerData().getChunksPath();
+    long oldDataChecksum = 
oldContainerChecksumInfo.getContainerMerkleTree().getDataChecksum();
+
+    // 2. Delete some blocks to simulate missing blocks.
+    try (DBHandle db = BlockUtils.getDB(containerData, conf);
+         BatchOperation op = 
db.getStore().getBatchHandler().initBatchOperation()) {
+      for (int i = 0; i < blockDatas.size(); i += 2) {
+        BlockData blockData = blockDatas.get(i);
+        // Delete the block metadata from the container db
+        db.getStore().getBlockDataTable().deleteWithBatch(op, 
containerData.getBlockKey(blockData.getLocalID()));
+        // Delete the block file.
+        Files.deleteIfExists(Paths.get(chunksPath + "/" + 
blockData.getBlockID().getLocalID() + ".block"));
+        deletedBlocks.add(blockData);
+      }
+      db.getStore().getBatchHandler().commitBatchOperation(op);
+      db.getStore().flushDB();
+    }
+
+    
Files.deleteIfExists(getContainerChecksumFile(container.getContainerData()).toPath());
+    kvHandler.createContainerMerkleTree(container);

Review Comment:
   Add a TODO here to use on-demand scanning instead, once the scanner 
integration PR is merged.



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java:
##########
@@ -1427,21 +1436,136 @@ public void deleteContainer(Container container, 
boolean force)
   @Override
   public void reconcileContainer(DNContainerOperationClient dnClient, 
Container<?> container,
                                  Set<DatanodeDetails> peers) throws 
IOException {
-    // TODO Just a deterministic placeholder hash for testing until actual 
implementation is finished.
-    ContainerData data = container.getContainerData();
-    long id = data.getContainerID();
-    ByteBuffer byteBuffer = ByteBuffer.allocate(Long.BYTES)
-        .putLong(id)
-        .asReadOnlyBuffer();
-    byteBuffer.rewind();
-    ChecksumByteBuffer checksumImpl = ChecksumByteBufferFactory.crc32Impl();
-    checksumImpl.update(byteBuffer);
-    long dataChecksum = checksumImpl.getValue();
-    LOG.info("Generated data checksum of container {} for testing: {}", id, 
dataChecksum);
-    data.setDataChecksum(dataChecksum);
+    KeyValueContainer kvContainer = (KeyValueContainer) container;
+    KeyValueContainerData containerData = (KeyValueContainerData) 
container.getContainerData();
+
+    for (DatanodeDetails peer : peers) {
+      ContainerProtos.ContainerChecksumInfo peerChecksumInfo = 
dnClient.getContainerChecksumInfo(
+          containerData.getContainerID(), peer);
+      if (peerChecksumInfo == null) {
+        LOG.warn("Checksum not yet generated for peer: {}", peer);
+        return;
+      }
+
+      long scmBlockSize = (long) conf.getStorageSize(OZONE_SCM_BLOCK_SIZE, 
OZONE_SCM_BLOCK_SIZE_DEFAULT,
+          StorageUnit.BYTES);
+      ContainerDiffReport diffReport = checksumManager.diff(containerData, 
peerChecksumInfo);
+      TokenHelper tokenHelper = dnClient.getTokenHelper();
+      XceiverClientSpi xceiverClient = dnClient.getXceiverClientManager()
+          .acquireClient(createSingleNodePipeline(peer));
+
+      try {
+        // Handle missing blocks
+        for (ContainerProtos.BlockMerkleTree missingBlock : 
diffReport.getMissingBlocks()) {
+          handleMissingBlock(kvContainer, containerData, tokenHelper, 
scmBlockSize, xceiverClient, missingBlock);
+        }
+
+        // Handle missing chunks
+        for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry : 
diffReport.getMissingChunks().entrySet()) {
+          reconcileChunk(kvContainer, containerData, tokenHelper, 
scmBlockSize, xceiverClient, entry);
+        }
+
+        // Handle corrupt chunks
+        for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry : 
diffReport.getCorruptChunks().entrySet()) {
+          reconcileChunk(kvContainer, containerData, tokenHelper, 
scmBlockSize, xceiverClient, entry);
+        }
+        updateContainerChecksum(containerData);
+      } finally {
+        dnClient.getXceiverClientManager().releaseClient(xceiverClient, false);
+      }
+    }
+
+    long dataChecksum = updateContainerChecksum(containerData);
+    LOG.info("Checksum data for container {} is updated to {}", 
containerData.getContainerID(), dataChecksum);
+    containerData.setDataChecksum(dataChecksum);
     sendICR(container);
   }
 
+  private long updateContainerChecksum(KeyValueContainerData containerData) 
throws IOException {
+    ContainerMerkleTree merkleTree = new ContainerMerkleTree();
+    try (DBHandle dbHandle = BlockUtils.getDB(containerData, conf);
+         BlockIterator<BlockData> blockIterator = dbHandle.getStore().
+             getBlockIterator(containerData.getContainerID())) {
+      while (blockIterator.hasNext()) {
+        BlockData blockData = blockIterator.nextBlock();
+        List<ContainerProtos.ChunkInfo> chunkInfos = blockData.getChunks();
+        merkleTree.addChunks(blockData.getLocalID(), chunkInfos);
+      }
+    }
+    checksumManager.writeContainerDataTree(containerData, merkleTree);
+    return merkleTree.toProto().getDataChecksum();
+  }
+
+  private void handleMissingBlock(KeyValueContainer container, ContainerData 
containerData, TokenHelper tokenHelper,
+                                  long scmBlockSize, XceiverClientSpi 
xceiverClient,
+                                  ContainerProtos.BlockMerkleTree 
missingBlock) throws IOException {
+    BlockID blockID = new BlockID(containerData.getContainerID(), 
missingBlock.getBlockID());
+    Token<OzoneBlockTokenIdentifier> blockToken = 
tokenHelper.getBlockToken(blockID, scmBlockSize);
+    // TODO: Cache the blockResponse to reuse it again.

Review Comment:
   Why would we use the response again? There is only one call that will be 
repairing the block.



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java:
##########
@@ -1427,21 +1436,136 @@ public void deleteContainer(Container container, 
boolean force)
   @Override
   public void reconcileContainer(DNContainerOperationClient dnClient, 
Container<?> container,
                                  Set<DatanodeDetails> peers) throws 
IOException {
-    // TODO Just a deterministic placeholder hash for testing until actual 
implementation is finished.
-    ContainerData data = container.getContainerData();
-    long id = data.getContainerID();
-    ByteBuffer byteBuffer = ByteBuffer.allocate(Long.BYTES)
-        .putLong(id)
-        .asReadOnlyBuffer();
-    byteBuffer.rewind();
-    ChecksumByteBuffer checksumImpl = ChecksumByteBufferFactory.crc32Impl();
-    checksumImpl.update(byteBuffer);
-    long dataChecksum = checksumImpl.getValue();
-    LOG.info("Generated data checksum of container {} for testing: {}", id, 
dataChecksum);
-    data.setDataChecksum(dataChecksum);
+    KeyValueContainer kvContainer = (KeyValueContainer) container;
+    KeyValueContainerData containerData = (KeyValueContainerData) 
container.getContainerData();
+
+    for (DatanodeDetails peer : peers) {
+      ContainerProtos.ContainerChecksumInfo peerChecksumInfo = 
dnClient.getContainerChecksumInfo(
+          containerData.getContainerID(), peer);
+      if (peerChecksumInfo == null) {
+        LOG.warn("Checksum not yet generated for peer: {}", peer);
+        return;
+      }
+
+      long scmBlockSize = (long) conf.getStorageSize(OZONE_SCM_BLOCK_SIZE, 
OZONE_SCM_BLOCK_SIZE_DEFAULT,
+          StorageUnit.BYTES);
+      ContainerDiffReport diffReport = checksumManager.diff(containerData, 
peerChecksumInfo);
+      TokenHelper tokenHelper = dnClient.getTokenHelper();
+      XceiverClientSpi xceiverClient = dnClient.getXceiverClientManager()
+          .acquireClient(createSingleNodePipeline(peer));
+
+      try {
+        // Handle missing blocks
+        for (ContainerProtos.BlockMerkleTree missingBlock : 
diffReport.getMissingBlocks()) {
+          handleMissingBlock(kvContainer, containerData, tokenHelper, 
scmBlockSize, xceiverClient, missingBlock);
+        }
+
+        // Handle missing chunks
+        for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry : 
diffReport.getMissingChunks().entrySet()) {
+          reconcileChunk(kvContainer, containerData, tokenHelper, 
scmBlockSize, xceiverClient, entry);
+        }
+
+        // Handle corrupt chunks
+        for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry : 
diffReport.getCorruptChunks().entrySet()) {
+          reconcileChunk(kvContainer, containerData, tokenHelper, 
scmBlockSize, xceiverClient, entry);
+        }
+        updateContainerChecksum(containerData);
+      } finally {
+        dnClient.getXceiverClientManager().releaseClient(xceiverClient, false);
+      }
+    }
+
+    long dataChecksum = updateContainerChecksum(containerData);
+    LOG.info("Checksum data for container {} is updated to {}", 
containerData.getContainerID(), dataChecksum);
+    containerData.setDataChecksum(dataChecksum);
     sendICR(container);
   }
 
+  private long updateContainerChecksum(KeyValueContainerData containerData) 
throws IOException {
+    ContainerMerkleTree merkleTree = new ContainerMerkleTree();
+    try (DBHandle dbHandle = BlockUtils.getDB(containerData, conf);
+         BlockIterator<BlockData> blockIterator = dbHandle.getStore().
+             getBlockIterator(containerData.getContainerID())) {
+      while (blockIterator.hasNext()) {
+        BlockData blockData = blockIterator.nextBlock();
+        List<ContainerProtos.ChunkInfo> chunkInfos = blockData.getChunks();
+        merkleTree.addChunks(blockData.getLocalID(), chunkInfos);
+      }
+    }
+    checksumManager.writeContainerDataTree(containerData, merkleTree);
+    return merkleTree.toProto().getDataChecksum();
+  }
+
+  private void handleMissingBlock(KeyValueContainer container, ContainerData 
containerData, TokenHelper tokenHelper,
+                                  long scmBlockSize, XceiverClientSpi 
xceiverClient,
+                                  ContainerProtos.BlockMerkleTree 
missingBlock) throws IOException {
+    BlockID blockID = new BlockID(containerData.getContainerID(), 
missingBlock.getBlockID());
+    Token<OzoneBlockTokenIdentifier> blockToken = 
tokenHelper.getBlockToken(blockID, scmBlockSize);
+    // TODO: Cache the blockResponse to reuse it again.
+    ContainerProtos.GetBlockResponseProto blockResponse = 
ContainerProtocolCalls.getBlock(xceiverClient, blockID,
+        blockToken, new HashMap<>());
+    // TODO: Add BcsId in BlockMerkleTree to avoid this call
+    ContainerProtos.GetCommittedBlockLengthResponseProto blockLengthResponse =
+        ContainerProtocolCalls.getCommittedBlockLength(xceiverClient, blockID, 
blockToken);
+    List<ContainerProtos.ChunkInfo> chunksList = 
blockResponse.getBlockData().getChunksList();
+
+    for (ContainerProtos.ChunkInfo chunkInfoProto : chunksList) {
+      ByteString chunkData = readChunkData(xceiverClient, chunkInfoProto, 
blockID, blockToken);
+      ChunkBuffer chunkBuffer = 
ChunkBuffer.wrap(chunkData.asReadOnlyByteBuffer());
+      ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(chunkInfoProto);
+      chunkInfo.addMetadata(OzoneConsts.CHUNK_OVERWRITE, "true");
+      writeChunkForClosedContainer(chunkInfo, blockID, chunkBuffer, container);
+    }
+
+    putBlockForClosedContainer(chunksList, container, 
BlockData.getFromProtoBuf(blockResponse.getBlockData()),
+        blockLengthResponse.getBlockLength());
+  }
+
+  private ByteString readChunkData(XceiverClientSpi xceiverClient, 
ContainerProtos.ChunkInfo chunkInfoProto,
+                                   BlockID blockID, 
Token<OzoneBlockTokenIdentifier> blockToken) throws IOException {
+    ContainerProtos.ReadChunkResponseProto response =
+        ContainerProtocolCalls.readChunk(xceiverClient, chunkInfoProto, 
blockID.getDatanodeBlockIDProtobuf(),
+            null, blockToken);
+
+    if (response.hasData()) {
+      return response.getData();
+    } else if (response.hasDataBuffers()) {
+      return 
BufferUtils.concatByteStrings(response.getDataBuffers().getBuffersList());
+    } else {
+      throw new IOException("Error reading chunk data: No data returned.");
+    }
+  }
+
+  private void reconcileChunk(KeyValueContainer container, ContainerData 
containerData, TokenHelper tokenHelper,
+                              long scmBlockSize, XceiverClientSpi 
xceiverClient,
+                              Map.Entry<Long, 
List<ContainerProtos.ChunkMerkleTree>> mapEntry) throws IOException {
+    long blockId = mapEntry.getKey();
+    List<ContainerProtos.ChunkMerkleTree> chunkList = mapEntry.getValue();
+    Set<Long> offsets = 
chunkList.stream().map(ContainerProtos.ChunkMerkleTree::getOffset)
+        .collect(Collectors.toSet());
+    BlockID blockID = new BlockID(containerData.getContainerID(), blockId);
+    Token<OzoneBlockTokenIdentifier> blockToken = 
tokenHelper.getBlockToken(blockID, scmBlockSize);
+    ContainerProtos.GetBlockResponseProto blockResponse = 
ContainerProtocolCalls.getBlock(xceiverClient, blockID,
+        blockToken, new HashMap<>());
+    // TODO: Add BcsId in BlockMerkleTree to avoid this call
+    ContainerProtos.GetCommittedBlockLengthResponseProto blockLengthResponse =
+        ContainerProtocolCalls.getCommittedBlockLength(xceiverClient, blockID, 
blockToken);
+    List<ContainerProtos.ChunkInfo> chunksList = 
blockResponse.getBlockData().getChunksList();
+
+    for (ContainerProtos.ChunkInfo chunkInfoProto : chunksList) {
+      if (offsets.contains(chunkInfoProto.getOffset())) {

Review Comment:
   Why is this here? We can just use the offset that is stored with the current 
`chunkInfoProto`.



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java:
##########
@@ -1427,21 +1436,136 @@ public void deleteContainer(Container container, 
boolean force)
   @Override
   public void reconcileContainer(DNContainerOperationClient dnClient, 
Container<?> container,
                                  Set<DatanodeDetails> peers) throws 
IOException {
-    // TODO Just a deterministic placeholder hash for testing until actual 
implementation is finished.
-    ContainerData data = container.getContainerData();
-    long id = data.getContainerID();
-    ByteBuffer byteBuffer = ByteBuffer.allocate(Long.BYTES)
-        .putLong(id)
-        .asReadOnlyBuffer();
-    byteBuffer.rewind();
-    ChecksumByteBuffer checksumImpl = ChecksumByteBufferFactory.crc32Impl();
-    checksumImpl.update(byteBuffer);
-    long dataChecksum = checksumImpl.getValue();
-    LOG.info("Generated data checksum of container {} for testing: {}", id, 
dataChecksum);
-    data.setDataChecksum(dataChecksum);
+    KeyValueContainer kvContainer = (KeyValueContainer) container;
+    KeyValueContainerData containerData = (KeyValueContainerData) 
container.getContainerData();
+
+    for (DatanodeDetails peer : peers) {
+      ContainerProtos.ContainerChecksumInfo peerChecksumInfo = 
dnClient.getContainerChecksumInfo(
+          containerData.getContainerID(), peer);
+      if (peerChecksumInfo == null) {
+        LOG.warn("Checksum not yet generated for peer: {}", peer);
+        return;
+      }
+
+      long scmBlockSize = (long) conf.getStorageSize(OZONE_SCM_BLOCK_SIZE, 
OZONE_SCM_BLOCK_SIZE_DEFAULT,
+          StorageUnit.BYTES);
+      ContainerDiffReport diffReport = checksumManager.diff(containerData, 
peerChecksumInfo);
+      TokenHelper tokenHelper = dnClient.getTokenHelper();
+      XceiverClientSpi xceiverClient = dnClient.getXceiverClientManager()
+          .acquireClient(createSingleNodePipeline(peer));
+
+      try {
+        // Handle missing blocks
+        for (ContainerProtos.BlockMerkleTree missingBlock : 
diffReport.getMissingBlocks()) {
+          handleMissingBlock(kvContainer, containerData, tokenHelper, 
scmBlockSize, xceiverClient, missingBlock);
+        }
+
+        // Handle missing chunks
+        for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry : 
diffReport.getMissingChunks().entrySet()) {
+          reconcileChunk(kvContainer, containerData, tokenHelper, 
scmBlockSize, xceiverClient, entry);

Review Comment:
   Let's add a comment that handling missing and corrupt chunks is done the 
same way; otherwise this looks like a typo relative to the next code block



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java:
##########
@@ -1427,21 +1436,136 @@ public void deleteContainer(Container container, 
boolean force)
   @Override
   public void reconcileContainer(DNContainerOperationClient dnClient, 
Container<?> container,
                                  Set<DatanodeDetails> peers) throws 
IOException {
-    // TODO Just a deterministic placeholder hash for testing until actual 
implementation is finished.
-    ContainerData data = container.getContainerData();
-    long id = data.getContainerID();
-    ByteBuffer byteBuffer = ByteBuffer.allocate(Long.BYTES)
-        .putLong(id)
-        .asReadOnlyBuffer();
-    byteBuffer.rewind();
-    ChecksumByteBuffer checksumImpl = ChecksumByteBufferFactory.crc32Impl();
-    checksumImpl.update(byteBuffer);
-    long dataChecksum = checksumImpl.getValue();
-    LOG.info("Generated data checksum of container {} for testing: {}", id, 
dataChecksum);
-    data.setDataChecksum(dataChecksum);
+    KeyValueContainer kvContainer = (KeyValueContainer) container;
+    KeyValueContainerData containerData = (KeyValueContainerData) 
container.getContainerData();
+
+    for (DatanodeDetails peer : peers) {
+      ContainerProtos.ContainerChecksumInfo peerChecksumInfo = 
dnClient.getContainerChecksumInfo(
+          containerData.getContainerID(), peer);
+      if (peerChecksumInfo == null) {
+        LOG.warn("Checksum not yet generated for peer: {}", peer);
+        return;
+      }
+
+      long scmBlockSize = (long) conf.getStorageSize(OZONE_SCM_BLOCK_SIZE, 
OZONE_SCM_BLOCK_SIZE_DEFAULT,
+          StorageUnit.BYTES);
+      ContainerDiffReport diffReport = checksumManager.diff(containerData, 
peerChecksumInfo);
+      TokenHelper tokenHelper = dnClient.getTokenHelper();
+      XceiverClientSpi xceiverClient = dnClient.getXceiverClientManager()
+          .acquireClient(createSingleNodePipeline(peer));
+
+      try {
+        // Handle missing blocks
+        for (ContainerProtos.BlockMerkleTree missingBlock : 
diffReport.getMissingBlocks()) {
+          handleMissingBlock(kvContainer, containerData, tokenHelper, 
scmBlockSize, xceiverClient, missingBlock);
+        }
+
+        // Handle missing chunks
+        for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry : 
diffReport.getMissingChunks().entrySet()) {
+          reconcileChunk(kvContainer, containerData, tokenHelper, 
scmBlockSize, xceiverClient, entry);
+        }
+
+        // Handle corrupt chunks
+        for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry : 
diffReport.getCorruptChunks().entrySet()) {
+          reconcileChunk(kvContainer, containerData, tokenHelper, 
scmBlockSize, xceiverClient, entry);
+        }
+        updateContainerChecksum(containerData);
+      } finally {
+        dnClient.getXceiverClientManager().releaseClient(xceiverClient, false);
+      }
+    }
+
+    long dataChecksum = updateContainerChecksum(containerData);
+    LOG.info("Checksum data for container {} is updated to {}", 
containerData.getContainerID(), dataChecksum);

Review Comment:
   We should add an entry to the container log when a container gets reconciled



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java:
##########
@@ -615,6 +615,10 @@ public ReplicationServer getReplicationServer() {
     return replicationServer;
   }
 
+  public ContainerChecksumTreeManager getChecksumTreeManager() {

Review Comment:
   This looks unused



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java:
##########
@@ -1427,21 +1436,136 @@ public void deleteContainer(Container container, 
boolean force)
   @Override
   public void reconcileContainer(DNContainerOperationClient dnClient, 
Container<?> container,
                                  Set<DatanodeDetails> peers) throws 
IOException {
-    // TODO Just a deterministic placeholder hash for testing until actual 
implementation is finished.
-    ContainerData data = container.getContainerData();
-    long id = data.getContainerID();
-    ByteBuffer byteBuffer = ByteBuffer.allocate(Long.BYTES)
-        .putLong(id)
-        .asReadOnlyBuffer();
-    byteBuffer.rewind();
-    ChecksumByteBuffer checksumImpl = ChecksumByteBufferFactory.crc32Impl();
-    checksumImpl.update(byteBuffer);
-    long dataChecksum = checksumImpl.getValue();
-    LOG.info("Generated data checksum of container {} for testing: {}", id, 
dataChecksum);
-    data.setDataChecksum(dataChecksum);
+    KeyValueContainer kvContainer = (KeyValueContainer) container;
+    KeyValueContainerData containerData = (KeyValueContainerData) 
container.getContainerData();
+
+    for (DatanodeDetails peer : peers) {
+      ContainerProtos.ContainerChecksumInfo peerChecksumInfo = 
dnClient.getContainerChecksumInfo(
+          containerData.getContainerID(), peer);
+      if (peerChecksumInfo == null) {
+        LOG.warn("Checksum not yet generated for peer: {}", peer);
+        return;

Review Comment:
   We still need to reconcile with the other peers even if just one is missing 
checksum info. See my comment on `TestKeyValueHandler` about adding tests to 
catch this



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java:
##########
@@ -1427,21 +1436,136 @@ public void deleteContainer(Container container, 
boolean force)
   @Override
   public void reconcileContainer(DNContainerOperationClient dnClient, 
Container<?> container,
                                  Set<DatanodeDetails> peers) throws 
IOException {
-    // TODO Just a deterministic placeholder hash for testing until actual 
implementation is finished.
-    ContainerData data = container.getContainerData();
-    long id = data.getContainerID();
-    ByteBuffer byteBuffer = ByteBuffer.allocate(Long.BYTES)
-        .putLong(id)
-        .asReadOnlyBuffer();
-    byteBuffer.rewind();
-    ChecksumByteBuffer checksumImpl = ChecksumByteBufferFactory.crc32Impl();
-    checksumImpl.update(byteBuffer);
-    long dataChecksum = checksumImpl.getValue();
-    LOG.info("Generated data checksum of container {} for testing: {}", id, 
dataChecksum);
-    data.setDataChecksum(dataChecksum);
+    KeyValueContainer kvContainer = (KeyValueContainer) container;
+    KeyValueContainerData containerData = (KeyValueContainerData) 
container.getContainerData();
+
+    for (DatanodeDetails peer : peers) {
+      ContainerProtos.ContainerChecksumInfo peerChecksumInfo = 
dnClient.getContainerChecksumInfo(
+          containerData.getContainerID(), peer);
+      if (peerChecksumInfo == null) {
+        LOG.warn("Checksum not yet generated for peer: {}", peer);
+        return;
+      }
+
+      long scmBlockSize = (long) conf.getStorageSize(OZONE_SCM_BLOCK_SIZE, 
OZONE_SCM_BLOCK_SIZE_DEFAULT,
+          StorageUnit.BYTES);
+      ContainerDiffReport diffReport = checksumManager.diff(containerData, 
peerChecksumInfo);
+      TokenHelper tokenHelper = dnClient.getTokenHelper();
+      XceiverClientSpi xceiverClient = dnClient.getXceiverClientManager()
+          .acquireClient(createSingleNodePipeline(peer));
+
+      try {
+        // Handle missing blocks
+        for (ContainerProtos.BlockMerkleTree missingBlock : 
diffReport.getMissingBlocks()) {
+          handleMissingBlock(kvContainer, containerData, tokenHelper, 
scmBlockSize, xceiverClient, missingBlock);
+        }
+
+        // Handle missing chunks
+        for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry : 
diffReport.getMissingChunks().entrySet()) {
+          reconcileChunk(kvContainer, containerData, tokenHelper, 
scmBlockSize, xceiverClient, entry);
+        }
+
+        // Handle corrupt chunks
+        for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry : 
diffReport.getCorruptChunks().entrySet()) {
+          reconcileChunk(kvContainer, containerData, tokenHelper, 
scmBlockSize, xceiverClient, entry);
+        }
+        updateContainerChecksum(containerData);
+      } finally {
+        dnClient.getXceiverClientManager().releaseClient(xceiverClient, false);
+      }
+    }
+
+    long dataChecksum = updateContainerChecksum(containerData);
+    LOG.info("Checksum data for container {} is updated to {}", 
containerData.getContainerID(), dataChecksum);
+    containerData.setDataChecksum(dataChecksum);
     sendICR(container);
   }
 
+  private long updateContainerChecksum(KeyValueContainerData containerData) 
throws IOException {
+    ContainerMerkleTree merkleTree = new ContainerMerkleTree();
+    try (DBHandle dbHandle = BlockUtils.getDB(containerData, conf);
+         BlockIterator<BlockData> blockIterator = dbHandle.getStore().
+             getBlockIterator(containerData.getContainerID())) {
+      while (blockIterator.hasNext()) {
+        BlockData blockData = blockIterator.nextBlock();
+        List<ContainerProtos.ChunkInfo> chunkInfos = blockData.getChunks();
+        merkleTree.addChunks(blockData.getLocalID(), chunkInfos);
+      }
+    }
+    checksumManager.writeContainerDataTree(containerData, merkleTree);
+    return merkleTree.toProto().getDataChecksum();
+  }
+
+  private void handleMissingBlock(KeyValueContainer container, ContainerData 
containerData, TokenHelper tokenHelper,
+                                  long scmBlockSize, XceiverClientSpi 
xceiverClient,
+                                  ContainerProtos.BlockMerkleTree 
missingBlock) throws IOException {
+    BlockID blockID = new BlockID(containerData.getContainerID(), 
missingBlock.getBlockID());
+    Token<OzoneBlockTokenIdentifier> blockToken = 
tokenHelper.getBlockToken(blockID, scmBlockSize);
+    // TODO: Cache the blockResponse to reuse it again.
+    ContainerProtos.GetBlockResponseProto blockResponse = 
ContainerProtocolCalls.getBlock(xceiverClient, blockID,
+        blockToken, new HashMap<>());
+    // TODO: Add BcsId in BlockMerkleTree to avoid this call
+    ContainerProtos.GetCommittedBlockLengthResponseProto blockLengthResponse =
+        ContainerProtocolCalls.getCommittedBlockLength(xceiverClient, blockID, 
blockToken);
+    List<ContainerProtos.ChunkInfo> chunksList = 
blockResponse.getBlockData().getChunksList();
+
+    for (ContainerProtos.ChunkInfo chunkInfoProto : chunksList) {
+      ByteString chunkData = readChunkData(xceiverClient, chunkInfoProto, 
blockID, blockToken);
+      ChunkBuffer chunkBuffer = 
ChunkBuffer.wrap(chunkData.asReadOnlyByteBuffer());
+      ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(chunkInfoProto);
+      chunkInfo.addMetadata(OzoneConsts.CHUNK_OVERWRITE, "true");
+      writeChunkForClosedContainer(chunkInfo, blockID, chunkBuffer, container);
+    }
+
+    putBlockForClosedContainer(chunksList, container, 
BlockData.getFromProtoBuf(blockResponse.getBlockData()),
+        blockLengthResponse.getBlockLength());
+  }
+
+  private ByteString readChunkData(XceiverClientSpi xceiverClient, 
ContainerProtos.ChunkInfo chunkInfoProto,
+                                   BlockID blockID, 
Token<OzoneBlockTokenIdentifier> blockToken) throws IOException {
+    ContainerProtos.ReadChunkResponseProto response =
+        ContainerProtocolCalls.readChunk(xceiverClient, chunkInfoProto, 
blockID.getDatanodeBlockIDProtobuf(),
+            null, blockToken);
+
+    if (response.hasData()) {
+      return response.getData();
+    } else if (response.hasDataBuffers()) {
+      return 
BufferUtils.concatByteStrings(response.getDataBuffers().getBuffersList());
+    } else {
+      throw new IOException("Error reading chunk data: No data returned.");
+    }
+  }
+
+  private void reconcileChunk(KeyValueContainer container, ContainerData 
containerData, TokenHelper tokenHelper,
+                              long scmBlockSize, XceiverClientSpi 
xceiverClient,
+                              Map.Entry<Long, 
List<ContainerProtos.ChunkMerkleTree>> mapEntry) throws IOException {

Review Comment:
   It will be cleaner to split the map entry into two separate parameters. This 
method does not need to know that the info came from a map in the caller.



##########
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/checksum/TestContainerCommandReconciliation.java:
##########
@@ -252,17 +272,155 @@ public void testGetChecksumInfoSuccess() throws 
Exception {
     }
   }
 
-  private long writeDataAndGetContainer(boolean close) throws Exception {
+  @Test
+  public void testContainerChecksumWithBlockMissing() throws Exception {
+    // 1. Write data to a container.
+    long containerID = writeDataAndGetContainer(true, 20 * 1024 * 1024);
+    Set<DatanodeDetails> peerNodes = cluster.getHddsDatanodes().stream().map(
+        HddsDatanodeService::getDatanodeDetails).collect(Collectors.toSet());
+    HddsDatanodeService hddsDatanodeService = 
cluster.getHddsDatanodes().get(0);
+    DatanodeStateMachine datanodeStateMachine = 
hddsDatanodeService.getDatanodeStateMachine();
+    Container<?> container = 
datanodeStateMachine.getContainer().getContainerSet().getContainer(containerID);
+    KeyValueContainerData containerData = (KeyValueContainerData) 
container.getContainerData();
+    ContainerProtos.ContainerChecksumInfo oldContainerChecksumInfo = 
readChecksumFile(container.getContainerData());
+    KeyValueHandler kvHandler = (KeyValueHandler) 
datanodeStateMachine.getContainer().getDispatcher()
+        .getHandler(ContainerProtos.ContainerType.KeyValueContainer);
+
+    BlockManager blockManager = kvHandler.getBlockManager();
+    List<BlockData> blockDatas = blockManager.listBlock(container, -1, 100);
+    List<BlockData> deletedBlocks = new ArrayList<>();
+    String chunksPath = container.getContainerData().getChunksPath();
+    long oldDataChecksum = 
oldContainerChecksumInfo.getContainerMerkleTree().getDataChecksum();
+
+    // 2. Delete some blocks to simulate missing blocks.
+    try (DBHandle db = BlockUtils.getDB(containerData, conf);
+         BatchOperation op = 
db.getStore().getBatchHandler().initBatchOperation()) {
+      for (int i = 0; i < blockDatas.size(); i += 2) {
+        BlockData blockData = blockDatas.get(i);
+        // Delete the block metadata from the container db
+        db.getStore().getBlockDataTable().deleteWithBatch(op, 
containerData.getBlockKey(blockData.getLocalID()));
+        // Delete the block file.
+        Files.deleteIfExists(Paths.get(chunksPath + "/" + 
blockData.getBlockID().getLocalID() + ".block"));
+        deletedBlocks.add(blockData);
+      }
+      db.getStore().getBatchHandler().commitBatchOperation(op);
+      db.getStore().flushDB();
+    }
+
+    
Files.deleteIfExists(getContainerChecksumFile(container.getContainerData()).toPath());
+    kvHandler.createContainerMerkleTree(container);
+    ContainerProtos.ContainerChecksumInfo containerChecksumAfterBlockDelete =
+        readChecksumFile(container.getContainerData());
+    long dataChecksumAfterBlockDelete = 
containerChecksumAfterBlockDelete.getContainerMerkleTree().getDataChecksum();
+    // Checksum should have changed after block delete.
+    Assertions.assertNotEquals(oldDataChecksum, dataChecksumAfterBlockDelete);
+
+    // 3. Reconcile the container.
+    
kvHandler.reconcileContainer(datanodeStateMachine.getDnContainerOperationClientClient(),
 container, peerNodes);
+    ContainerProtos.ContainerChecksumInfo newContainerChecksumInfo = 
readChecksumFile(container.getContainerData());
+    long newDataChecksum = 
newContainerChecksumInfo.getContainerMerkleTree().getDataChecksum();
+    
assertTreesSortedAndMatch(oldContainerChecksumInfo.getContainerMerkleTree(),
+        newContainerChecksumInfo.getContainerMerkleTree());
+    Assertions.assertEquals(oldDataChecksum, newDataChecksum);

Review Comment:
   Since this is an integration test we should also test reading the reconciled 
containers with clients and checking the checksum info SCM is seeing



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to