aswinshakil commented on code in PR #7474:
URL: https://github.com/apache/ozone/pull/7474#discussion_r1929307711
##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java:
##########
@@ -1427,21 +1436,136 @@ public void deleteContainer(Container container,
boolean force)
@Override
public void reconcileContainer(DNContainerOperationClient dnClient,
Container<?> container,
Set<DatanodeDetails> peers) throws
IOException {
- // TODO Just a deterministic placeholder hash for testing until actual
implementation is finished.
- ContainerData data = container.getContainerData();
- long id = data.getContainerID();
- ByteBuffer byteBuffer = ByteBuffer.allocate(Long.BYTES)
- .putLong(id)
- .asReadOnlyBuffer();
- byteBuffer.rewind();
- ChecksumByteBuffer checksumImpl = ChecksumByteBufferFactory.crc32Impl();
- checksumImpl.update(byteBuffer);
- long dataChecksum = checksumImpl.getValue();
- LOG.info("Generated data checksum of container {} for testing: {}", id,
dataChecksum);
- data.setDataChecksum(dataChecksum);
+ KeyValueContainer kvContainer = (KeyValueContainer) container;
+ KeyValueContainerData containerData = (KeyValueContainerData)
container.getContainerData();
+
+ for (DatanodeDetails peer : peers) {
+ ContainerProtos.ContainerChecksumInfo peerChecksumInfo =
dnClient.getContainerChecksumInfo(
+ containerData.getContainerID(), peer);
+ if (peerChecksumInfo == null) {
+ LOG.warn("Checksum not yet generated for peer: {}", peer);
+ return;
+ }
+
+ long scmBlockSize = (long) conf.getStorageSize(OZONE_SCM_BLOCK_SIZE,
OZONE_SCM_BLOCK_SIZE_DEFAULT,
+ StorageUnit.BYTES);
+ ContainerDiffReport diffReport = checksumManager.diff(containerData,
peerChecksumInfo);
+ TokenHelper tokenHelper = dnClient.getTokenHelper();
+ XceiverClientSpi xceiverClient = dnClient.getXceiverClientManager()
+ .acquireClient(createSingleNodePipeline(peer));
+
+ try {
+ // Handle missing blocks
+ for (ContainerProtos.BlockMerkleTree missingBlock :
diffReport.getMissingBlocks()) {
+ handleMissingBlock(kvContainer, containerData, tokenHelper,
scmBlockSize, xceiverClient, missingBlock);
+ }
+
+ // Handle missing chunks
+ for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry :
diffReport.getMissingChunks().entrySet()) {
+ reconcileChunk(kvContainer, containerData, tokenHelper,
scmBlockSize, xceiverClient, entry);
+ }
+
+ // Handle corrupt chunks
+ for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry :
diffReport.getCorruptChunks().entrySet()) {
+ reconcileChunk(kvContainer, containerData, tokenHelper,
scmBlockSize, xceiverClient, entry);
+ }
+ updateContainerChecksum(containerData);
+ } finally {
+ dnClient.getXceiverClientManager().releaseClient(xceiverClient, false);
+ }
+ }
+
+ long dataChecksum = updateContainerChecksum(containerData);
+ LOG.info("Checksum data for container {} is updated to {}",
containerData.getContainerID(), dataChecksum);
+ containerData.setDataChecksum(dataChecksum);
sendICR(container);
}
+ private long updateContainerChecksum(KeyValueContainerData containerData)
throws IOException {
+ ContainerMerkleTree merkleTree = new ContainerMerkleTree();
+ try (DBHandle dbHandle = BlockUtils.getDB(containerData, conf);
+ BlockIterator<BlockData> blockIterator = dbHandle.getStore().
+ getBlockIterator(containerData.getContainerID())) {
+ while (blockIterator.hasNext()) {
+ BlockData blockData = blockIterator.nextBlock();
+ List<ContainerProtos.ChunkInfo> chunkInfos = blockData.getChunks();
+ merkleTree.addChunks(blockData.getLocalID(), chunkInfos);
+ }
+ }
+ checksumManager.writeContainerDataTree(containerData, merkleTree);
+ return merkleTree.toProto().getDataChecksum();
+ }
+
+ private void handleMissingBlock(KeyValueContainer container, ContainerData
containerData, TokenHelper tokenHelper,
+ long scmBlockSize, XceiverClientSpi
xceiverClient,
+ ContainerProtos.BlockMerkleTree
missingBlock) throws IOException {
+ BlockID blockID = new BlockID(containerData.getContainerID(),
missingBlock.getBlockID());
+ Token<OzoneBlockTokenIdentifier> blockToken =
tokenHelper.getBlockToken(blockID, scmBlockSize);
+ // TODO: Cache the blockResponse to reuse it again.
Review Comment:
Right now we make multiple calls to get the block list. For missing blocks this
is okay, since we need to get the block list anyway. However, for missing and
corrupt chunks we will be fetching the block list multiple times for the same
block, even though the block list is going to be the same each time. These
repeated calls add up to unnecessary network overhead. Instead, we should cache
the `blockResponse` for each block to avoid them.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]