kerneltime commented on code in PR #7474:
URL: https://github.com/apache/ozone/pull/7474#discussion_r1964613184
##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/ContainerLogger.java:
##########
@@ -145,6 +147,16 @@ public static void logRecovered(ContainerData
containerData) {
LOG.info(getMessage(containerData));
}
+ /**
+ * Logged when a container is reconciled.
+ *
+ * @param containerData The container that was reconciled on this datanode.
+ * @param oldDataChecksum The old data checksum.
+ */
+ public static void logReconciled(ContainerData containerData, long
oldDataChecksum) {
Review Comment:
It would be a good idea to include the new checksum as well. Since
reconciliation is best-effort, there is no guarantee that the new checksum
will match the expected checksum.
##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java:
##########
@@ -1473,24 +1476,210 @@ public void deleteContainer(Container container,
boolean force)
deleteInternal(container, force);
}
+ // Update Java Doc steps
@Override
public void reconcileContainer(DNContainerOperationClient dnClient,
Container<?> container,
Set<DatanodeDetails> peers) throws
IOException {
- // TODO Just a deterministic placeholder hash for testing until actual
implementation is finished.
- ContainerData data = container.getContainerData();
- long id = data.getContainerID();
- ByteBuffer byteBuffer = ByteBuffer.allocate(Long.BYTES)
- .putLong(id)
- .asReadOnlyBuffer();
- byteBuffer.rewind();
- ChecksumByteBuffer checksumImpl = ChecksumByteBufferFactory.crc32CImpl();
- checksumImpl.update(byteBuffer);
- long dataChecksum = checksumImpl.getValue();
- LOG.info("Generated data checksum of container {} for testing: {}", id,
dataChecksum);
- data.setDataChecksum(dataChecksum);
+ KeyValueContainer kvContainer = (KeyValueContainer) container;
+ KeyValueContainerData containerData = (KeyValueContainerData)
container.getContainerData();
+ Optional<ContainerProtos.ContainerChecksumInfo> checksumInfo =
checksumManager.read(containerData);
+ long oldDataChecksum = 0;
+
+ if (checksumInfo.isPresent()) {
+ oldDataChecksum =
checksumInfo.get().getContainerMerkleTree().getDataChecksum();
+ } else {
+ // Try creating the checksum info from RocksDB metadata if it is not
present.
+ createContainerMerkleTree(container);
+ checksumInfo = checksumManager.read(containerData);
+ if (checksumInfo.isPresent()) {
+ oldDataChecksum =
checksumInfo.get().getContainerMerkleTree().getDataChecksum();
+ }
+ }
+
+ for (DatanodeDetails peer : peers) {
+ ContainerProtos.ContainerChecksumInfo peerChecksumInfo =
dnClient.getContainerChecksumInfo(
+ containerData.getContainerID(), peer);
+ if (peerChecksumInfo == null) {
+ LOG.warn("Cannot reconcile container {} with peer {} which has not yet
generated a checksum",
+ containerData.getContainerID(), peer);
+ continue;
+ }
+
+ // Check block token usage. How it is used in DN
+ ContainerDiffReport diffReport = checksumManager.diff(containerData,
peerChecksumInfo);
+ TokenHelper tokenHelper = dnClient.getTokenHelper();
+ XceiverClientSpi xceiverClient = dnClient.getXceiverClientManager()
+ .acquireClient(createSingleNodePipeline(peer));
+
+ try {
+ // Handle missing blocks
+ for (ContainerProtos.BlockMerkleTree missingBlock :
diffReport.getMissingBlocks()) {
+ try {
+ handleMissingBlock(kvContainer, containerData, tokenHelper,
xceiverClient, missingBlock);
+ } catch (IOException e) {
+ LOG.error("Error while reconciling missing block for block {} in
container {}", missingBlock.getBlockID(),
+ containerData.getContainerID(), e);
+ }
+ }
+
+ // Handle missing chunks
+ for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry :
diffReport.getMissingChunks().entrySet()) {
+ try {
+ reconcileChunk(kvContainer, containerData, tokenHelper,
xceiverClient, entry.getKey(), entry.getValue());
+ } catch (IOException e) {
+ LOG.error("Error while reconciling missing chunk for block {} in
container {}", entry.getKey(),
+ containerData.getContainerID(), e);
+ }
+ }
+
+ // Handle corrupt chunks
+ for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry :
diffReport.getCorruptChunks().entrySet()) {
+ try {
+ reconcileChunk(kvContainer, containerData, tokenHelper,
xceiverClient, entry.getKey(), entry.getValue());
+ } catch (IOException e) {
+ LOG.error("Error while reconciling corrupt chunk for block {} in
container {}", entry.getKey(),
+ containerData.getContainerID(), e);
+ }
+ }
+ updateContainerChecksum(containerData);
+ } finally {
+ dnClient.getXceiverClientManager().releaseClient(xceiverClient, false);
+ }
+ }
+
+ // Update checksum based on RocksDB metadata
+ long dataChecksum = updateContainerChecksum(containerData);
+ // Trigger manual on demand scanner
+ OnDemandContainerDataScanner.scanContainer(container);
+ if (dataChecksum == oldDataChecksum) {
+ metrics.incContainerReconciledWithoutChanges();
+ LOG.info("Container {} reconciled without changes, Current checksum {}",
containerData.getContainerID(),
+ checksumToString(dataChecksum));
+ } else {
+ metrics.incContainerReconciledWithChanges();
+ LOG.warn("Container {} reconciled, Checksum updated from {} to {}",
containerData.getContainerID(),
+ checksumToString(oldDataChecksum),
checksumToString(dataChecksum));
+ }
+ ContainerLogger.logReconciled(container.getContainerData(),
oldDataChecksum);
sendICR(container);
}
+ private long updateContainerChecksum(KeyValueContainerData containerData)
throws IOException {
+ ContainerMerkleTreeWriter merkleTree = new ContainerMerkleTreeWriter();
+ try (DBHandle dbHandle = BlockUtils.getDB(containerData, conf);
+ BlockIterator<BlockData> blockIterator = dbHandle.getStore().
+ getBlockIterator(containerData.getContainerID())) {
+ while (blockIterator.hasNext()) {
+ BlockData blockData = blockIterator.nextBlock();
+ List<ContainerProtos.ChunkInfo> chunkInfos = blockData.getChunks();
+ merkleTree.addChunks(blockData.getLocalID(), chunkInfos);
+ }
+ }
+ checksumManager.writeContainerDataTree(containerData, merkleTree);
+ long dataChecksum = merkleTree.toProto().getDataChecksum();
+ containerData.setDataChecksum(dataChecksum);
+ return dataChecksum;
+ }
+
+ private void handleMissingBlock(KeyValueContainer container, ContainerData
containerData, TokenHelper tokenHelper,
+ XceiverClientSpi xceiverClient,
ContainerProtos.BlockMerkleTree missingBlock)
+ throws IOException {
+ BlockID blockID = new BlockID(containerData.getContainerID(),
missingBlock.getBlockID());
+ Token<OzoneBlockTokenIdentifier> blockToken =
tokenHelper.getBlockToken(blockID, 0L);
+ // TODO: Re-use the blockResponse for the same block again.
+ ContainerProtos.GetBlockResponseProto blockResponse =
ContainerProtocolCalls.getBlock(xceiverClient, blockID,
+ blockToken, new HashMap<>());
+ ContainerProtos.BlockData peerBlockData = blockResponse.getBlockData();
+ long bcsId = getBlockManager().blockExists(container, blockID) ?
+ getBlockManager().getBlock(container,
blockID).getBlockCommitSequenceId() : 0;
+ // Check the local bcsId with the one from the bcsId from the peer
datanode.
+ long maxBcsId =
Math.max(peerBlockData.getBlockID().getBlockCommitSequenceId(), bcsId);
+ List<ContainerProtos.ChunkInfo> peerChunksList =
peerBlockData.getChunksList();
+ List<ContainerProtos.ChunkInfo> successfullChunksList = new ArrayList<>();
+ // Update BcsId only if all chunks are successfully written.
+ boolean overwriteBcsId = true;
+
+ // Don't update bcsId if chunk read fails
+ for (ContainerProtos.ChunkInfo chunkInfoProto : peerChunksList) {
+ try {
+ ByteString chunkData = readChunkData(xceiverClient, chunkInfoProto,
blockID, blockToken);
+ ChunkBuffer chunkBuffer =
ChunkBuffer.wrap(chunkData.asReadOnlyByteBuffer());
+ ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(chunkInfoProto);
+ chunkInfo.addMetadata(OzoneConsts.CHUNK_OVERWRITE, "true");
+ writeChunkForClosedContainer(chunkInfo, blockID, chunkBuffer,
container);
+ successfullChunksList.add(chunkInfoProto);
+ } catch (IOException ex) {
+ overwriteBcsId = false;
+ LOG.error("Error while reconciling missing block {} for offset {} in
container {}",
+ blockID, chunkInfoProto.getOffset(),
containerData.getContainerID(), ex);
+ }
+ }
+
+ BlockData putBlockData = BlockData.getFromProtoBuf(peerBlockData);
+ putBlockData.setChunks(successfullChunksList);
+ putBlockForClosedContainer(container, putBlockData, maxBcsId,
overwriteBcsId);
+ }
+
+ private ByteString readChunkData(XceiverClientSpi xceiverClient,
ContainerProtos.ChunkInfo chunkInfoProto,
+ BlockID blockID,
Token<OzoneBlockTokenIdentifier> blockToken) throws IOException {
+ ContainerProtos.ReadChunkResponseProto response =
+ ContainerProtocolCalls.readChunk(xceiverClient, chunkInfoProto,
blockID.getDatanodeBlockIDProtobuf(),
+ null, blockToken);
+
+ if (response.hasData()) {
+ return response.getData();
+ } else if (response.hasDataBuffers()) {
+ return
BufferUtils.concatByteStrings(response.getDataBuffers().getBuffersList());
+ } else {
+ throw new IOException("Error reading chunk data: No data returned.");
+ }
+ }
+
+ private void reconcileChunk(KeyValueContainer container, ContainerData
containerData, TokenHelper tokenHelper,
+ XceiverClientSpi xceiverClient, long blockId,
+ List<ContainerProtos.ChunkMerkleTree> chunkList)
throws IOException {
+ Set<Long> offsets =
chunkList.stream().map(ContainerProtos.ChunkMerkleTree::getOffset)
+ .collect(Collectors.toSet());
+ BlockID blockID = new BlockID(containerData.getContainerID(), blockId);
+ Token<OzoneBlockTokenIdentifier> blockToken =
tokenHelper.getBlockToken(blockID, 0L);
Review Comment:
Add a comment explaining why the length is set to zero. At this stage the
length of the block is not known, and rather than using a default value
(which can change over the life of a cluster), we set it to zero because it
currently plays no role in validation for block reads.
##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java:
##########
@@ -1473,24 +1476,210 @@ public void deleteContainer(Container container,
boolean force)
deleteInternal(container, force);
}
+ // Update Java Doc steps
@Override
public void reconcileContainer(DNContainerOperationClient dnClient,
Container<?> container,
Set<DatanodeDetails> peers) throws
IOException {
- // TODO Just a deterministic placeholder hash for testing until actual
implementation is finished.
- ContainerData data = container.getContainerData();
- long id = data.getContainerID();
- ByteBuffer byteBuffer = ByteBuffer.allocate(Long.BYTES)
- .putLong(id)
- .asReadOnlyBuffer();
- byteBuffer.rewind();
- ChecksumByteBuffer checksumImpl = ChecksumByteBufferFactory.crc32CImpl();
- checksumImpl.update(byteBuffer);
- long dataChecksum = checksumImpl.getValue();
- LOG.info("Generated data checksum of container {} for testing: {}", id,
dataChecksum);
- data.setDataChecksum(dataChecksum);
+ KeyValueContainer kvContainer = (KeyValueContainer) container;
+ KeyValueContainerData containerData = (KeyValueContainerData)
container.getContainerData();
+ Optional<ContainerProtos.ContainerChecksumInfo> checksumInfo =
checksumManager.read(containerData);
+ long oldDataChecksum = 0;
+
+ if (checksumInfo.isPresent()) {
+ oldDataChecksum =
checksumInfo.get().getContainerMerkleTree().getDataChecksum();
+ } else {
+ // Try creating the checksum info from RocksDB metadata if it is not
present.
+ createContainerMerkleTree(container);
+ checksumInfo = checksumManager.read(containerData);
+ if (checksumInfo.isPresent()) {
+ oldDataChecksum =
checksumInfo.get().getContainerMerkleTree().getDataChecksum();
+ }
+ }
+
+ for (DatanodeDetails peer : peers) {
+ ContainerProtos.ContainerChecksumInfo peerChecksumInfo =
dnClient.getContainerChecksumInfo(
+ containerData.getContainerID(), peer);
+ if (peerChecksumInfo == null) {
+ LOG.warn("Cannot reconcile container {} with peer {} which has not yet
generated a checksum",
+ containerData.getContainerID(), peer);
+ continue;
+ }
+
+ // Check block token usage. How it is used in DN
+ ContainerDiffReport diffReport = checksumManager.diff(containerData,
peerChecksumInfo);
+ TokenHelper tokenHelper = dnClient.getTokenHelper();
+ XceiverClientSpi xceiverClient = dnClient.getXceiverClientManager()
+ .acquireClient(createSingleNodePipeline(peer));
+
+ try {
+ // Handle missing blocks
+ for (ContainerProtos.BlockMerkleTree missingBlock :
diffReport.getMissingBlocks()) {
+ try {
+ handleMissingBlock(kvContainer, containerData, tokenHelper,
xceiverClient, missingBlock);
+ } catch (IOException e) {
+ LOG.error("Error while reconciling missing block for block {} in
container {}", missingBlock.getBlockID(),
+ containerData.getContainerID(), e);
+ }
+ }
+
+ // Handle missing chunks
+ for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry :
diffReport.getMissingChunks().entrySet()) {
+ try {
+ reconcileChunk(kvContainer, containerData, tokenHelper,
xceiverClient, entry.getKey(), entry.getValue());
+ } catch (IOException e) {
+ LOG.error("Error while reconciling missing chunk for block {} in
container {}", entry.getKey(),
+ containerData.getContainerID(), e);
+ }
+ }
+
+ // Handle corrupt chunks
+ for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry :
diffReport.getCorruptChunks().entrySet()) {
+ try {
+ reconcileChunk(kvContainer, containerData, tokenHelper,
xceiverClient, entry.getKey(), entry.getValue());
+ } catch (IOException e) {
+ LOG.error("Error while reconciling corrupt chunk for block {} in
container {}", entry.getKey(),
+ containerData.getContainerID(), e);
+ }
+ }
+ updateContainerChecksum(containerData);
+ } finally {
+ dnClient.getXceiverClientManager().releaseClient(xceiverClient, false);
+ }
+ }
+
+ // Update checksum based on RocksDB metadata
+ long dataChecksum = updateContainerChecksum(containerData);
+ // Trigger manual on demand scanner
+ OnDemandContainerDataScanner.scanContainer(container);
+ if (dataChecksum == oldDataChecksum) {
+ metrics.incContainerReconciledWithoutChanges();
+ LOG.info("Container {} reconciled without changes, Current checksum {}",
containerData.getContainerID(),
+ checksumToString(dataChecksum));
+ } else {
+ metrics.incContainerReconciledWithChanges();
+ LOG.warn("Container {} reconciled, Checksum updated from {} to {}",
containerData.getContainerID(),
+ checksumToString(oldDataChecksum),
checksumToString(dataChecksum));
+ }
+ ContainerLogger.logReconciled(container.getContainerData(),
oldDataChecksum);
sendICR(container);
}
+ private long updateContainerChecksum(KeyValueContainerData containerData)
throws IOException {
+ ContainerMerkleTreeWriter merkleTree = new ContainerMerkleTreeWriter();
+ try (DBHandle dbHandle = BlockUtils.getDB(containerData, conf);
+ BlockIterator<BlockData> blockIterator = dbHandle.getStore().
+ getBlockIterator(containerData.getContainerID())) {
+ while (blockIterator.hasNext()) {
+ BlockData blockData = blockIterator.nextBlock();
+ List<ContainerProtos.ChunkInfo> chunkInfos = blockData.getChunks();
+ merkleTree.addChunks(blockData.getLocalID(), chunkInfos);
+ }
+ }
+ checksumManager.writeContainerDataTree(containerData, merkleTree);
+ long dataChecksum = merkleTree.toProto().getDataChecksum();
+ containerData.setDataChecksum(dataChecksum);
+ return dataChecksum;
+ }
+
+ private void handleMissingBlock(KeyValueContainer container, ContainerData
containerData, TokenHelper tokenHelper,
+ XceiverClientSpi xceiverClient,
ContainerProtos.BlockMerkleTree missingBlock)
+ throws IOException {
+ BlockID blockID = new BlockID(containerData.getContainerID(),
missingBlock.getBlockID());
+ Token<OzoneBlockTokenIdentifier> blockToken =
tokenHelper.getBlockToken(blockID, 0L);
+ // TODO: Re-use the blockResponse for the same block again.
+ ContainerProtos.GetBlockResponseProto blockResponse =
ContainerProtocolCalls.getBlock(xceiverClient, blockID,
+ blockToken, new HashMap<>());
+ ContainerProtos.BlockData peerBlockData = blockResponse.getBlockData();
+ long bcsId = getBlockManager().blockExists(container, blockID) ?
+ getBlockManager().getBlock(container,
blockID).getBlockCommitSequenceId() : 0;
+ // Check the local bcsId with the one from the bcsId from the peer
datanode.
+ long maxBcsId =
Math.max(peerBlockData.getBlockID().getBlockCommitSequenceId(), bcsId);
+ List<ContainerProtos.ChunkInfo> peerChunksList =
peerBlockData.getChunksList();
+ List<ContainerProtos.ChunkInfo> successfullChunksList = new ArrayList<>();
+ // Update BcsId only if all chunks are successfully written.
+ boolean overwriteBcsId = true;
+
+ // Don't update bcsId if chunk read fails
+ for (ContainerProtos.ChunkInfo chunkInfoProto : peerChunksList) {
+ try {
+ ByteString chunkData = readChunkData(xceiverClient, chunkInfoProto,
blockID, blockToken);
+ ChunkBuffer chunkBuffer =
ChunkBuffer.wrap(chunkData.asReadOnlyByteBuffer());
+ ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(chunkInfoProto);
+ chunkInfo.addMetadata(OzoneConsts.CHUNK_OVERWRITE, "true");
+ writeChunkForClosedContainer(chunkInfo, blockID, chunkBuffer,
container);
+ successfullChunksList.add(chunkInfoProto);
+ } catch (IOException ex) {
+ overwriteBcsId = false;
+ LOG.error("Error while reconciling missing block {} for offset {} in
container {}",
+ blockID, chunkInfoProto.getOffset(),
containerData.getContainerID(), ex);
+ }
+ }
+
+ BlockData putBlockData = BlockData.getFromProtoBuf(peerBlockData);
+ putBlockData.setChunks(successfullChunksList);
+ putBlockForClosedContainer(container, putBlockData, maxBcsId,
overwriteBcsId);
+ }
+
+ private ByteString readChunkData(XceiverClientSpi xceiverClient,
ContainerProtos.ChunkInfo chunkInfoProto,
+ BlockID blockID,
Token<OzoneBlockTokenIdentifier> blockToken) throws IOException {
+ ContainerProtos.ReadChunkResponseProto response =
+ ContainerProtocolCalls.readChunk(xceiverClient, chunkInfoProto,
blockID.getDatanodeBlockIDProtobuf(),
+ null, blockToken);
+
+ if (response.hasData()) {
+ return response.getData();
+ } else if (response.hasDataBuffers()) {
+ return
BufferUtils.concatByteStrings(response.getDataBuffers().getBuffersList());
+ } else {
+ throw new IOException("Error reading chunk data: No data returned.");
+ }
+ }
+
+ private void reconcileChunk(KeyValueContainer container, ContainerData
containerData, TokenHelper tokenHelper,
Review Comment:
Since this method reconciles all of the block's chunks, a plural name would be clearer:
```suggestion
private void reconcileChunks(KeyValueContainer container, ContainerData
containerData, TokenHelper tokenHelper,
```
##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java:
##########
@@ -1473,24 +1476,210 @@ public void deleteContainer(Container container,
boolean force)
deleteInternal(container, force);
}
+ // Update Java Doc steps
@Override
public void reconcileContainer(DNContainerOperationClient dnClient,
Container<?> container,
Set<DatanodeDetails> peers) throws
IOException {
- // TODO Just a deterministic placeholder hash for testing until actual
implementation is finished.
- ContainerData data = container.getContainerData();
- long id = data.getContainerID();
- ByteBuffer byteBuffer = ByteBuffer.allocate(Long.BYTES)
- .putLong(id)
- .asReadOnlyBuffer();
- byteBuffer.rewind();
- ChecksumByteBuffer checksumImpl = ChecksumByteBufferFactory.crc32CImpl();
- checksumImpl.update(byteBuffer);
- long dataChecksum = checksumImpl.getValue();
- LOG.info("Generated data checksum of container {} for testing: {}", id,
dataChecksum);
- data.setDataChecksum(dataChecksum);
+ KeyValueContainer kvContainer = (KeyValueContainer) container;
+ KeyValueContainerData containerData = (KeyValueContainerData)
container.getContainerData();
+ Optional<ContainerProtos.ContainerChecksumInfo> checksumInfo =
checksumManager.read(containerData);
+ long oldDataChecksum = 0;
+
+ if (checksumInfo.isPresent()) {
+ oldDataChecksum =
checksumInfo.get().getContainerMerkleTree().getDataChecksum();
+ } else {
+ // Try creating the checksum info from RocksDB metadata if it is not
present.
+ createContainerMerkleTree(container);
+ checksumInfo = checksumManager.read(containerData);
+ if (checksumInfo.isPresent()) {
+ oldDataChecksum =
checksumInfo.get().getContainerMerkleTree().getDataChecksum();
+ }
+ }
+
+ for (DatanodeDetails peer : peers) {
+ ContainerProtos.ContainerChecksumInfo peerChecksumInfo =
dnClient.getContainerChecksumInfo(
+ containerData.getContainerID(), peer);
+ if (peerChecksumInfo == null) {
+ LOG.warn("Cannot reconcile container {} with peer {} which has not yet
generated a checksum",
+ containerData.getContainerID(), peer);
+ continue;
+ }
+
+ // Check block token usage. How it is used in DN
+ ContainerDiffReport diffReport = checksumManager.diff(containerData,
peerChecksumInfo);
+ TokenHelper tokenHelper = dnClient.getTokenHelper();
+ XceiverClientSpi xceiverClient = dnClient.getXceiverClientManager()
+ .acquireClient(createSingleNodePipeline(peer));
+
+ try {
+ // Handle missing blocks
+ for (ContainerProtos.BlockMerkleTree missingBlock :
diffReport.getMissingBlocks()) {
+ try {
+ handleMissingBlock(kvContainer, containerData, tokenHelper,
xceiverClient, missingBlock);
+ } catch (IOException e) {
+ LOG.error("Error while reconciling missing block for block {} in
container {}", missingBlock.getBlockID(),
+ containerData.getContainerID(), e);
+ }
+ }
+
+ // Handle missing chunks
+ for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry :
diffReport.getMissingChunks().entrySet()) {
+ try {
+ reconcileChunk(kvContainer, containerData, tokenHelper,
xceiverClient, entry.getKey(), entry.getValue());
+ } catch (IOException e) {
+ LOG.error("Error while reconciling missing chunk for block {} in
container {}", entry.getKey(),
+ containerData.getContainerID(), e);
+ }
+ }
+
+ // Handle corrupt chunks
+ for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry :
diffReport.getCorruptChunks().entrySet()) {
+ try {
+ reconcileChunk(kvContainer, containerData, tokenHelper,
xceiverClient, entry.getKey(), entry.getValue());
+ } catch (IOException e) {
+ LOG.error("Error while reconciling corrupt chunk for block {} in
container {}", entry.getKey(),
+ containerData.getContainerID(), e);
+ }
+ }
+ updateContainerChecksum(containerData);
+ } finally {
+ dnClient.getXceiverClientManager().releaseClient(xceiverClient, false);
+ }
+ }
+
+ // Update checksum based on RocksDB metadata
+ long dataChecksum = updateContainerChecksum(containerData);
+ // Trigger manual on demand scanner
+ OnDemandContainerDataScanner.scanContainer(container);
+ if (dataChecksum == oldDataChecksum) {
+ metrics.incContainerReconciledWithoutChanges();
+ LOG.info("Container {} reconciled without changes, Current checksum {}",
containerData.getContainerID(),
+ checksumToString(dataChecksum));
+ } else {
+ metrics.incContainerReconciledWithChanges();
+ LOG.warn("Container {} reconciled, Checksum updated from {} to {}",
containerData.getContainerID(),
+ checksumToString(oldDataChecksum),
checksumToString(dataChecksum));
+ }
+ ContainerLogger.logReconciled(container.getContainerData(),
oldDataChecksum);
sendICR(container);
}
+ private long updateContainerChecksum(KeyValueContainerData containerData)
throws IOException {
+ ContainerMerkleTreeWriter merkleTree = new ContainerMerkleTreeWriter();
+ try (DBHandle dbHandle = BlockUtils.getDB(containerData, conf);
+ BlockIterator<BlockData> blockIterator = dbHandle.getStore().
+ getBlockIterator(containerData.getContainerID())) {
+ while (blockIterator.hasNext()) {
+ BlockData blockData = blockIterator.nextBlock();
+ List<ContainerProtos.ChunkInfo> chunkInfos = blockData.getChunks();
+ merkleTree.addChunks(blockData.getLocalID(), chunkInfos);
+ }
+ }
+ checksumManager.writeContainerDataTree(containerData, merkleTree);
+ long dataChecksum = merkleTree.toProto().getDataChecksum();
+ containerData.setDataChecksum(dataChecksum);
+ return dataChecksum;
+ }
+
+ private void handleMissingBlock(KeyValueContainer container, ContainerData
containerData, TokenHelper tokenHelper,
+ XceiverClientSpi xceiverClient,
ContainerProtos.BlockMerkleTree missingBlock)
+ throws IOException {
+ BlockID blockID = new BlockID(containerData.getContainerID(),
missingBlock.getBlockID());
+ Token<OzoneBlockTokenIdentifier> blockToken =
tokenHelper.getBlockToken(blockID, 0L);
+ // TODO: Re-use the blockResponse for the same block again.
+ ContainerProtos.GetBlockResponseProto blockResponse =
ContainerProtocolCalls.getBlock(xceiverClient, blockID,
+ blockToken, new HashMap<>());
+ ContainerProtos.BlockData peerBlockData = blockResponse.getBlockData();
+ long bcsId = getBlockManager().blockExists(container, blockID) ?
+ getBlockManager().getBlock(container,
blockID).getBlockCommitSequenceId() : 0;
+ // Check the local bcsId with the one from the bcsId from the peer
datanode.
+ long maxBcsId =
Math.max(peerBlockData.getBlockID().getBlockCommitSequenceId(), bcsId);
+ List<ContainerProtos.ChunkInfo> peerChunksList =
peerBlockData.getChunksList();
+ List<ContainerProtos.ChunkInfo> successfullChunksList = new ArrayList<>();
+ // Update BcsId only if all chunks are successfully written.
+ boolean overwriteBcsId = true;
+
+ // Don't update bcsId if chunk read fails
+ for (ContainerProtos.ChunkInfo chunkInfoProto : peerChunksList) {
+ try {
+ ByteString chunkData = readChunkData(xceiverClient, chunkInfoProto,
blockID, blockToken);
+ ChunkBuffer chunkBuffer =
ChunkBuffer.wrap(chunkData.asReadOnlyByteBuffer());
+ ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(chunkInfoProto);
+ chunkInfo.addMetadata(OzoneConsts.CHUNK_OVERWRITE, "true");
+ writeChunkForClosedContainer(chunkInfo, blockID, chunkBuffer,
container);
+ successfullChunksList.add(chunkInfoProto);
+ } catch (IOException ex) {
+ overwriteBcsId = false;
+ LOG.error("Error while reconciling missing block {} for offset {} in
container {}",
+ blockID, chunkInfoProto.getOffset(),
containerData.getContainerID(), ex);
+ }
+ }
+
+ BlockData putBlockData = BlockData.getFromProtoBuf(peerBlockData);
+ putBlockData.setChunks(successfullChunksList);
+ putBlockForClosedContainer(container, putBlockData, maxBcsId,
overwriteBcsId);
+ }
+
+ private ByteString readChunkData(XceiverClientSpi xceiverClient,
ContainerProtos.ChunkInfo chunkInfoProto,
+ BlockID blockID,
Token<OzoneBlockTokenIdentifier> blockToken) throws IOException {
+ ContainerProtos.ReadChunkResponseProto response =
+ ContainerProtocolCalls.readChunk(xceiverClient, chunkInfoProto,
blockID.getDatanodeBlockIDProtobuf(),
+ null, blockToken);
+
+ if (response.hasData()) {
+ return response.getData();
+ } else if (response.hasDataBuffers()) {
+ return
BufferUtils.concatByteStrings(response.getDataBuffers().getBuffersList());
+ } else {
+ throw new IOException("Error reading chunk data: No data returned.");
+ }
+ }
+
Review Comment:
This is an important piece of logic (block reconciliation as well) and needs
thorough Javadoc explaining the intent and the expected algorithm.
##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java:
##########
@@ -1473,24 +1476,210 @@ public void deleteContainer(Container container,
boolean force)
deleteInternal(container, force);
}
+ // Update Java Doc steps
@Override
public void reconcileContainer(DNContainerOperationClient dnClient,
Container<?> container,
Set<DatanodeDetails> peers) throws
IOException {
- // TODO Just a deterministic placeholder hash for testing until actual
implementation is finished.
- ContainerData data = container.getContainerData();
- long id = data.getContainerID();
- ByteBuffer byteBuffer = ByteBuffer.allocate(Long.BYTES)
- .putLong(id)
- .asReadOnlyBuffer();
- byteBuffer.rewind();
- ChecksumByteBuffer checksumImpl = ChecksumByteBufferFactory.crc32CImpl();
- checksumImpl.update(byteBuffer);
- long dataChecksum = checksumImpl.getValue();
- LOG.info("Generated data checksum of container {} for testing: {}", id,
dataChecksum);
- data.setDataChecksum(dataChecksum);
+ KeyValueContainer kvContainer = (KeyValueContainer) container;
+ KeyValueContainerData containerData = (KeyValueContainerData)
container.getContainerData();
+ Optional<ContainerProtos.ContainerChecksumInfo> checksumInfo =
checksumManager.read(containerData);
+ long oldDataChecksum = 0;
+
+ if (checksumInfo.isPresent()) {
+ oldDataChecksum =
checksumInfo.get().getContainerMerkleTree().getDataChecksum();
+ } else {
+ // Try creating the checksum info from RocksDB metadata if it is not
present.
+ createContainerMerkleTree(container);
+ checksumInfo = checksumManager.read(containerData);
+ if (checksumInfo.isPresent()) {
+ oldDataChecksum =
checksumInfo.get().getContainerMerkleTree().getDataChecksum();
+ }
+ }
+
+ for (DatanodeDetails peer : peers) {
+ ContainerProtos.ContainerChecksumInfo peerChecksumInfo =
dnClient.getContainerChecksumInfo(
+ containerData.getContainerID(), peer);
+ if (peerChecksumInfo == null) {
+ LOG.warn("Cannot reconcile container {} with peer {} which has not yet
generated a checksum",
+ containerData.getContainerID(), peer);
+ continue;
+ }
+
+ // Check block token usage. How it is used in DN
+ ContainerDiffReport diffReport = checksumManager.diff(containerData,
peerChecksumInfo);
+ TokenHelper tokenHelper = dnClient.getTokenHelper();
+ XceiverClientSpi xceiverClient = dnClient.getXceiverClientManager()
+ .acquireClient(createSingleNodePipeline(peer));
+
+ try {
+ // Handle missing blocks
+ for (ContainerProtos.BlockMerkleTree missingBlock :
diffReport.getMissingBlocks()) {
+ try {
+ handleMissingBlock(kvContainer, containerData, tokenHelper,
xceiverClient, missingBlock);
+ } catch (IOException e) {
+ LOG.error("Error while reconciling missing block for block {} in
container {}", missingBlock.getBlockID(),
+ containerData.getContainerID(), e);
+ }
+ }
+
+ // Handle missing chunks
+ for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry :
diffReport.getMissingChunks().entrySet()) {
+ try {
+ reconcileChunk(kvContainer, containerData, tokenHelper,
xceiverClient, entry.getKey(), entry.getValue());
+ } catch (IOException e) {
+ LOG.error("Error while reconciling missing chunk for block {} in
container {}", entry.getKey(),
+ containerData.getContainerID(), e);
+ }
+ }
+
+ // Handle corrupt chunks
+ for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry :
diffReport.getCorruptChunks().entrySet()) {
+ try {
+ reconcileChunk(kvContainer, containerData, tokenHelper,
xceiverClient, entry.getKey(), entry.getValue());
+ } catch (IOException e) {
+ LOG.error("Error while reconciling corrupt chunk for block {} in
container {}", entry.getKey(),
+ containerData.getContainerID(), e);
+ }
+ }
+ updateContainerChecksum(containerData);
+ } finally {
+ dnClient.getXceiverClientManager().releaseClient(xceiverClient, false);
+ }
+ }
+
+ // Update checksum based on RocksDB metadata
+ long dataChecksum = updateContainerChecksum(containerData);
+ // Trigger manual on demand scanner
+ OnDemandContainerDataScanner.scanContainer(container);
+ if (dataChecksum == oldDataChecksum) {
+ metrics.incContainerReconciledWithoutChanges();
+ LOG.info("Container {} reconciled without changes, Current checksum {}",
containerData.getContainerID(),
+ checksumToString(dataChecksum));
+ } else {
+ metrics.incContainerReconciledWithChanges();
+ LOG.warn("Container {} reconciled, Checksum updated from {} to {}",
containerData.getContainerID(),
+ checksumToString(oldDataChecksum),
checksumToString(dataChecksum));
+ }
+ ContainerLogger.logReconciled(container.getContainerData(),
oldDataChecksum);
sendICR(container);
}
+ private long updateContainerChecksum(KeyValueContainerData containerData)
throws IOException {
+ ContainerMerkleTreeWriter merkleTree = new ContainerMerkleTreeWriter();
+ try (DBHandle dbHandle = BlockUtils.getDB(containerData, conf);
+ BlockIterator<BlockData> blockIterator = dbHandle.getStore().
+ getBlockIterator(containerData.getContainerID())) {
+ while (blockIterator.hasNext()) {
+ BlockData blockData = blockIterator.nextBlock();
+ List<ContainerProtos.ChunkInfo> chunkInfos = blockData.getChunks();
+ merkleTree.addChunks(blockData.getLocalID(), chunkInfos);
+ }
+ }
+ checksumManager.writeContainerDataTree(containerData, merkleTree);
+ long dataChecksum = merkleTree.toProto().getDataChecksum();
+ containerData.setDataChecksum(dataChecksum);
+ return dataChecksum;
+ }
+
+ private void handleMissingBlock(KeyValueContainer container, ContainerData
containerData, TokenHelper tokenHelper,
+ XceiverClientSpi xceiverClient,
ContainerProtos.BlockMerkleTree missingBlock)
+ throws IOException {
+ BlockID blockID = new BlockID(containerData.getContainerID(),
missingBlock.getBlockID());
+ Token<OzoneBlockTokenIdentifier> blockToken =
tokenHelper.getBlockToken(blockID, 0L);
+ // TODO: Re-use the blockResponse for the same block again.
+ ContainerProtos.GetBlockResponseProto blockResponse =
ContainerProtocolCalls.getBlock(xceiverClient, blockID,
+ blockToken, new HashMap<>());
+ ContainerProtos.BlockData peerBlockData = blockResponse.getBlockData();
+ long bcsId = getBlockManager().blockExists(container, blockID) ?
+ getBlockManager().getBlock(container,
blockID).getBlockCommitSequenceId() : 0;
+ // Check the local bcsId with the one from the bcsId from the peer
datanode.
+ long maxBcsId =
Math.max(peerBlockData.getBlockID().getBlockCommitSequenceId(), bcsId);
+ List<ContainerProtos.ChunkInfo> peerChunksList =
peerBlockData.getChunksList();
+ List<ContainerProtos.ChunkInfo> successfullChunksList = new ArrayList<>();
+ // Update BcsId only if all chunks are successfully written.
+ boolean overwriteBcsId = true;
+
+ // Don't update bcsId if chunk read fails
+ for (ContainerProtos.ChunkInfo chunkInfoProto : peerChunksList) {
+ try {
+ ByteString chunkData = readChunkData(xceiverClient, chunkInfoProto,
blockID, blockToken);
+ ChunkBuffer chunkBuffer =
ChunkBuffer.wrap(chunkData.asReadOnlyByteBuffer());
+ ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(chunkInfoProto);
+ chunkInfo.addMetadata(OzoneConsts.CHUNK_OVERWRITE, "true");
+ writeChunkForClosedContainer(chunkInfo, blockID, chunkBuffer,
container);
+ successfullChunksList.add(chunkInfoProto);
+ } catch (IOException ex) {
+ overwriteBcsId = false;
+ LOG.error("Error while reconciling missing block {} for offset {} in
container {}",
+ blockID, chunkInfoProto.getOffset(),
containerData.getContainerID(), ex);
+ }
+ }
+
+ BlockData putBlockData = BlockData.getFromProtoBuf(peerBlockData);
+ putBlockData.setChunks(successfullChunksList);
+ putBlockForClosedContainer(container, putBlockData, maxBcsId,
overwriteBcsId);
+ }
+
+ private ByteString readChunkData(XceiverClientSpi xceiverClient,
ContainerProtos.ChunkInfo chunkInfoProto,
+ BlockID blockID,
Token<OzoneBlockTokenIdentifier> blockToken) throws IOException {
+ ContainerProtos.ReadChunkResponseProto response =
+ ContainerProtocolCalls.readChunk(xceiverClient, chunkInfoProto,
blockID.getDatanodeBlockIDProtobuf(),
+ null, blockToken);
+
+ if (response.hasData()) {
+ return response.getData();
+ } else if (response.hasDataBuffers()) {
+ return
BufferUtils.concatByteStrings(response.getDataBuffers().getBuffersList());
+ } else {
+ throw new IOException("Error reading chunk data: No data returned.");
+ }
+ }
+
+ private void reconcileChunk(KeyValueContainer container, ContainerData
containerData, TokenHelper tokenHelper,
+ XceiverClientSpi xceiverClient, long blockId,
+ List<ContainerProtos.ChunkMerkleTree> chunkList)
throws IOException {
+ Set<Long> offsets =
chunkList.stream().map(ContainerProtos.ChunkMerkleTree::getOffset)
+ .collect(Collectors.toSet());
+ BlockID blockID = new BlockID(containerData.getContainerID(), blockId);
+ Token<OzoneBlockTokenIdentifier> blockToken =
tokenHelper.getBlockToken(blockID, 0L);
+ ContainerProtos.GetBlockResponseProto blockResponse =
ContainerProtocolCalls.getBlock(xceiverClient, blockID,
+ blockToken, new HashMap<>());
+ ContainerProtos.BlockData peerBlockData = blockResponse.getBlockData();
+ BlockData localBlockData = getBlockManager().getBlock(container, blockID);
+ // Check the local bcsId with the one from the bcsId from the peer
datanode.
+ long maxBcsId =
Math.max(peerBlockData.getBlockID().getBlockCommitSequenceId(),
+ localBlockData.getBlockCommitSequenceId());
+ List<ContainerProtos.ChunkInfo> chunksListFromPeer =
peerBlockData.getChunksList();
+
+ SortedMap<Long, ContainerProtos.ChunkInfo> localChunksMap =
localBlockData.getChunks().stream()
+ .collect(Collectors.toMap(ContainerProtos.ChunkInfo::getOffset,
+ Function.identity(), (chunk1, chunk2) -> chunk1,
TreeMap::new));
+ boolean overwriteBcsId = true;
+
+ for (ContainerProtos.ChunkInfo chunkInfoProto : chunksListFromPeer) {
+ try {
+ if (!offsets.contains(chunkInfoProto.getOffset())) {
+ continue;
Review Comment:
If the remote block has an offset that this block does not have, should we
include it?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]