kerneltime commented on code in PR #7474:
URL: https://github.com/apache/ozone/pull/7474#discussion_r1964613184


##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/ContainerLogger.java:
##########
@@ -145,6 +147,16 @@ public static void logRecovered(ContainerData 
containerData) {
     LOG.info(getMessage(containerData));
   }
 
+  /**
+   * Logged when a container is reconciled.
+   *
+   * @param containerData The container that was reconciled on this datanode.
+   * @param oldDataChecksum The old data checksum.
+   */
+  public static void logReconciled(ContainerData containerData, long 
oldDataChecksum) {

Review Comment:
   It would be a good idea to include the new checksum as well as the old one. 
Since reconciliation is best effort, there is no guarantee that the new 
checksum will match the expected checksum.



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java:
##########
@@ -1473,24 +1476,210 @@ public void deleteContainer(Container container, 
boolean force)
     deleteInternal(container, force);
   }
 
+  // Update Java Doc steps
   @Override
   public void reconcileContainer(DNContainerOperationClient dnClient, 
Container<?> container,
                                  Set<DatanodeDetails> peers) throws 
IOException {
-    // TODO Just a deterministic placeholder hash for testing until actual 
implementation is finished.
-    ContainerData data = container.getContainerData();
-    long id = data.getContainerID();
-    ByteBuffer byteBuffer = ByteBuffer.allocate(Long.BYTES)
-        .putLong(id)
-        .asReadOnlyBuffer();
-    byteBuffer.rewind();
-    ChecksumByteBuffer checksumImpl = ChecksumByteBufferFactory.crc32CImpl();
-    checksumImpl.update(byteBuffer);
-    long dataChecksum = checksumImpl.getValue();
-    LOG.info("Generated data checksum of container {} for testing: {}", id, 
dataChecksum);
-    data.setDataChecksum(dataChecksum);
+    KeyValueContainer kvContainer = (KeyValueContainer) container;
+    KeyValueContainerData containerData = (KeyValueContainerData) 
container.getContainerData();
+    Optional<ContainerProtos.ContainerChecksumInfo> checksumInfo = 
checksumManager.read(containerData);
+    long oldDataChecksum = 0;
+
+    if (checksumInfo.isPresent()) {
+      oldDataChecksum = 
checksumInfo.get().getContainerMerkleTree().getDataChecksum();
+    } else {
+      // Try creating the checksum info from RocksDB metadata if it is not 
present.
+      createContainerMerkleTree(container);
+      checksumInfo = checksumManager.read(containerData);
+      if (checksumInfo.isPresent()) {
+        oldDataChecksum = 
checksumInfo.get().getContainerMerkleTree().getDataChecksum();
+      }
+    }
+
+    for (DatanodeDetails peer : peers) {
+      ContainerProtos.ContainerChecksumInfo peerChecksumInfo = 
dnClient.getContainerChecksumInfo(
+          containerData.getContainerID(), peer);
+      if (peerChecksumInfo == null) {
+        LOG.warn("Cannot reconcile container {} with peer {} which has not yet 
generated a checksum",
+            containerData.getContainerID(), peer);
+        continue;
+      }
+
+      // Check block token usage. How it is used in DN
+      ContainerDiffReport diffReport = checksumManager.diff(containerData, 
peerChecksumInfo);
+      TokenHelper tokenHelper = dnClient.getTokenHelper();
+      XceiverClientSpi xceiverClient = dnClient.getXceiverClientManager()
+          .acquireClient(createSingleNodePipeline(peer));
+
+      try {
+        // Handle missing blocks
+        for (ContainerProtos.BlockMerkleTree missingBlock : 
diffReport.getMissingBlocks()) {
+          try {
+            handleMissingBlock(kvContainer, containerData, tokenHelper, 
xceiverClient, missingBlock);
+          } catch (IOException e) {
+            LOG.error("Error while reconciling missing block for block {} in 
container {}", missingBlock.getBlockID(),
+                containerData.getContainerID(), e);
+          }
+        }
+
+        // Handle missing chunks
+        for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry : 
diffReport.getMissingChunks().entrySet()) {
+          try {
+            reconcileChunk(kvContainer, containerData, tokenHelper, 
xceiverClient, entry.getKey(), entry.getValue());
+          } catch (IOException e) {
+            LOG.error("Error while reconciling missing chunk for block {} in 
container {}", entry.getKey(),
+                    containerData.getContainerID(), e);
+          }
+        }
+
+        // Handle corrupt chunks
+        for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry : 
diffReport.getCorruptChunks().entrySet()) {
+          try {
+            reconcileChunk(kvContainer, containerData, tokenHelper, 
xceiverClient, entry.getKey(), entry.getValue());
+          } catch (IOException e) {
+            LOG.error("Error while reconciling corrupt chunk for block {} in 
container {}", entry.getKey(),
+                    containerData.getContainerID(), e);
+          }
+        }
+        updateContainerChecksum(containerData);
+      } finally {
+        dnClient.getXceiverClientManager().releaseClient(xceiverClient, false);
+      }
+    }
+
+    // Update checksum based on RocksDB metadata
+    long dataChecksum = updateContainerChecksum(containerData);
+    // Trigger manual on demand scanner
+    OnDemandContainerDataScanner.scanContainer(container);
+    if (dataChecksum == oldDataChecksum) {
+      metrics.incContainerReconciledWithoutChanges();
+      LOG.info("Container {} reconciled without changes, Current checksum {}", 
containerData.getContainerID(),
+              checksumToString(dataChecksum));
+    } else {
+      metrics.incContainerReconciledWithChanges();
+      LOG.warn("Container {} reconciled, Checksum updated from {} to {}", 
containerData.getContainerID(),
+              checksumToString(oldDataChecksum), 
checksumToString(dataChecksum));
+    }
+    ContainerLogger.logReconciled(container.getContainerData(), 
oldDataChecksum);
     sendICR(container);
   }
 
+  private long updateContainerChecksum(KeyValueContainerData containerData) 
throws IOException {
+    ContainerMerkleTreeWriter merkleTree = new ContainerMerkleTreeWriter();
+    try (DBHandle dbHandle = BlockUtils.getDB(containerData, conf);
+         BlockIterator<BlockData> blockIterator = dbHandle.getStore().
+             getBlockIterator(containerData.getContainerID())) {
+      while (blockIterator.hasNext()) {
+        BlockData blockData = blockIterator.nextBlock();
+        List<ContainerProtos.ChunkInfo> chunkInfos = blockData.getChunks();
+        merkleTree.addChunks(blockData.getLocalID(), chunkInfos);
+      }
+    }
+    checksumManager.writeContainerDataTree(containerData, merkleTree);
+    long dataChecksum = merkleTree.toProto().getDataChecksum();
+    containerData.setDataChecksum(dataChecksum);
+    return dataChecksum;
+  }
+
+  private void handleMissingBlock(KeyValueContainer container, ContainerData 
containerData, TokenHelper tokenHelper,
+                                  XceiverClientSpi xceiverClient, 
ContainerProtos.BlockMerkleTree missingBlock)
+          throws IOException {
+    BlockID blockID = new BlockID(containerData.getContainerID(), 
missingBlock.getBlockID());
+    Token<OzoneBlockTokenIdentifier> blockToken = 
tokenHelper.getBlockToken(blockID, 0L);
+    // TODO: Re-use the blockResponse for the same block again.
+    ContainerProtos.GetBlockResponseProto blockResponse = 
ContainerProtocolCalls.getBlock(xceiverClient, blockID,
+        blockToken, new HashMap<>());
+    ContainerProtos.BlockData peerBlockData = blockResponse.getBlockData();
+    long bcsId = getBlockManager().blockExists(container, blockID) ?
+        getBlockManager().getBlock(container, 
blockID).getBlockCommitSequenceId() : 0;
+    // Check the local bcsId with the one from the bcsId from the peer 
datanode.
+    long maxBcsId = 
Math.max(peerBlockData.getBlockID().getBlockCommitSequenceId(), bcsId);
+    List<ContainerProtos.ChunkInfo> peerChunksList = 
peerBlockData.getChunksList();
+    List<ContainerProtos.ChunkInfo> successfullChunksList = new ArrayList<>();
+    // Update BcsId only if all chunks are successfully written.
+    boolean overwriteBcsId = true;
+
+    // Don't update bcsId if chunk read fails
+    for (ContainerProtos.ChunkInfo chunkInfoProto : peerChunksList) {
+      try {
+        ByteString chunkData = readChunkData(xceiverClient, chunkInfoProto, 
blockID, blockToken);
+        ChunkBuffer chunkBuffer = 
ChunkBuffer.wrap(chunkData.asReadOnlyByteBuffer());
+        ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(chunkInfoProto);
+        chunkInfo.addMetadata(OzoneConsts.CHUNK_OVERWRITE, "true");
+        writeChunkForClosedContainer(chunkInfo, blockID, chunkBuffer, 
container);
+        successfullChunksList.add(chunkInfoProto);
+      } catch (IOException ex) {
+        overwriteBcsId = false;
+        LOG.error("Error while reconciling missing block {} for offset {} in 
container {}",
+                blockID, chunkInfoProto.getOffset(), 
containerData.getContainerID(), ex);
+      }
+    }
+
+    BlockData putBlockData = BlockData.getFromProtoBuf(peerBlockData);
+    putBlockData.setChunks(successfullChunksList);
+    putBlockForClosedContainer(container, putBlockData, maxBcsId, 
overwriteBcsId);
+  }
+
+  private ByteString readChunkData(XceiverClientSpi xceiverClient, 
ContainerProtos.ChunkInfo chunkInfoProto,
+                                   BlockID blockID, 
Token<OzoneBlockTokenIdentifier> blockToken) throws IOException {
+    ContainerProtos.ReadChunkResponseProto response =
+        ContainerProtocolCalls.readChunk(xceiverClient, chunkInfoProto, 
blockID.getDatanodeBlockIDProtobuf(),
+            null, blockToken);
+
+    if (response.hasData()) {
+      return response.getData();
+    } else if (response.hasDataBuffers()) {
+      return 
BufferUtils.concatByteStrings(response.getDataBuffers().getBuffersList());
+    } else {
+      throw new IOException("Error reading chunk data: No data returned.");
+    }
+  }
+
+  private void reconcileChunk(KeyValueContainer container, ContainerData 
containerData, TokenHelper tokenHelper,
+                              XceiverClientSpi xceiverClient, long blockId,
+                              List<ContainerProtos.ChunkMerkleTree> chunkList) 
throws IOException {
+    Set<Long> offsets = 
chunkList.stream().map(ContainerProtos.ChunkMerkleTree::getOffset)
+        .collect(Collectors.toSet());
+    BlockID blockID = new BlockID(containerData.getContainerID(), blockId);
+    Token<OzoneBlockTokenIdentifier> blockToken = 
tokenHelper.getBlockToken(blockID, 0L);

Review Comment:
   Add a comment explaining why the length is set to zero. The length of the 
block is not known at this stage, and rather than using a default value (which 
can change over the life of a cluster), we set it to zero because it currently 
plays no role in validation for block reads.



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java:
##########
@@ -1473,24 +1476,210 @@ public void deleteContainer(Container container, 
boolean force)
     deleteInternal(container, force);
   }
 
+  // Update Java Doc steps
   @Override
   public void reconcileContainer(DNContainerOperationClient dnClient, 
Container<?> container,
                                  Set<DatanodeDetails> peers) throws 
IOException {
-    // TODO Just a deterministic placeholder hash for testing until actual 
implementation is finished.
-    ContainerData data = container.getContainerData();
-    long id = data.getContainerID();
-    ByteBuffer byteBuffer = ByteBuffer.allocate(Long.BYTES)
-        .putLong(id)
-        .asReadOnlyBuffer();
-    byteBuffer.rewind();
-    ChecksumByteBuffer checksumImpl = ChecksumByteBufferFactory.crc32CImpl();
-    checksumImpl.update(byteBuffer);
-    long dataChecksum = checksumImpl.getValue();
-    LOG.info("Generated data checksum of container {} for testing: {}", id, 
dataChecksum);
-    data.setDataChecksum(dataChecksum);
+    KeyValueContainer kvContainer = (KeyValueContainer) container;
+    KeyValueContainerData containerData = (KeyValueContainerData) 
container.getContainerData();
+    Optional<ContainerProtos.ContainerChecksumInfo> checksumInfo = 
checksumManager.read(containerData);
+    long oldDataChecksum = 0;
+
+    if (checksumInfo.isPresent()) {
+      oldDataChecksum = 
checksumInfo.get().getContainerMerkleTree().getDataChecksum();
+    } else {
+      // Try creating the checksum info from RocksDB metadata if it is not 
present.
+      createContainerMerkleTree(container);
+      checksumInfo = checksumManager.read(containerData);
+      if (checksumInfo.isPresent()) {
+        oldDataChecksum = 
checksumInfo.get().getContainerMerkleTree().getDataChecksum();
+      }
+    }
+
+    for (DatanodeDetails peer : peers) {
+      ContainerProtos.ContainerChecksumInfo peerChecksumInfo = 
dnClient.getContainerChecksumInfo(
+          containerData.getContainerID(), peer);
+      if (peerChecksumInfo == null) {
+        LOG.warn("Cannot reconcile container {} with peer {} which has not yet 
generated a checksum",
+            containerData.getContainerID(), peer);
+        continue;
+      }
+
+      // Check block token usage. How it is used in DN
+      ContainerDiffReport diffReport = checksumManager.diff(containerData, 
peerChecksumInfo);
+      TokenHelper tokenHelper = dnClient.getTokenHelper();
+      XceiverClientSpi xceiverClient = dnClient.getXceiverClientManager()
+          .acquireClient(createSingleNodePipeline(peer));
+
+      try {
+        // Handle missing blocks
+        for (ContainerProtos.BlockMerkleTree missingBlock : 
diffReport.getMissingBlocks()) {
+          try {
+            handleMissingBlock(kvContainer, containerData, tokenHelper, 
xceiverClient, missingBlock);
+          } catch (IOException e) {
+            LOG.error("Error while reconciling missing block for block {} in 
container {}", missingBlock.getBlockID(),
+                containerData.getContainerID(), e);
+          }
+        }
+
+        // Handle missing chunks
+        for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry : 
diffReport.getMissingChunks().entrySet()) {
+          try {
+            reconcileChunk(kvContainer, containerData, tokenHelper, 
xceiverClient, entry.getKey(), entry.getValue());
+          } catch (IOException e) {
+            LOG.error("Error while reconciling missing chunk for block {} in 
container {}", entry.getKey(),
+                    containerData.getContainerID(), e);
+          }
+        }
+
+        // Handle corrupt chunks
+        for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry : 
diffReport.getCorruptChunks().entrySet()) {
+          try {
+            reconcileChunk(kvContainer, containerData, tokenHelper, 
xceiverClient, entry.getKey(), entry.getValue());
+          } catch (IOException e) {
+            LOG.error("Error while reconciling corrupt chunk for block {} in 
container {}", entry.getKey(),
+                    containerData.getContainerID(), e);
+          }
+        }
+        updateContainerChecksum(containerData);
+      } finally {
+        dnClient.getXceiverClientManager().releaseClient(xceiverClient, false);
+      }
+    }
+
+    // Update checksum based on RocksDB metadata
+    long dataChecksum = updateContainerChecksum(containerData);
+    // Trigger manual on demand scanner
+    OnDemandContainerDataScanner.scanContainer(container);
+    if (dataChecksum == oldDataChecksum) {
+      metrics.incContainerReconciledWithoutChanges();
+      LOG.info("Container {} reconciled without changes, Current checksum {}", 
containerData.getContainerID(),
+              checksumToString(dataChecksum));
+    } else {
+      metrics.incContainerReconciledWithChanges();
+      LOG.warn("Container {} reconciled, Checksum updated from {} to {}", 
containerData.getContainerID(),
+              checksumToString(oldDataChecksum), 
checksumToString(dataChecksum));
+    }
+    ContainerLogger.logReconciled(container.getContainerData(), 
oldDataChecksum);
     sendICR(container);
   }
 
+  private long updateContainerChecksum(KeyValueContainerData containerData) 
throws IOException {
+    ContainerMerkleTreeWriter merkleTree = new ContainerMerkleTreeWriter();
+    try (DBHandle dbHandle = BlockUtils.getDB(containerData, conf);
+         BlockIterator<BlockData> blockIterator = dbHandle.getStore().
+             getBlockIterator(containerData.getContainerID())) {
+      while (blockIterator.hasNext()) {
+        BlockData blockData = blockIterator.nextBlock();
+        List<ContainerProtos.ChunkInfo> chunkInfos = blockData.getChunks();
+        merkleTree.addChunks(blockData.getLocalID(), chunkInfos);
+      }
+    }
+    checksumManager.writeContainerDataTree(containerData, merkleTree);
+    long dataChecksum = merkleTree.toProto().getDataChecksum();
+    containerData.setDataChecksum(dataChecksum);
+    return dataChecksum;
+  }
+
+  private void handleMissingBlock(KeyValueContainer container, ContainerData 
containerData, TokenHelper tokenHelper,
+                                  XceiverClientSpi xceiverClient, 
ContainerProtos.BlockMerkleTree missingBlock)
+          throws IOException {
+    BlockID blockID = new BlockID(containerData.getContainerID(), 
missingBlock.getBlockID());
+    Token<OzoneBlockTokenIdentifier> blockToken = 
tokenHelper.getBlockToken(blockID, 0L);
+    // TODO: Re-use the blockResponse for the same block again.
+    ContainerProtos.GetBlockResponseProto blockResponse = 
ContainerProtocolCalls.getBlock(xceiverClient, blockID,
+        blockToken, new HashMap<>());
+    ContainerProtos.BlockData peerBlockData = blockResponse.getBlockData();
+    long bcsId = getBlockManager().blockExists(container, blockID) ?
+        getBlockManager().getBlock(container, 
blockID).getBlockCommitSequenceId() : 0;
+    // Check the local bcsId with the one from the bcsId from the peer 
datanode.
+    long maxBcsId = 
Math.max(peerBlockData.getBlockID().getBlockCommitSequenceId(), bcsId);
+    List<ContainerProtos.ChunkInfo> peerChunksList = 
peerBlockData.getChunksList();
+    List<ContainerProtos.ChunkInfo> successfullChunksList = new ArrayList<>();
+    // Update BcsId only if all chunks are successfully written.
+    boolean overwriteBcsId = true;
+
+    // Don't update bcsId if chunk read fails
+    for (ContainerProtos.ChunkInfo chunkInfoProto : peerChunksList) {
+      try {
+        ByteString chunkData = readChunkData(xceiverClient, chunkInfoProto, 
blockID, blockToken);
+        ChunkBuffer chunkBuffer = 
ChunkBuffer.wrap(chunkData.asReadOnlyByteBuffer());
+        ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(chunkInfoProto);
+        chunkInfo.addMetadata(OzoneConsts.CHUNK_OVERWRITE, "true");
+        writeChunkForClosedContainer(chunkInfo, blockID, chunkBuffer, 
container);
+        successfullChunksList.add(chunkInfoProto);
+      } catch (IOException ex) {
+        overwriteBcsId = false;
+        LOG.error("Error while reconciling missing block {} for offset {} in 
container {}",
+                blockID, chunkInfoProto.getOffset(), 
containerData.getContainerID(), ex);
+      }
+    }
+
+    BlockData putBlockData = BlockData.getFromProtoBuf(peerBlockData);
+    putBlockData.setChunks(successfullChunksList);
+    putBlockForClosedContainer(container, putBlockData, maxBcsId, 
overwriteBcsId);
+  }
+
+  private ByteString readChunkData(XceiverClientSpi xceiverClient, 
ContainerProtos.ChunkInfo chunkInfoProto,
+                                   BlockID blockID, 
Token<OzoneBlockTokenIdentifier> blockToken) throws IOException {
+    ContainerProtos.ReadChunkResponseProto response =
+        ContainerProtocolCalls.readChunk(xceiverClient, chunkInfoProto, 
blockID.getDatanodeBlockIDProtobuf(),
+            null, blockToken);
+
+    if (response.hasData()) {
+      return response.getData();
+    } else if (response.hasDataBuffers()) {
+      return 
BufferUtils.concatByteStrings(response.getDataBuffers().getBuffersList());
+    } else {
+      throw new IOException("Error reading chunk data: No data returned.");
+    }
+  }
+
+  private void reconcileChunk(KeyValueContainer container, ContainerData 
containerData, TokenHelper tokenHelper,

Review Comment:
   Since this method reconciles all the chunks of a block, rename it to the 
plural form:
   ```suggestion
     private void reconcileChunks(KeyValueContainer container, ContainerData 
containerData, TokenHelper tokenHelper,
   ```



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java:
##########
@@ -1473,24 +1476,210 @@ public void deleteContainer(Container container, 
boolean force)
     deleteInternal(container, force);
   }
 
+  // Update Java Doc steps
   @Override
   public void reconcileContainer(DNContainerOperationClient dnClient, 
Container<?> container,
                                  Set<DatanodeDetails> peers) throws 
IOException {
-    // TODO Just a deterministic placeholder hash for testing until actual 
implementation is finished.
-    ContainerData data = container.getContainerData();
-    long id = data.getContainerID();
-    ByteBuffer byteBuffer = ByteBuffer.allocate(Long.BYTES)
-        .putLong(id)
-        .asReadOnlyBuffer();
-    byteBuffer.rewind();
-    ChecksumByteBuffer checksumImpl = ChecksumByteBufferFactory.crc32CImpl();
-    checksumImpl.update(byteBuffer);
-    long dataChecksum = checksumImpl.getValue();
-    LOG.info("Generated data checksum of container {} for testing: {}", id, 
dataChecksum);
-    data.setDataChecksum(dataChecksum);
+    KeyValueContainer kvContainer = (KeyValueContainer) container;
+    KeyValueContainerData containerData = (KeyValueContainerData) 
container.getContainerData();
+    Optional<ContainerProtos.ContainerChecksumInfo> checksumInfo = 
checksumManager.read(containerData);
+    long oldDataChecksum = 0;
+
+    if (checksumInfo.isPresent()) {
+      oldDataChecksum = 
checksumInfo.get().getContainerMerkleTree().getDataChecksum();
+    } else {
+      // Try creating the checksum info from RocksDB metadata if it is not 
present.
+      createContainerMerkleTree(container);
+      checksumInfo = checksumManager.read(containerData);
+      if (checksumInfo.isPresent()) {
+        oldDataChecksum = 
checksumInfo.get().getContainerMerkleTree().getDataChecksum();
+      }
+    }
+
+    for (DatanodeDetails peer : peers) {
+      ContainerProtos.ContainerChecksumInfo peerChecksumInfo = 
dnClient.getContainerChecksumInfo(
+          containerData.getContainerID(), peer);
+      if (peerChecksumInfo == null) {
+        LOG.warn("Cannot reconcile container {} with peer {} which has not yet 
generated a checksum",
+            containerData.getContainerID(), peer);
+        continue;
+      }
+
+      // Check block token usage. How it is used in DN
+      ContainerDiffReport diffReport = checksumManager.diff(containerData, 
peerChecksumInfo);
+      TokenHelper tokenHelper = dnClient.getTokenHelper();
+      XceiverClientSpi xceiverClient = dnClient.getXceiverClientManager()
+          .acquireClient(createSingleNodePipeline(peer));
+
+      try {
+        // Handle missing blocks
+        for (ContainerProtos.BlockMerkleTree missingBlock : 
diffReport.getMissingBlocks()) {
+          try {
+            handleMissingBlock(kvContainer, containerData, tokenHelper, 
xceiverClient, missingBlock);
+          } catch (IOException e) {
+            LOG.error("Error while reconciling missing block for block {} in 
container {}", missingBlock.getBlockID(),
+                containerData.getContainerID(), e);
+          }
+        }
+
+        // Handle missing chunks
+        for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry : 
diffReport.getMissingChunks().entrySet()) {
+          try {
+            reconcileChunk(kvContainer, containerData, tokenHelper, 
xceiverClient, entry.getKey(), entry.getValue());
+          } catch (IOException e) {
+            LOG.error("Error while reconciling missing chunk for block {} in 
container {}", entry.getKey(),
+                    containerData.getContainerID(), e);
+          }
+        }
+
+        // Handle corrupt chunks
+        for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry : 
diffReport.getCorruptChunks().entrySet()) {
+          try {
+            reconcileChunk(kvContainer, containerData, tokenHelper, 
xceiverClient, entry.getKey(), entry.getValue());
+          } catch (IOException e) {
+            LOG.error("Error while reconciling corrupt chunk for block {} in 
container {}", entry.getKey(),
+                    containerData.getContainerID(), e);
+          }
+        }
+        updateContainerChecksum(containerData);
+      } finally {
+        dnClient.getXceiverClientManager().releaseClient(xceiverClient, false);
+      }
+    }
+
+    // Update checksum based on RocksDB metadata
+    long dataChecksum = updateContainerChecksum(containerData);
+    // Trigger manual on demand scanner
+    OnDemandContainerDataScanner.scanContainer(container);
+    if (dataChecksum == oldDataChecksum) {
+      metrics.incContainerReconciledWithoutChanges();
+      LOG.info("Container {} reconciled without changes, Current checksum {}", 
containerData.getContainerID(),
+              checksumToString(dataChecksum));
+    } else {
+      metrics.incContainerReconciledWithChanges();
+      LOG.warn("Container {} reconciled, Checksum updated from {} to {}", 
containerData.getContainerID(),
+              checksumToString(oldDataChecksum), 
checksumToString(dataChecksum));
+    }
+    ContainerLogger.logReconciled(container.getContainerData(), 
oldDataChecksum);
     sendICR(container);
   }
 
+  private long updateContainerChecksum(KeyValueContainerData containerData) 
throws IOException {
+    ContainerMerkleTreeWriter merkleTree = new ContainerMerkleTreeWriter();
+    try (DBHandle dbHandle = BlockUtils.getDB(containerData, conf);
+         BlockIterator<BlockData> blockIterator = dbHandle.getStore().
+             getBlockIterator(containerData.getContainerID())) {
+      while (blockIterator.hasNext()) {
+        BlockData blockData = blockIterator.nextBlock();
+        List<ContainerProtos.ChunkInfo> chunkInfos = blockData.getChunks();
+        merkleTree.addChunks(blockData.getLocalID(), chunkInfos);
+      }
+    }
+    checksumManager.writeContainerDataTree(containerData, merkleTree);
+    long dataChecksum = merkleTree.toProto().getDataChecksum();
+    containerData.setDataChecksum(dataChecksum);
+    return dataChecksum;
+  }
+
+  private void handleMissingBlock(KeyValueContainer container, ContainerData 
containerData, TokenHelper tokenHelper,
+                                  XceiverClientSpi xceiverClient, 
ContainerProtos.BlockMerkleTree missingBlock)
+          throws IOException {
+    BlockID blockID = new BlockID(containerData.getContainerID(), 
missingBlock.getBlockID());
+    Token<OzoneBlockTokenIdentifier> blockToken = 
tokenHelper.getBlockToken(blockID, 0L);
+    // TODO: Re-use the blockResponse for the same block again.
+    ContainerProtos.GetBlockResponseProto blockResponse = 
ContainerProtocolCalls.getBlock(xceiverClient, blockID,
+        blockToken, new HashMap<>());
+    ContainerProtos.BlockData peerBlockData = blockResponse.getBlockData();
+    long bcsId = getBlockManager().blockExists(container, blockID) ?
+        getBlockManager().getBlock(container, 
blockID).getBlockCommitSequenceId() : 0;
+    // Check the local bcsId with the one from the bcsId from the peer 
datanode.
+    long maxBcsId = 
Math.max(peerBlockData.getBlockID().getBlockCommitSequenceId(), bcsId);
+    List<ContainerProtos.ChunkInfo> peerChunksList = 
peerBlockData.getChunksList();
+    List<ContainerProtos.ChunkInfo> successfullChunksList = new ArrayList<>();
+    // Update BcsId only if all chunks are successfully written.
+    boolean overwriteBcsId = true;
+
+    // Don't update bcsId if chunk read fails
+    for (ContainerProtos.ChunkInfo chunkInfoProto : peerChunksList) {
+      try {
+        ByteString chunkData = readChunkData(xceiverClient, chunkInfoProto, 
blockID, blockToken);
+        ChunkBuffer chunkBuffer = 
ChunkBuffer.wrap(chunkData.asReadOnlyByteBuffer());
+        ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(chunkInfoProto);
+        chunkInfo.addMetadata(OzoneConsts.CHUNK_OVERWRITE, "true");
+        writeChunkForClosedContainer(chunkInfo, blockID, chunkBuffer, 
container);
+        successfullChunksList.add(chunkInfoProto);
+      } catch (IOException ex) {
+        overwriteBcsId = false;
+        LOG.error("Error while reconciling missing block {} for offset {} in 
container {}",
+                blockID, chunkInfoProto.getOffset(), 
containerData.getContainerID(), ex);
+      }
+    }
+
+    BlockData putBlockData = BlockData.getFromProtoBuf(peerBlockData);
+    putBlockData.setChunks(successfullChunksList);
+    putBlockForClosedContainer(container, putBlockData, maxBcsId, 
overwriteBcsId);
+  }
+
+  private ByteString readChunkData(XceiverClientSpi xceiverClient, 
ContainerProtos.ChunkInfo chunkInfoProto,
+                                   BlockID blockID, 
Token<OzoneBlockTokenIdentifier> blockToken) throws IOException {
+    ContainerProtos.ReadChunkResponseProto response =
+        ContainerProtocolCalls.readChunk(xceiverClient, chunkInfoProto, 
blockID.getDatanodeBlockIDProtobuf(),
+            null, blockToken);
+
+    if (response.hasData()) {
+      return response.getData();
+    } else if (response.hasDataBuffers()) {
+      return 
BufferUtils.concatByteStrings(response.getDataBuffers().getBuffersList());
+    } else {
+      throw new IOException("Error reading chunk data: No data returned.");
+    }
+  }
+

Review Comment:
   This is an important piece of logic (as is the block reconciliation above) 
and needs thorough Javadoc explaining the intent and the expected algorithm.



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java:
##########
@@ -1473,24 +1476,210 @@ public void deleteContainer(Container container, 
boolean force)
     deleteInternal(container, force);
   }
 
+  // Update Java Doc steps
   @Override
   public void reconcileContainer(DNContainerOperationClient dnClient, 
Container<?> container,
                                  Set<DatanodeDetails> peers) throws 
IOException {
-    // TODO Just a deterministic placeholder hash for testing until actual 
implementation is finished.
-    ContainerData data = container.getContainerData();
-    long id = data.getContainerID();
-    ByteBuffer byteBuffer = ByteBuffer.allocate(Long.BYTES)
-        .putLong(id)
-        .asReadOnlyBuffer();
-    byteBuffer.rewind();
-    ChecksumByteBuffer checksumImpl = ChecksumByteBufferFactory.crc32CImpl();
-    checksumImpl.update(byteBuffer);
-    long dataChecksum = checksumImpl.getValue();
-    LOG.info("Generated data checksum of container {} for testing: {}", id, 
dataChecksum);
-    data.setDataChecksum(dataChecksum);
+    KeyValueContainer kvContainer = (KeyValueContainer) container;
+    KeyValueContainerData containerData = (KeyValueContainerData) 
container.getContainerData();
+    Optional<ContainerProtos.ContainerChecksumInfo> checksumInfo = 
checksumManager.read(containerData);
+    long oldDataChecksum = 0;
+
+    if (checksumInfo.isPresent()) {
+      oldDataChecksum = 
checksumInfo.get().getContainerMerkleTree().getDataChecksum();
+    } else {
+      // Try creating the checksum info from RocksDB metadata if it is not 
present.
+      createContainerMerkleTree(container);
+      checksumInfo = checksumManager.read(containerData);
+      if (checksumInfo.isPresent()) {
+        oldDataChecksum = 
checksumInfo.get().getContainerMerkleTree().getDataChecksum();
+      }
+    }
+
+    for (DatanodeDetails peer : peers) {
+      ContainerProtos.ContainerChecksumInfo peerChecksumInfo = 
dnClient.getContainerChecksumInfo(
+          containerData.getContainerID(), peer);
+      if (peerChecksumInfo == null) {
+        LOG.warn("Cannot reconcile container {} with peer {} which has not yet 
generated a checksum",
+            containerData.getContainerID(), peer);
+        continue;
+      }
+
+      // Check block token usage. How it is used in DN
+      ContainerDiffReport diffReport = checksumManager.diff(containerData, 
peerChecksumInfo);
+      TokenHelper tokenHelper = dnClient.getTokenHelper();
+      XceiverClientSpi xceiverClient = dnClient.getXceiverClientManager()
+          .acquireClient(createSingleNodePipeline(peer));
+
+      try {
+        // Handle missing blocks
+        for (ContainerProtos.BlockMerkleTree missingBlock : 
diffReport.getMissingBlocks()) {
+          try {
+            handleMissingBlock(kvContainer, containerData, tokenHelper, 
xceiverClient, missingBlock);
+          } catch (IOException e) {
+            LOG.error("Error while reconciling missing block for block {} in 
container {}", missingBlock.getBlockID(),
+                containerData.getContainerID(), e);
+          }
+        }
+
+        // Handle missing chunks
+        for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry : 
diffReport.getMissingChunks().entrySet()) {
+          try {
+            reconcileChunk(kvContainer, containerData, tokenHelper, 
xceiverClient, entry.getKey(), entry.getValue());
+          } catch (IOException e) {
+            LOG.error("Error while reconciling missing chunk for block {} in 
container {}", entry.getKey(),
+                    containerData.getContainerID(), e);
+          }
+        }
+
+        // Handle corrupt chunks
+        for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry : 
diffReport.getCorruptChunks().entrySet()) {
+          try {
+            reconcileChunk(kvContainer, containerData, tokenHelper, 
xceiverClient, entry.getKey(), entry.getValue());
+          } catch (IOException e) {
+            LOG.error("Error while reconciling corrupt chunk for block {} in 
container {}", entry.getKey(),
+                    containerData.getContainerID(), e);
+          }
+        }
+        updateContainerChecksum(containerData);
+      } finally {
+        dnClient.getXceiverClientManager().releaseClient(xceiverClient, false);
+      }
+    }
+
+    // Update checksum based on RocksDB metadata
+    long dataChecksum = updateContainerChecksum(containerData);
+    // Trigger manual on demand scanner
+    OnDemandContainerDataScanner.scanContainer(container);
+    if (dataChecksum == oldDataChecksum) {
+      metrics.incContainerReconciledWithoutChanges();
+      LOG.info("Container {} reconciled without changes, Current checksum {}", 
containerData.getContainerID(),
+              checksumToString(dataChecksum));
+    } else {
+      metrics.incContainerReconciledWithChanges();
+      LOG.warn("Container {} reconciled, Checksum updated from {} to {}", 
containerData.getContainerID(),
+              checksumToString(oldDataChecksum), 
checksumToString(dataChecksum));
+    }
+    ContainerLogger.logReconciled(container.getContainerData(), 
oldDataChecksum);
     sendICR(container);
   }
 
+  private long updateContainerChecksum(KeyValueContainerData containerData) 
throws IOException {
+    ContainerMerkleTreeWriter merkleTree = new ContainerMerkleTreeWriter();
+    try (DBHandle dbHandle = BlockUtils.getDB(containerData, conf);
+         BlockIterator<BlockData> blockIterator = dbHandle.getStore().
+             getBlockIterator(containerData.getContainerID())) {
+      while (blockIterator.hasNext()) {
+        BlockData blockData = blockIterator.nextBlock();
+        List<ContainerProtos.ChunkInfo> chunkInfos = blockData.getChunks();
+        merkleTree.addChunks(blockData.getLocalID(), chunkInfos);
+      }
+    }
+    checksumManager.writeContainerDataTree(containerData, merkleTree);
+    long dataChecksum = merkleTree.toProto().getDataChecksum();
+    containerData.setDataChecksum(dataChecksum);
+    return dataChecksum;
+  }
+
+  private void handleMissingBlock(KeyValueContainer container, ContainerData 
containerData, TokenHelper tokenHelper,
+                                  XceiverClientSpi xceiverClient, 
ContainerProtos.BlockMerkleTree missingBlock)
+          throws IOException {
+    BlockID blockID = new BlockID(containerData.getContainerID(), 
missingBlock.getBlockID());
+    Token<OzoneBlockTokenIdentifier> blockToken = 
tokenHelper.getBlockToken(blockID, 0L);
+    // TODO: Re-use the blockResponse for the same block again.
+    ContainerProtos.GetBlockResponseProto blockResponse = 
ContainerProtocolCalls.getBlock(xceiverClient, blockID,
+        blockToken, new HashMap<>());
+    ContainerProtos.BlockData peerBlockData = blockResponse.getBlockData();
+    long bcsId = getBlockManager().blockExists(container, blockID) ?
+        getBlockManager().getBlock(container, 
blockID).getBlockCommitSequenceId() : 0;
+    // Check the local bcsId with the one from the bcsId from the peer 
datanode.
+    long maxBcsId = 
Math.max(peerBlockData.getBlockID().getBlockCommitSequenceId(), bcsId);
+    List<ContainerProtos.ChunkInfo> peerChunksList = 
peerBlockData.getChunksList();
+    List<ContainerProtos.ChunkInfo> successfullChunksList = new ArrayList<>();
+    // Update BcsId only if all chunks are successfully written.
+    boolean overwriteBcsId = true;
+
+    // Don't update bcsId if chunk read fails
+    for (ContainerProtos.ChunkInfo chunkInfoProto : peerChunksList) {
+      try {
+        ByteString chunkData = readChunkData(xceiverClient, chunkInfoProto, 
blockID, blockToken);
+        ChunkBuffer chunkBuffer = 
ChunkBuffer.wrap(chunkData.asReadOnlyByteBuffer());
+        ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(chunkInfoProto);
+        chunkInfo.addMetadata(OzoneConsts.CHUNK_OVERWRITE, "true");
+        writeChunkForClosedContainer(chunkInfo, blockID, chunkBuffer, 
container);
+        successfullChunksList.add(chunkInfoProto);
+      } catch (IOException ex) {
+        overwriteBcsId = false;
+        LOG.error("Error while reconciling missing block {} for offset {} in 
container {}",
+                blockID, chunkInfoProto.getOffset(), 
containerData.getContainerID(), ex);
+      }
+    }
+
+    BlockData putBlockData = BlockData.getFromProtoBuf(peerBlockData);
+    putBlockData.setChunks(successfullChunksList);
+    putBlockForClosedContainer(container, putBlockData, maxBcsId, 
overwriteBcsId);
+  }
+
+  private ByteString readChunkData(XceiverClientSpi xceiverClient, 
ContainerProtos.ChunkInfo chunkInfoProto,
+                                   BlockID blockID, 
Token<OzoneBlockTokenIdentifier> blockToken) throws IOException {
+    ContainerProtos.ReadChunkResponseProto response =
+        ContainerProtocolCalls.readChunk(xceiverClient, chunkInfoProto, 
blockID.getDatanodeBlockIDProtobuf(),
+            null, blockToken);
+
+    if (response.hasData()) {
+      return response.getData();
+    } else if (response.hasDataBuffers()) {
+      return 
BufferUtils.concatByteStrings(response.getDataBuffers().getBuffersList());
+    } else {
+      throw new IOException("Error reading chunk data: No data returned.");
+    }
+  }
+
+  private void reconcileChunk(KeyValueContainer container, ContainerData 
containerData, TokenHelper tokenHelper,
+                              XceiverClientSpi xceiverClient, long blockId,
+                              List<ContainerProtos.ChunkMerkleTree> chunkList) 
throws IOException {
+    Set<Long> offsets = 
chunkList.stream().map(ContainerProtos.ChunkMerkleTree::getOffset)
+        .collect(Collectors.toSet());
+    BlockID blockID = new BlockID(containerData.getContainerID(), blockId);
+    Token<OzoneBlockTokenIdentifier> blockToken = 
tokenHelper.getBlockToken(blockID, 0L);
+    ContainerProtos.GetBlockResponseProto blockResponse = 
ContainerProtocolCalls.getBlock(xceiverClient, blockID,
+        blockToken, new HashMap<>());
+    ContainerProtos.BlockData peerBlockData = blockResponse.getBlockData();
+    BlockData localBlockData = getBlockManager().getBlock(container, blockID);
+    // Check the local bcsId with the one from the bcsId from the peer 
datanode.
+    long maxBcsId = 
Math.max(peerBlockData.getBlockID().getBlockCommitSequenceId(),
+            localBlockData.getBlockCommitSequenceId());
+    List<ContainerProtos.ChunkInfo> chunksListFromPeer = 
peerBlockData.getChunksList();
+
+    SortedMap<Long, ContainerProtos.ChunkInfo> localChunksMap = 
localBlockData.getChunks().stream()
+            .collect(Collectors.toMap(ContainerProtos.ChunkInfo::getOffset,
+                Function.identity(), (chunk1, chunk2) -> chunk1, 
TreeMap::new));
+    boolean overwriteBcsId = true;
+
+    for (ContainerProtos.ChunkInfo chunkInfoProto : chunksListFromPeer) {
+      try {
+        if (!offsets.contains(chunkInfoProto.getOffset())) {
+          continue;

Review Comment:
   If the remote block has an offset that this block does not have, should we 
include it?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to