errose28 commented on code in PR #7474:
URL: https://github.com/apache/ozone/pull/7474#discussion_r1991919728
##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerChecksumTreeManager.java:
##########
@@ -82,7 +82,9 @@ public void stop() {
* file remains unchanged.
* Concurrent writes to the same file are coordinated internally.
*/
- public void writeContainerDataTree(ContainerData data,
ContainerMerkleTreeWriter tree) throws IOException {
+ public ContainerProtos.ContainerChecksumInfo
writeContainerDataTree(ContainerData data,
+
ContainerMerkleTreeWriter tree)
+ throws IOException {
Review Comment:
Nit: this looks like a whitespace-only diff.
##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java:
##########
@@ -752,4 +753,9 @@ public DatanodeQueueMetrics getQueueMetrics() {
public ReconfigurationHandler getReconfigurationHandler() {
return reconfigurationHandler;
}
+
+ @VisibleForTesting
+ public DNContainerOperationClient getDnContainerOperationClientClient() {
Review Comment:
This method is no longer used. This file probably doesn't need any changes
now.
##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java:
##########
@@ -600,28 +615,19 @@ ContainerCommandResponseProto handleCloseContainer(
return getSuccessResponse(request);
}
- private void createContainerMerkleTree(Container container) {
+ public ContainerProtos.ContainerChecksumInfo
createContainerMerkleTree(Container container) {
Review Comment:
Let's use `Optional` here to avoid surprises. Currently only one caller
actually uses the return value.
##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java:
##########
@@ -1483,21 +1489,249 @@ public void deleteContainer(Container container,
boolean force)
@Override
public void reconcileContainer(DNContainerOperationClient dnClient,
Container<?> container,
Set<DatanodeDetails> peers) throws
IOException {
- // TODO Just a deterministic placeholder hash for testing until actual
implementation is finished.
- ContainerData data = container.getContainerData();
- long id = data.getContainerID();
- ByteBuffer byteBuffer = ByteBuffer.allocate(Long.BYTES)
- .putLong(id)
- .asReadOnlyBuffer();
- byteBuffer.rewind();
- ChecksumByteBuffer checksumImpl = ChecksumByteBufferFactory.crc32CImpl();
- checksumImpl.update(byteBuffer);
- long dataChecksum = checksumImpl.getValue();
- LOG.info("Generated data checksum of container {} for testing: {}", id,
dataChecksum);
- data.setDataChecksum(dataChecksum);
+ KeyValueContainer kvContainer = (KeyValueContainer) container;
+ KeyValueContainerData containerData = (KeyValueContainerData)
container.getContainerData();
+ Optional<ContainerProtos.ContainerChecksumInfo> optionalChecksumInfo =
checksumManager.read(containerData);
+ long oldDataChecksum = 0;
+ long dataChecksum = 0;
+ ContainerProtos.ContainerChecksumInfo checksumInfo;
+
+ if (optionalChecksumInfo.isPresent()) {
+ checksumInfo = optionalChecksumInfo.get();
+ oldDataChecksum =
checksumInfo.getContainerMerkleTree().getDataChecksum();
+ } else {
+ // Try creating the checksum info from RocksDB metadata if it is not
present.
+ checksumInfo = createContainerMerkleTree(container);
+ if (checksumInfo == null) {
+ LOG.error("Failed to reconcile container {} as checksum info is not
available",
+ containerData.getContainerID());
+ return;
+ }
+ oldDataChecksum =
checksumInfo.getContainerMerkleTree().getDataChecksum();
+ }
+
+ for (DatanodeDetails peer : peers) {
+ Instant start = Instant.now();
+ ContainerProtos.ContainerChecksumInfo peerChecksumInfo =
dnClient.getContainerChecksumInfo(
+ containerData.getContainerID(), peer);
+ if (peerChecksumInfo == null) {
+ LOG.warn("Cannot reconcile container {} with peer {} which has not yet
generated a checksum",
+ containerData.getContainerID(), peer);
+ continue;
+ }
+
+ ContainerDiffReport diffReport = checksumManager.diff(checksumInfo,
peerChecksumInfo);
+ TokenHelper tokenHelper = dnClient.getTokenHelper();
+ XceiverClientSpi xceiverClient = dnClient.getXceiverClientManager()
+ .acquireClient(createSingleNodePipeline(peer));
+
+ try {
+ // Handle missing blocks
+ for (ContainerProtos.BlockMerkleTree missingBlock :
diffReport.getMissingBlocks()) {
+ try {
+ handleMissingBlock(kvContainer, containerData, tokenHelper,
xceiverClient, missingBlock);
+ } catch (IOException e) {
+ LOG.error("Error while reconciling missing block for block {} in
container {}", missingBlock.getBlockID(),
+ containerData.getContainerID(), e);
+ }
+ }
+
+ // Handle missing chunks
+ for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry :
diffReport.getMissingChunks().entrySet()) {
+ try {
+ reconcileChunksPerBlock(kvContainer, containerData, tokenHelper,
xceiverClient, entry.getKey(),
+ entry.getValue());
+ } catch (IOException e) {
+ LOG.error("Error while reconciling missing chunk for block {} in
container {}", entry.getKey(),
+ containerData.getContainerID(), e);
+ }
+ }
+
+ // Handle corrupt chunks
+ for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry :
diffReport.getCorruptChunks().entrySet()) {
+ try {
+ reconcileChunksPerBlock(kvContainer, containerData, tokenHelper,
xceiverClient, entry.getKey(),
+ entry.getValue());
+ } catch (IOException e) {
+ LOG.error("Error while reconciling corrupt chunk for block {} in
container {}", entry.getKey(),
+ containerData.getContainerID(), e);
+ }
+ }
+ // Update checksum based on RocksDB metadata
+ ContainerProtos.ContainerChecksumInfo updatedChecksumInfo =
updateContainerChecksum(containerData);
+ dataChecksum =
updatedChecksumInfo.getContainerMerkleTree().getDataChecksum();
+
+ long duration = Duration.between(start, Instant.now()).toMillis();
+ if (dataChecksum == oldDataChecksum) {
+ metrics.incContainerReconciledWithoutChanges();
+ LOG.info("Container {} reconciled without changes, Current checksum
{}. Time taken {} ms",
+ containerData.getContainerID(), checksumToString(dataChecksum),
duration);
+ } else {
+ metrics.incContainerReconciledWithChanges();
+ LOG.warn("Container {} reconciled, Checksum updated from {} to {}.
Time taken {} ms",
+ containerData.getContainerID(),
checksumToString(oldDataChecksum),
+ checksumToString(dataChecksum), duration);
+ }
+ ContainerLogger.logReconciled(container.getContainerData(),
oldDataChecksum, peer);
+ } finally {
+ dnClient.getXceiverClientManager().releaseClient(xceiverClient, false);
+ }
+ }
+
+ // Trigger manual on demand scanner
+ OnDemandContainerDataScanner.scanContainer(container);
sendICR(container);
}
+ // Return the entire tree instead of just the checksum
+ private ContainerProtos.ContainerChecksumInfo
updateContainerChecksum(KeyValueContainerData containerData)
+ throws IOException {
+ ContainerMerkleTreeWriter merkleTree = new ContainerMerkleTreeWriter();
+ try (DBHandle dbHandle = BlockUtils.getDB(containerData, conf);
+ BlockIterator<BlockData> blockIterator = dbHandle.getStore().
+ getBlockIterator(containerData.getContainerID())) {
+ while (blockIterator.hasNext()) {
+ BlockData blockData = blockIterator.nextBlock();
+ List<ContainerProtos.ChunkInfo> chunkInfos = blockData.getChunks();
+ merkleTree.addChunks(blockData.getLocalID(), chunkInfos);
+ }
+ }
+ ContainerProtos.ContainerChecksumInfo checksumInfo = checksumManager
+ .writeContainerDataTree(containerData, merkleTree);
+
containerData.setDataChecksum(checksumInfo.getContainerMerkleTree().getDataChecksum());
+ return checksumInfo;
+ }
+
+ /**
+ * Handle missing block. It reads the missing block data from the peer
datanode and writes it to the local container.
+ * If the block write fails, the block commit sequence id is not updated.
+ */
+ private void handleMissingBlock(KeyValueContainer container, ContainerData
containerData, TokenHelper tokenHelper,
+ XceiverClientSpi xceiverClient,
ContainerProtos.BlockMerkleTree missingBlock)
+ throws IOException {
+ BlockID blockID = new BlockID(containerData.getContainerID(),
missingBlock.getBlockID());
+ // The length of the block is not known, so instead of passing the default
block length we pass 0. As the length
+ // is not used to validate the token for getBlock call.
+ Token<OzoneBlockTokenIdentifier> blockToken =
tokenHelper.getBlockToken(blockID, 0L);
+ // TODO: Re-use the blockResponse for the same block again.
+ ContainerProtos.GetBlockResponseProto blockResponse =
ContainerProtocolCalls.getBlock(xceiverClient, blockID,
+ blockToken, new HashMap<>());
+ ContainerProtos.BlockData peerBlockData = blockResponse.getBlockData();
+ long bcsId = getBlockManager().blockExists(container, blockID) ?
+ getBlockManager().getBlock(container,
blockID).getBlockCommitSequenceId() : 0;
+ // Check the local bcsId with the one from the bcsId from the peer
datanode.
+ long maxBcsId =
Math.max(peerBlockData.getBlockID().getBlockCommitSequenceId(), bcsId);
+ List<ContainerProtos.ChunkInfo> peerChunksList =
peerBlockData.getChunksList();
+ List<ContainerProtos.ChunkInfo> successfullChunksList = new ArrayList<>();
+ // Update BcsId only if all chunks are successfully written.
+ boolean overwriteBcsId = true;
+
+ // Don't update bcsId if chunk read fails
+ for (ContainerProtos.ChunkInfo chunkInfoProto : peerChunksList) {
+ try {
+ ByteString chunkData = readChunkData(xceiverClient, chunkInfoProto,
blockID, blockToken);
+ ChunkBuffer chunkBuffer =
ChunkBuffer.wrap(chunkData.asReadOnlyByteBuffer());
+ ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(chunkInfoProto);
+ chunkInfo.addMetadata(OzoneConsts.CHUNK_OVERWRITE, "true");
+ writeChunkForClosedContainer(chunkInfo, blockID, chunkBuffer,
container);
+ successfullChunksList.add(chunkInfoProto);
+ } catch (IOException ex) {
+ overwriteBcsId = false;
+ LOG.error("Error while reconciling missing block {} for offset {} in
container {}",
+ blockID, chunkInfoProto.getOffset(),
containerData.getContainerID(), ex);
+ }
+ }
+
+ BlockData putBlockData = BlockData.getFromProtoBuf(peerBlockData);
+ putBlockData.setChunks(successfullChunksList);
+ putBlockForClosedContainer(container, putBlockData, maxBcsId,
overwriteBcsId);
+ }
+
+ private ByteString readChunkData(XceiverClientSpi xceiverClient,
ContainerProtos.ChunkInfo chunkInfoProto,
+ BlockID blockID,
Token<OzoneBlockTokenIdentifier> blockToken) throws IOException {
+ ContainerProtos.ReadChunkResponseProto response =
+ ContainerProtocolCalls.readChunk(xceiverClient, chunkInfoProto,
blockID.getDatanodeBlockIDProtobuf(),
+ null, blockToken);
Review Comment:
This `null` is bypassing the read checksum verification. We should use
[ChunkInputStream](https://github.com/apache/ozone/blob/10870bb851e3d2ed0024659f83af164f17a2f491/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java#L440)
instead of making our own method here.
##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/ContainerLogger.java:
##########
@@ -145,6 +148,23 @@ public static void logRecovered(ContainerData
containerData) {
LOG.info(getMessage(containerData));
}
+ /**
+ * Logged when a container is reconciled.
+ *
+ * @param containerData The container that was reconciled on this datanode.
+ * @param oldDataChecksum The old data checksum.
+ */
+ public static void logReconciled(ContainerData containerData, long
oldDataChecksum, DatanodeDetails peer) {
+ if (containerData.getDataChecksum() == oldDataChecksum) {
+ LOG.info(getMessage(containerData, "Container reconciled with peer " +
peer.toString() +
+ ". No change in checksum. Current checksum is " +
checksumToString(containerData.getDataChecksum())));
Review Comment:
`getMessage` will already add the current data checksum to every message. We
don't need to log it again here.
##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java:
##########
@@ -1483,21 +1489,249 @@ public void deleteContainer(Container container,
boolean force)
@Override
public void reconcileContainer(DNContainerOperationClient dnClient,
Container<?> container,
Set<DatanodeDetails> peers) throws
IOException {
- // TODO Just a deterministic placeholder hash for testing until actual
implementation is finished.
- ContainerData data = container.getContainerData();
- long id = data.getContainerID();
- ByteBuffer byteBuffer = ByteBuffer.allocate(Long.BYTES)
- .putLong(id)
- .asReadOnlyBuffer();
- byteBuffer.rewind();
- ChecksumByteBuffer checksumImpl = ChecksumByteBufferFactory.crc32CImpl();
- checksumImpl.update(byteBuffer);
- long dataChecksum = checksumImpl.getValue();
- LOG.info("Generated data checksum of container {} for testing: {}", id,
dataChecksum);
- data.setDataChecksum(dataChecksum);
+ KeyValueContainer kvContainer = (KeyValueContainer) container;
+ KeyValueContainerData containerData = (KeyValueContainerData)
container.getContainerData();
+ Optional<ContainerProtos.ContainerChecksumInfo> optionalChecksumInfo =
checksumManager.read(containerData);
+ long oldDataChecksum = 0;
+ long dataChecksum = 0;
+ ContainerProtos.ContainerChecksumInfo checksumInfo;
+
+ if (optionalChecksumInfo.isPresent()) {
+ checksumInfo = optionalChecksumInfo.get();
+ oldDataChecksum =
checksumInfo.getContainerMerkleTree().getDataChecksum();
+ } else {
+ // Try creating the checksum info from RocksDB metadata if it is not
present.
+ checksumInfo = createContainerMerkleTree(container);
+ if (checksumInfo == null) {
+ LOG.error("Failed to reconcile container {} as checksum info is not
available",
Review Comment:
For hard failures like this let's propagate the original exception. The
`ReconcileContainerTask` will log it.
##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java:
##########
@@ -1483,21 +1489,249 @@ public void deleteContainer(Container container,
boolean force)
@Override
public void reconcileContainer(DNContainerOperationClient dnClient,
Container<?> container,
Set<DatanodeDetails> peers) throws
IOException {
- // TODO Just a deterministic placeholder hash for testing until actual
implementation is finished.
- ContainerData data = container.getContainerData();
- long id = data.getContainerID();
- ByteBuffer byteBuffer = ByteBuffer.allocate(Long.BYTES)
- .putLong(id)
- .asReadOnlyBuffer();
- byteBuffer.rewind();
- ChecksumByteBuffer checksumImpl = ChecksumByteBufferFactory.crc32CImpl();
- checksumImpl.update(byteBuffer);
- long dataChecksum = checksumImpl.getValue();
- LOG.info("Generated data checksum of container {} for testing: {}", id,
dataChecksum);
- data.setDataChecksum(dataChecksum);
+ KeyValueContainer kvContainer = (KeyValueContainer) container;
+ KeyValueContainerData containerData = (KeyValueContainerData)
container.getContainerData();
+ Optional<ContainerProtos.ContainerChecksumInfo> optionalChecksumInfo =
checksumManager.read(containerData);
+ long oldDataChecksum = 0;
+ long dataChecksum = 0;
+ ContainerProtos.ContainerChecksumInfo checksumInfo;
+
+ if (optionalChecksumInfo.isPresent()) {
+ checksumInfo = optionalChecksumInfo.get();
+ oldDataChecksum =
checksumInfo.getContainerMerkleTree().getDataChecksum();
+ } else {
+ // Try creating the checksum info from RocksDB metadata if it is not
present.
+ checksumInfo = createContainerMerkleTree(container);
+ if (checksumInfo == null) {
+ LOG.error("Failed to reconcile container {} as checksum info is not
available",
+ containerData.getContainerID());
+ return;
+ }
+ oldDataChecksum =
checksumInfo.getContainerMerkleTree().getDataChecksum();
+ }
+
+ for (DatanodeDetails peer : peers) {
+ Instant start = Instant.now();
+ ContainerProtos.ContainerChecksumInfo peerChecksumInfo =
dnClient.getContainerChecksumInfo(
+ containerData.getContainerID(), peer);
+ if (peerChecksumInfo == null) {
+ LOG.warn("Cannot reconcile container {} with peer {} which has not yet
generated a checksum",
+ containerData.getContainerID(), peer);
+ continue;
+ }
+
+ ContainerDiffReport diffReport = checksumManager.diff(checksumInfo,
peerChecksumInfo);
+ TokenHelper tokenHelper = dnClient.getTokenHelper();
+ XceiverClientSpi xceiverClient = dnClient.getXceiverClientManager()
+ .acquireClient(createSingleNodePipeline(peer));
+
+ try {
+ // Handle missing blocks
+ for (ContainerProtos.BlockMerkleTree missingBlock :
diffReport.getMissingBlocks()) {
+ try {
+ handleMissingBlock(kvContainer, containerData, tokenHelper,
xceiverClient, missingBlock);
+ } catch (IOException e) {
+ LOG.error("Error while reconciling missing block for block {} in
container {}", missingBlock.getBlockID(),
+ containerData.getContainerID(), e);
+ }
+ }
+
+ // Handle missing chunks
+ for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry :
diffReport.getMissingChunks().entrySet()) {
+ try {
+ reconcileChunksPerBlock(kvContainer, containerData, tokenHelper,
xceiverClient, entry.getKey(),
+ entry.getValue());
+ } catch (IOException e) {
+ LOG.error("Error while reconciling missing chunk for block {} in
container {}", entry.getKey(),
+ containerData.getContainerID(), e);
+ }
+ }
+
+ // Handle corrupt chunks
+ for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry :
diffReport.getCorruptChunks().entrySet()) {
+ try {
+ reconcileChunksPerBlock(kvContainer, containerData, tokenHelper,
xceiverClient, entry.getKey(),
+ entry.getValue());
+ } catch (IOException e) {
+ LOG.error("Error while reconciling corrupt chunk for block {} in
container {}", entry.getKey(),
+ containerData.getContainerID(), e);
+ }
+ }
+ // Update checksum based on RocksDB metadata
+ ContainerProtos.ContainerChecksumInfo updatedChecksumInfo =
updateContainerChecksum(containerData);
+ dataChecksum =
updatedChecksumInfo.getContainerMerkleTree().getDataChecksum();
+
+ long duration = Duration.between(start, Instant.now()).toMillis();
+ if (dataChecksum == oldDataChecksum) {
+ metrics.incContainerReconciledWithoutChanges();
+ LOG.info("Container {} reconciled without changes, Current checksum
{}. Time taken {} ms",
+ containerData.getContainerID(), checksumToString(dataChecksum),
duration);
+ } else {
+ metrics.incContainerReconciledWithChanges();
+ LOG.warn("Container {} reconciled, Checksum updated from {} to {}.
Time taken {} ms",
Review Comment:
```suggestion
LOG.warn("Container {} reconciled. Checksum updated from {} to {}.
Time taken {} ms",
```
##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java:
##########
@@ -1483,21 +1489,249 @@ public void deleteContainer(Container container,
boolean force)
@Override
public void reconcileContainer(DNContainerOperationClient dnClient,
Container<?> container,
Set<DatanodeDetails> peers) throws
IOException {
- // TODO Just a deterministic placeholder hash for testing until actual
implementation is finished.
- ContainerData data = container.getContainerData();
- long id = data.getContainerID();
- ByteBuffer byteBuffer = ByteBuffer.allocate(Long.BYTES)
- .putLong(id)
- .asReadOnlyBuffer();
- byteBuffer.rewind();
- ChecksumByteBuffer checksumImpl = ChecksumByteBufferFactory.crc32CImpl();
- checksumImpl.update(byteBuffer);
- long dataChecksum = checksumImpl.getValue();
- LOG.info("Generated data checksum of container {} for testing: {}", id,
dataChecksum);
- data.setDataChecksum(dataChecksum);
+ KeyValueContainer kvContainer = (KeyValueContainer) container;
+ KeyValueContainerData containerData = (KeyValueContainerData)
container.getContainerData();
+ Optional<ContainerProtos.ContainerChecksumInfo> optionalChecksumInfo =
checksumManager.read(containerData);
+ long oldDataChecksum = 0;
+ long dataChecksum = 0;
+ ContainerProtos.ContainerChecksumInfo checksumInfo;
+
+ if (optionalChecksumInfo.isPresent()) {
+ checksumInfo = optionalChecksumInfo.get();
+ oldDataChecksum =
checksumInfo.getContainerMerkleTree().getDataChecksum();
+ } else {
+ // Try creating the checksum info from RocksDB metadata if it is not
present.
+ checksumInfo = createContainerMerkleTree(container);
+ if (checksumInfo == null) {
+ LOG.error("Failed to reconcile container {} as checksum info is not
available",
+ containerData.getContainerID());
+ return;
+ }
+ oldDataChecksum =
checksumInfo.getContainerMerkleTree().getDataChecksum();
+ }
+
+ for (DatanodeDetails peer : peers) {
+ Instant start = Instant.now();
+ ContainerProtos.ContainerChecksumInfo peerChecksumInfo =
dnClient.getContainerChecksumInfo(
+ containerData.getContainerID(), peer);
+ if (peerChecksumInfo == null) {
+ LOG.warn("Cannot reconcile container {} with peer {} which has not yet
generated a checksum",
+ containerData.getContainerID(), peer);
+ continue;
+ }
+
+ ContainerDiffReport diffReport = checksumManager.diff(checksumInfo,
peerChecksumInfo);
+ TokenHelper tokenHelper = dnClient.getTokenHelper();
+ XceiverClientSpi xceiverClient = dnClient.getXceiverClientManager()
+ .acquireClient(createSingleNodePipeline(peer));
+
+ try {
+ // Handle missing blocks
+ for (ContainerProtos.BlockMerkleTree missingBlock :
diffReport.getMissingBlocks()) {
+ try {
+ handleMissingBlock(kvContainer, containerData, tokenHelper,
xceiverClient, missingBlock);
+ } catch (IOException e) {
+ LOG.error("Error while reconciling missing block for block {} in
container {}", missingBlock.getBlockID(),
+ containerData.getContainerID(), e);
+ }
+ }
+
+ // Handle missing chunks
+ for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry :
diffReport.getMissingChunks().entrySet()) {
+ try {
+ reconcileChunksPerBlock(kvContainer, containerData, tokenHelper,
xceiverClient, entry.getKey(),
+ entry.getValue());
+ } catch (IOException e) {
+ LOG.error("Error while reconciling missing chunk for block {} in
container {}", entry.getKey(),
+ containerData.getContainerID(), e);
+ }
+ }
+
+ // Handle corrupt chunks
+ for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry :
diffReport.getCorruptChunks().entrySet()) {
+ try {
+ reconcileChunksPerBlock(kvContainer, containerData, tokenHelper,
xceiverClient, entry.getKey(),
+ entry.getValue());
+ } catch (IOException e) {
+ LOG.error("Error while reconciling corrupt chunk for block {} in
container {}", entry.getKey(),
+ containerData.getContainerID(), e);
+ }
+ }
+ // Update checksum based on RocksDB metadata
+ ContainerProtos.ContainerChecksumInfo updatedChecksumInfo =
updateContainerChecksum(containerData);
+ dataChecksum =
updatedChecksumInfo.getContainerMerkleTree().getDataChecksum();
+
+ long duration = Duration.between(start, Instant.now()).toMillis();
+ if (dataChecksum == oldDataChecksum) {
+ metrics.incContainerReconciledWithoutChanges();
+ LOG.info("Container {} reconciled without changes, Current checksum
{}. Time taken {} ms",
+ containerData.getContainerID(), checksumToString(dataChecksum),
duration);
+ } else {
+ metrics.incContainerReconciledWithChanges();
+ LOG.warn("Container {} reconciled, Checksum updated from {} to {}.
Time taken {} ms",
+ containerData.getContainerID(),
checksumToString(oldDataChecksum),
+ checksumToString(dataChecksum), duration);
+ }
+ ContainerLogger.logReconciled(container.getContainerData(),
oldDataChecksum, peer);
+ } finally {
+ dnClient.getXceiverClientManager().releaseClient(xceiverClient, false);
+ }
+ }
+
+ // Trigger manual on demand scanner
+ OnDemandContainerDataScanner.scanContainer(container);
sendICR(container);
}
+ // Return the entire tree instead of just the checksum
+ private ContainerProtos.ContainerChecksumInfo
updateContainerChecksum(KeyValueContainerData containerData)
+ throws IOException {
+ ContainerMerkleTreeWriter merkleTree = new ContainerMerkleTreeWriter();
+ try (DBHandle dbHandle = BlockUtils.getDB(containerData, conf);
+ BlockIterator<BlockData> blockIterator = dbHandle.getStore().
+ getBlockIterator(containerData.getContainerID())) {
+ while (blockIterator.hasNext()) {
+ BlockData blockData = blockIterator.nextBlock();
+ List<ContainerProtos.ChunkInfo> chunkInfos = blockData.getChunks();
+ merkleTree.addChunks(blockData.getLocalID(), chunkInfos);
+ }
+ }
+ ContainerProtos.ContainerChecksumInfo checksumInfo = checksumManager
+ .writeContainerDataTree(containerData, merkleTree);
+
containerData.setDataChecksum(checksumInfo.getContainerMerkleTree().getDataChecksum());
+ return checksumInfo;
+ }
+
+ /**
+ * Handle missing block. It reads the missing block data from the peer
datanode and writes it to the local container.
+ * If the block write fails, the block commit sequence id is not updated.
+ */
+ private void handleMissingBlock(KeyValueContainer container, ContainerData
containerData, TokenHelper tokenHelper,
+ XceiverClientSpi xceiverClient,
ContainerProtos.BlockMerkleTree missingBlock)
+ throws IOException {
+ BlockID blockID = new BlockID(containerData.getContainerID(),
missingBlock.getBlockID());
+ // The length of the block is not known, so instead of passing the default
block length we pass 0. As the length
+ // is not used to validate the token for getBlock call.
+ Token<OzoneBlockTokenIdentifier> blockToken =
tokenHelper.getBlockToken(blockID, 0L);
+ // TODO: Re-use the blockResponse for the same block again.
+ ContainerProtos.GetBlockResponseProto blockResponse =
ContainerProtocolCalls.getBlock(xceiverClient, blockID,
+ blockToken, new HashMap<>());
+ ContainerProtos.BlockData peerBlockData = blockResponse.getBlockData();
+ long bcsId = getBlockManager().blockExists(container, blockID) ?
+ getBlockManager().getBlock(container,
blockID).getBlockCommitSequenceId() : 0;
+ // Check the local bcsId with the one from the bcsId from the peer
datanode.
+ long maxBcsId =
Math.max(peerBlockData.getBlockID().getBlockCommitSequenceId(), bcsId);
+ List<ContainerProtos.ChunkInfo> peerChunksList =
peerBlockData.getChunksList();
+ List<ContainerProtos.ChunkInfo> successfullChunksList = new ArrayList<>();
+ // Update BcsId only if all chunks are successfully written.
+ boolean overwriteBcsId = true;
+
+ // Don't update bcsId if chunk read fails
+ for (ContainerProtos.ChunkInfo chunkInfoProto : peerChunksList) {
+ try {
+ ByteString chunkData = readChunkData(xceiverClient, chunkInfoProto,
blockID, blockToken);
+ ChunkBuffer chunkBuffer =
ChunkBuffer.wrap(chunkData.asReadOnlyByteBuffer());
+ ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(chunkInfoProto);
+ chunkInfo.addMetadata(OzoneConsts.CHUNK_OVERWRITE, "true");
+ writeChunkForClosedContainer(chunkInfo, blockID, chunkBuffer,
container);
+ successfullChunksList.add(chunkInfoProto);
+ } catch (IOException ex) {
+ overwriteBcsId = false;
+ LOG.error("Error while reconciling missing block {} for offset {} in
container {}",
Review Comment:
Don't we need to exit this loop on the first failure, since future chunk
writes will fail if there is a hole before them?
##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java:
##########
@@ -1483,21 +1489,249 @@ public void deleteContainer(Container container,
boolean force)
@Override
public void reconcileContainer(DNContainerOperationClient dnClient,
Container<?> container,
Set<DatanodeDetails> peers) throws
IOException {
- // TODO Just a deterministic placeholder hash for testing until actual
implementation is finished.
- ContainerData data = container.getContainerData();
- long id = data.getContainerID();
- ByteBuffer byteBuffer = ByteBuffer.allocate(Long.BYTES)
- .putLong(id)
- .asReadOnlyBuffer();
- byteBuffer.rewind();
- ChecksumByteBuffer checksumImpl = ChecksumByteBufferFactory.crc32CImpl();
- checksumImpl.update(byteBuffer);
- long dataChecksum = checksumImpl.getValue();
- LOG.info("Generated data checksum of container {} for testing: {}", id,
dataChecksum);
- data.setDataChecksum(dataChecksum);
+ KeyValueContainer kvContainer = (KeyValueContainer) container;
+ KeyValueContainerData containerData = (KeyValueContainerData)
container.getContainerData();
+ Optional<ContainerProtos.ContainerChecksumInfo> optionalChecksumInfo =
checksumManager.read(containerData);
+ long oldDataChecksum = 0;
+ long dataChecksum = 0;
+ ContainerProtos.ContainerChecksumInfo checksumInfo;
+
+ if (optionalChecksumInfo.isPresent()) {
+ checksumInfo = optionalChecksumInfo.get();
+ oldDataChecksum =
checksumInfo.getContainerMerkleTree().getDataChecksum();
+ } else {
+ // Try creating the checksum info from RocksDB metadata if it is not
present.
+ checksumInfo = createContainerMerkleTree(container);
+ if (checksumInfo == null) {
+ LOG.error("Failed to reconcile container {} as checksum info is not
available",
+ containerData.getContainerID());
+ return;
+ }
+ oldDataChecksum =
checksumInfo.getContainerMerkleTree().getDataChecksum();
+ }
+
+ for (DatanodeDetails peer : peers) {
+ Instant start = Instant.now();
+ ContainerProtos.ContainerChecksumInfo peerChecksumInfo =
dnClient.getContainerChecksumInfo(
+ containerData.getContainerID(), peer);
+ if (peerChecksumInfo == null) {
+ LOG.warn("Cannot reconcile container {} with peer {} which has not yet
generated a checksum",
+ containerData.getContainerID(), peer);
+ continue;
+ }
+
+ ContainerDiffReport diffReport = checksumManager.diff(checksumInfo,
peerChecksumInfo);
+ TokenHelper tokenHelper = dnClient.getTokenHelper();
+ XceiverClientSpi xceiverClient = dnClient.getXceiverClientManager()
+ .acquireClient(createSingleNodePipeline(peer));
+
+ try {
+ // Handle missing blocks
+ for (ContainerProtos.BlockMerkleTree missingBlock :
diffReport.getMissingBlocks()) {
+ try {
+ handleMissingBlock(kvContainer, containerData, tokenHelper,
xceiverClient, missingBlock);
+ } catch (IOException e) {
+ LOG.error("Error while reconciling missing block for block {} in
container {}", missingBlock.getBlockID(),
+ containerData.getContainerID(), e);
+ }
+ }
+
+ // Handle missing chunks
+ for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry :
diffReport.getMissingChunks().entrySet()) {
+ try {
+ reconcileChunksPerBlock(kvContainer, containerData, tokenHelper,
xceiverClient, entry.getKey(),
+ entry.getValue());
+ } catch (IOException e) {
+ LOG.error("Error while reconciling missing chunk for block {} in
container {}", entry.getKey(),
+ containerData.getContainerID(), e);
+ }
+ }
+
+ // Handle corrupt chunks
+ for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry :
diffReport.getCorruptChunks().entrySet()) {
+ try {
+ reconcileChunksPerBlock(kvContainer, containerData, tokenHelper,
xceiverClient, entry.getKey(),
+ entry.getValue());
+ } catch (IOException e) {
+ LOG.error("Error while reconciling corrupt chunk for block {} in
container {}", entry.getKey(),
+ containerData.getContainerID(), e);
+ }
+ }
+ // Update checksum based on RocksDB metadata
Review Comment:
Let's expand this comment. We are expecting that the checksums of the new
data have been validated on our side by readChunk, so we are not doing a full
scan here.
##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java:
##########
@@ -1483,21 +1489,249 @@ public void deleteContainer(Container container,
boolean force)
@Override
public void reconcileContainer(DNContainerOperationClient dnClient,
Container<?> container,
Set<DatanodeDetails> peers) throws
IOException {
- // TODO Just a deterministic placeholder hash for testing until actual
implementation is finished.
- ContainerData data = container.getContainerData();
- long id = data.getContainerID();
- ByteBuffer byteBuffer = ByteBuffer.allocate(Long.BYTES)
- .putLong(id)
- .asReadOnlyBuffer();
- byteBuffer.rewind();
- ChecksumByteBuffer checksumImpl = ChecksumByteBufferFactory.crc32CImpl();
- checksumImpl.update(byteBuffer);
- long dataChecksum = checksumImpl.getValue();
- LOG.info("Generated data checksum of container {} for testing: {}", id,
dataChecksum);
- data.setDataChecksum(dataChecksum);
+ KeyValueContainer kvContainer = (KeyValueContainer) container;
+ KeyValueContainerData containerData = (KeyValueContainerData)
container.getContainerData();
+ Optional<ContainerProtos.ContainerChecksumInfo> optionalChecksumInfo =
checksumManager.read(containerData);
+ long oldDataChecksum = 0;
+ long dataChecksum = 0;
+ ContainerProtos.ContainerChecksumInfo checksumInfo;
+
+ if (optionalChecksumInfo.isPresent()) {
+ checksumInfo = optionalChecksumInfo.get();
+ oldDataChecksum =
checksumInfo.getContainerMerkleTree().getDataChecksum();
+ } else {
+ // Try creating the checksum info from RocksDB metadata if it is not
present.
+ checksumInfo = createContainerMerkleTree(container);
+ if (checksumInfo == null) {
+ LOG.error("Failed to reconcile container {} as checksum info is not
available",
+ containerData.getContainerID());
+ return;
+ }
+ oldDataChecksum =
checksumInfo.getContainerMerkleTree().getDataChecksum();
+ }
+
+ for (DatanodeDetails peer : peers) {
+ Instant start = Instant.now();
+ ContainerProtos.ContainerChecksumInfo peerChecksumInfo =
dnClient.getContainerChecksumInfo(
+ containerData.getContainerID(), peer);
+ if (peerChecksumInfo == null) {
+ LOG.warn("Cannot reconcile container {} with peer {} which has not yet
generated a checksum",
+ containerData.getContainerID(), peer);
+ continue;
+ }
+
+ ContainerDiffReport diffReport = checksumManager.diff(checksumInfo,
peerChecksumInfo);
+ TokenHelper tokenHelper = dnClient.getTokenHelper();
+ XceiverClientSpi xceiverClient = dnClient.getXceiverClientManager()
+ .acquireClient(createSingleNodePipeline(peer));
+
+ try {
+ // Handle missing blocks
+ for (ContainerProtos.BlockMerkleTree missingBlock :
diffReport.getMissingBlocks()) {
+ try {
+ handleMissingBlock(kvContainer, containerData, tokenHelper,
xceiverClient, missingBlock);
+ } catch (IOException e) {
+ LOG.error("Error while reconciling missing block for block {} in
container {}", missingBlock.getBlockID(),
+ containerData.getContainerID(), e);
+ }
+ }
+
+ // Handle missing chunks
+ for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry :
diffReport.getMissingChunks().entrySet()) {
+ try {
+ reconcileChunksPerBlock(kvContainer, containerData, tokenHelper,
xceiverClient, entry.getKey(),
+ entry.getValue());
+ } catch (IOException e) {
+ LOG.error("Error while reconciling missing chunk for block {} in
container {}", entry.getKey(),
+ containerData.getContainerID(), e);
+ }
+ }
+
+ // Handle corrupt chunks
+ for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry :
diffReport.getCorruptChunks().entrySet()) {
+ try {
+ reconcileChunksPerBlock(kvContainer, containerData, tokenHelper,
xceiverClient, entry.getKey(),
+ entry.getValue());
+ } catch (IOException e) {
+ LOG.error("Error while reconciling corrupt chunk for block {} in
container {}", entry.getKey(),
+ containerData.getContainerID(), e);
+ }
+ }
+ // Update checksum based on RocksDB metadata
+ ContainerProtos.ContainerChecksumInfo updatedChecksumInfo =
updateContainerChecksum(containerData);
+ dataChecksum =
updatedChecksumInfo.getContainerMerkleTree().getDataChecksum();
+
+ long duration = Duration.between(start, Instant.now()).toMillis();
+ if (dataChecksum == oldDataChecksum) {
+ metrics.incContainerReconciledWithoutChanges();
+ LOG.info("Container {} reconciled without changes, Current checksum
{}. Time taken {} ms",
+ containerData.getContainerID(), checksumToString(dataChecksum),
duration);
+ } else {
+ metrics.incContainerReconciledWithChanges();
+ LOG.warn("Container {} reconciled, Checksum updated from {} to {}.
Time taken {} ms",
+ containerData.getContainerID(),
checksumToString(oldDataChecksum),
+ checksumToString(dataChecksum), duration);
+ }
+ ContainerLogger.logReconciled(container.getContainerData(),
oldDataChecksum, peer);
+ } finally {
+ dnClient.getXceiverClientManager().releaseClient(xceiverClient, false);
+ }
+ }
+
+ // Trigger manual on demand scanner
+ OnDemandContainerDataScanner.scanContainer(container);
sendICR(container);
}
+ // Return the entire tree instead of just the checksum
Review Comment:
This looks more like an old todo message. Let's add actual javadoc here.
##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java:
##########
@@ -1483,21 +1489,249 @@ public void deleteContainer(Container container,
boolean force)
@Override
public void reconcileContainer(DNContainerOperationClient dnClient,
Container<?> container,
Set<DatanodeDetails> peers) throws
IOException {
- // TODO Just a deterministic placeholder hash for testing until actual
implementation is finished.
- ContainerData data = container.getContainerData();
- long id = data.getContainerID();
- ByteBuffer byteBuffer = ByteBuffer.allocate(Long.BYTES)
- .putLong(id)
- .asReadOnlyBuffer();
- byteBuffer.rewind();
- ChecksumByteBuffer checksumImpl = ChecksumByteBufferFactory.crc32CImpl();
- checksumImpl.update(byteBuffer);
- long dataChecksum = checksumImpl.getValue();
- LOG.info("Generated data checksum of container {} for testing: {}", id,
dataChecksum);
- data.setDataChecksum(dataChecksum);
+ KeyValueContainer kvContainer = (KeyValueContainer) container;
+ KeyValueContainerData containerData = (KeyValueContainerData)
container.getContainerData();
+ Optional<ContainerProtos.ContainerChecksumInfo> optionalChecksumInfo =
checksumManager.read(containerData);
+ long oldDataChecksum = 0;
+ long dataChecksum = 0;
+ ContainerProtos.ContainerChecksumInfo checksumInfo;
+
+ if (optionalChecksumInfo.isPresent()) {
+ checksumInfo = optionalChecksumInfo.get();
+ oldDataChecksum =
checksumInfo.getContainerMerkleTree().getDataChecksum();
+ } else {
+ // Try creating the checksum info from RocksDB metadata if it is not
present.
+ checksumInfo = createContainerMerkleTree(container);
+ if (checksumInfo == null) {
+ LOG.error("Failed to reconcile container {} as checksum info is not
available",
+ containerData.getContainerID());
+ return;
+ }
+ oldDataChecksum =
checksumInfo.getContainerMerkleTree().getDataChecksum();
+ }
+
+ for (DatanodeDetails peer : peers) {
+ Instant start = Instant.now();
+ ContainerProtos.ContainerChecksumInfo peerChecksumInfo =
dnClient.getContainerChecksumInfo(
+ containerData.getContainerID(), peer);
+ if (peerChecksumInfo == null) {
+ LOG.warn("Cannot reconcile container {} with peer {} which has not yet
generated a checksum",
+ containerData.getContainerID(), peer);
+ continue;
+ }
+
+ ContainerDiffReport diffReport = checksumManager.diff(checksumInfo,
peerChecksumInfo);
+ TokenHelper tokenHelper = dnClient.getTokenHelper();
+ XceiverClientSpi xceiverClient = dnClient.getXceiverClientManager()
+ .acquireClient(createSingleNodePipeline(peer));
+
+ try {
+ // Handle missing blocks
+ for (ContainerProtos.BlockMerkleTree missingBlock :
diffReport.getMissingBlocks()) {
+ try {
+ handleMissingBlock(kvContainer, containerData, tokenHelper,
xceiverClient, missingBlock);
+ } catch (IOException e) {
+ LOG.error("Error while reconciling missing block for block {} in
container {}", missingBlock.getBlockID(),
+ containerData.getContainerID(), e);
+ }
+ }
+
+ // Handle missing chunks
+ for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry :
diffReport.getMissingChunks().entrySet()) {
+ try {
+ reconcileChunksPerBlock(kvContainer, containerData, tokenHelper,
xceiverClient, entry.getKey(),
+ entry.getValue());
+ } catch (IOException e) {
+ LOG.error("Error while reconciling missing chunk for block {} in
container {}", entry.getKey(),
+ containerData.getContainerID(), e);
+ }
+ }
+
+ // Handle corrupt chunks
+ for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry :
diffReport.getCorruptChunks().entrySet()) {
+ try {
+ reconcileChunksPerBlock(kvContainer, containerData, tokenHelper,
xceiverClient, entry.getKey(),
+ entry.getValue());
+ } catch (IOException e) {
+ LOG.error("Error while reconciling corrupt chunk for block {} in
container {}", entry.getKey(),
+ containerData.getContainerID(), e);
+ }
+ }
+ // Update checksum based on RocksDB metadata
+ ContainerProtos.ContainerChecksumInfo updatedChecksumInfo =
updateContainerChecksum(containerData);
+ dataChecksum =
updatedChecksumInfo.getContainerMerkleTree().getDataChecksum();
+
+ long duration = Duration.between(start, Instant.now()).toMillis();
Review Comment:
Should we add a metric for total repair time between one pair of peers?
##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java:
##########
@@ -1483,21 +1489,249 @@ public void deleteContainer(Container container,
boolean force)
@Override
public void reconcileContainer(DNContainerOperationClient dnClient,
Container<?> container,
Set<DatanodeDetails> peers) throws
IOException {
- // TODO Just a deterministic placeholder hash for testing until actual
implementation is finished.
- ContainerData data = container.getContainerData();
- long id = data.getContainerID();
- ByteBuffer byteBuffer = ByteBuffer.allocate(Long.BYTES)
- .putLong(id)
- .asReadOnlyBuffer();
- byteBuffer.rewind();
- ChecksumByteBuffer checksumImpl = ChecksumByteBufferFactory.crc32CImpl();
- checksumImpl.update(byteBuffer);
- long dataChecksum = checksumImpl.getValue();
- LOG.info("Generated data checksum of container {} for testing: {}", id,
dataChecksum);
- data.setDataChecksum(dataChecksum);
+ KeyValueContainer kvContainer = (KeyValueContainer) container;
+ KeyValueContainerData containerData = (KeyValueContainerData)
container.getContainerData();
+ Optional<ContainerProtos.ContainerChecksumInfo> optionalChecksumInfo =
checksumManager.read(containerData);
+ long oldDataChecksum = 0;
+ long dataChecksum = 0;
+ ContainerProtos.ContainerChecksumInfo checksumInfo;
+
+ if (optionalChecksumInfo.isPresent()) {
+ checksumInfo = optionalChecksumInfo.get();
+ oldDataChecksum =
checksumInfo.getContainerMerkleTree().getDataChecksum();
+ } else {
+ // Try creating the checksum info from RocksDB metadata if it is not
present.
+ checksumInfo = createContainerMerkleTree(container);
+ if (checksumInfo == null) {
+ LOG.error("Failed to reconcile container {} as checksum info is not
available",
+ containerData.getContainerID());
+ return;
+ }
+ oldDataChecksum =
checksumInfo.getContainerMerkleTree().getDataChecksum();
+ }
+
+ for (DatanodeDetails peer : peers) {
+ Instant start = Instant.now();
+ ContainerProtos.ContainerChecksumInfo peerChecksumInfo =
dnClient.getContainerChecksumInfo(
+ containerData.getContainerID(), peer);
+ if (peerChecksumInfo == null) {
+ LOG.warn("Cannot reconcile container {} with peer {} which has not yet
generated a checksum",
+ containerData.getContainerID(), peer);
+ continue;
+ }
+
+ ContainerDiffReport diffReport = checksumManager.diff(checksumInfo,
peerChecksumInfo);
+ TokenHelper tokenHelper = dnClient.getTokenHelper();
+ XceiverClientSpi xceiverClient = dnClient.getXceiverClientManager()
+ .acquireClient(createSingleNodePipeline(peer));
+
+ try {
+ // Handle missing blocks
+ for (ContainerProtos.BlockMerkleTree missingBlock :
diffReport.getMissingBlocks()) {
+ try {
+ handleMissingBlock(kvContainer, containerData, tokenHelper,
xceiverClient, missingBlock);
+ } catch (IOException e) {
+ LOG.error("Error while reconciling missing block for block {} in
container {}", missingBlock.getBlockID(),
+ containerData.getContainerID(), e);
+ }
+ }
+
+ // Handle missing chunks
+ for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry :
diffReport.getMissingChunks().entrySet()) {
+ try {
+ reconcileChunksPerBlock(kvContainer, containerData, tokenHelper,
xceiverClient, entry.getKey(),
+ entry.getValue());
+ } catch (IOException e) {
+ LOG.error("Error while reconciling missing chunk for block {} in
container {}", entry.getKey(),
+ containerData.getContainerID(), e);
+ }
+ }
+
+ // Handle corrupt chunks
+ for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry :
diffReport.getCorruptChunks().entrySet()) {
+ try {
+ reconcileChunksPerBlock(kvContainer, containerData, tokenHelper,
xceiverClient, entry.getKey(),
+ entry.getValue());
+ } catch (IOException e) {
+ LOG.error("Error while reconciling corrupt chunk for block {} in
container {}", entry.getKey(),
+ containerData.getContainerID(), e);
+ }
+ }
+ // Update checksum based on RocksDB metadata
+ ContainerProtos.ContainerChecksumInfo updatedChecksumInfo =
updateContainerChecksum(containerData);
+ dataChecksum =
updatedChecksumInfo.getContainerMerkleTree().getDataChecksum();
+
+ long duration = Duration.between(start, Instant.now()).toMillis();
+ if (dataChecksum == oldDataChecksum) {
+ metrics.incContainerReconciledWithoutChanges();
+ LOG.info("Container {} reconciled without changes, Current checksum
{}. Time taken {} ms",
+ containerData.getContainerID(), checksumToString(dataChecksum),
duration);
+ } else {
+ metrics.incContainerReconciledWithChanges();
+ LOG.warn("Container {} reconciled, Checksum updated from {} to {}.
Time taken {} ms",
+ containerData.getContainerID(),
checksumToString(oldDataChecksum),
+ checksumToString(dataChecksum), duration);
+ }
+ ContainerLogger.logReconciled(container.getContainerData(),
oldDataChecksum, peer);
+ } finally {
+ dnClient.getXceiverClientManager().releaseClient(xceiverClient, false);
+ }
+ }
+
+ // Trigger manual on demand scanner
+ OnDemandContainerDataScanner.scanContainer(container);
sendICR(container);
}
+ // Return the entire tree instead of just the checksum
+ private ContainerProtos.ContainerChecksumInfo
updateContainerChecksum(KeyValueContainerData containerData)
+ throws IOException {
+ ContainerMerkleTreeWriter merkleTree = new ContainerMerkleTreeWriter();
+ try (DBHandle dbHandle = BlockUtils.getDB(containerData, conf);
+ BlockIterator<BlockData> blockIterator = dbHandle.getStore().
+ getBlockIterator(containerData.getContainerID())) {
+ while (blockIterator.hasNext()) {
+ BlockData blockData = blockIterator.nextBlock();
+ List<ContainerProtos.ChunkInfo> chunkInfos = blockData.getChunks();
+ merkleTree.addChunks(blockData.getLocalID(), chunkInfos);
+ }
+ }
+ ContainerProtos.ContainerChecksumInfo checksumInfo = checksumManager
+ .writeContainerDataTree(containerData, merkleTree);
+
containerData.setDataChecksum(checksumInfo.getContainerMerkleTree().getDataChecksum());
+ return checksumInfo;
+ }
+
+ /**
+ * Handle missing block. It reads the missing block data from the peer
datanode and writes it to the local container.
+ * If the block write fails, the block commit sequence id is not updated.
+ */
+ private void handleMissingBlock(KeyValueContainer container, ContainerData
containerData, TokenHelper tokenHelper,
+ XceiverClientSpi xceiverClient,
ContainerProtos.BlockMerkleTree missingBlock)
+ throws IOException {
+ BlockID blockID = new BlockID(containerData.getContainerID(),
missingBlock.getBlockID());
+ // The length of the block is not known, so instead of passing the default
block length we pass 0. As the length
+ // is not used to validate the token for getBlock call.
+ Token<OzoneBlockTokenIdentifier> blockToken =
tokenHelper.getBlockToken(blockID, 0L);
+ // TODO: Re-use the blockResponse for the same block again.
Review Comment:
Let's handle this TODO now or file a follow-up Jira to track it and put the
Jira ID in the code. Otherwise it will likely never get done.
##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java:
##########
@@ -1483,21 +1489,249 @@ public void deleteContainer(Container container,
boolean force)
@Override
public void reconcileContainer(DNContainerOperationClient dnClient,
Container<?> container,
Set<DatanodeDetails> peers) throws
IOException {
- // TODO Just a deterministic placeholder hash for testing until actual
implementation is finished.
- ContainerData data = container.getContainerData();
- long id = data.getContainerID();
- ByteBuffer byteBuffer = ByteBuffer.allocate(Long.BYTES)
- .putLong(id)
- .asReadOnlyBuffer();
- byteBuffer.rewind();
- ChecksumByteBuffer checksumImpl = ChecksumByteBufferFactory.crc32CImpl();
- checksumImpl.update(byteBuffer);
- long dataChecksum = checksumImpl.getValue();
- LOG.info("Generated data checksum of container {} for testing: {}", id,
dataChecksum);
- data.setDataChecksum(dataChecksum);
+ KeyValueContainer kvContainer = (KeyValueContainer) container;
+ KeyValueContainerData containerData = (KeyValueContainerData)
container.getContainerData();
+ Optional<ContainerProtos.ContainerChecksumInfo> optionalChecksumInfo =
checksumManager.read(containerData);
+ long oldDataChecksum = 0;
+ long dataChecksum = 0;
+ ContainerProtos.ContainerChecksumInfo checksumInfo;
+
+ if (optionalChecksumInfo.isPresent()) {
+ checksumInfo = optionalChecksumInfo.get();
+ oldDataChecksum =
checksumInfo.getContainerMerkleTree().getDataChecksum();
+ } else {
+ // Try creating the checksum info from RocksDB metadata if it is not
present.
+ checksumInfo = createContainerMerkleTree(container);
+ if (checksumInfo == null) {
+ LOG.error("Failed to reconcile container {} as checksum info is not
available",
+ containerData.getContainerID());
+ return;
+ }
+ oldDataChecksum =
checksumInfo.getContainerMerkleTree().getDataChecksum();
+ }
+
+ for (DatanodeDetails peer : peers) {
+ Instant start = Instant.now();
+ ContainerProtos.ContainerChecksumInfo peerChecksumInfo =
dnClient.getContainerChecksumInfo(
+ containerData.getContainerID(), peer);
+ if (peerChecksumInfo == null) {
+ LOG.warn("Cannot reconcile container {} with peer {} which has not yet
generated a checksum",
+ containerData.getContainerID(), peer);
+ continue;
+ }
+
+ ContainerDiffReport diffReport = checksumManager.diff(checksumInfo,
peerChecksumInfo);
+ TokenHelper tokenHelper = dnClient.getTokenHelper();
+ XceiverClientSpi xceiverClient = dnClient.getXceiverClientManager()
+ .acquireClient(createSingleNodePipeline(peer));
+
+ try {
+ // Handle missing blocks
+ for (ContainerProtos.BlockMerkleTree missingBlock :
diffReport.getMissingBlocks()) {
+ try {
+ handleMissingBlock(kvContainer, containerData, tokenHelper,
xceiverClient, missingBlock);
+ } catch (IOException e) {
+ LOG.error("Error while reconciling missing block for block {} in
container {}", missingBlock.getBlockID(),
+ containerData.getContainerID(), e);
+ }
+ }
+
+ // Handle missing chunks
+ for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry :
diffReport.getMissingChunks().entrySet()) {
+ try {
+ reconcileChunksPerBlock(kvContainer, containerData, tokenHelper,
xceiverClient, entry.getKey(),
+ entry.getValue());
+ } catch (IOException e) {
+ LOG.error("Error while reconciling missing chunk for block {} in
container {}", entry.getKey(),
+ containerData.getContainerID(), e);
+ }
+ }
+
+ // Handle corrupt chunks
+ for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry :
diffReport.getCorruptChunks().entrySet()) {
+ try {
+ reconcileChunksPerBlock(kvContainer, containerData, tokenHelper,
xceiverClient, entry.getKey(),
+ entry.getValue());
+ } catch (IOException e) {
+ LOG.error("Error while reconciling corrupt chunk for block {} in
container {}", entry.getKey(),
+ containerData.getContainerID(), e);
+ }
+ }
+ // Update checksum based on RocksDB metadata
+ ContainerProtos.ContainerChecksumInfo updatedChecksumInfo =
updateContainerChecksum(containerData);
+ dataChecksum =
updatedChecksumInfo.getContainerMerkleTree().getDataChecksum();
+
+ long duration = Duration.between(start, Instant.now()).toMillis();
+ if (dataChecksum == oldDataChecksum) {
+ metrics.incContainerReconciledWithoutChanges();
+ LOG.info("Container {} reconciled without changes, Current checksum
{}. Time taken {} ms",
Review Comment:
This and the log below it should also contain peer info, like the container
log has.
##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java:
##########
@@ -1483,21 +1489,249 @@ public void deleteContainer(Container container,
boolean force)
@Override
public void reconcileContainer(DNContainerOperationClient dnClient,
Container<?> container,
Set<DatanodeDetails> peers) throws
IOException {
- // TODO Just a deterministic placeholder hash for testing until actual
implementation is finished.
- ContainerData data = container.getContainerData();
- long id = data.getContainerID();
- ByteBuffer byteBuffer = ByteBuffer.allocate(Long.BYTES)
- .putLong(id)
- .asReadOnlyBuffer();
- byteBuffer.rewind();
- ChecksumByteBuffer checksumImpl = ChecksumByteBufferFactory.crc32CImpl();
- checksumImpl.update(byteBuffer);
- long dataChecksum = checksumImpl.getValue();
- LOG.info("Generated data checksum of container {} for testing: {}", id,
dataChecksum);
- data.setDataChecksum(dataChecksum);
+ KeyValueContainer kvContainer = (KeyValueContainer) container;
+ KeyValueContainerData containerData = (KeyValueContainerData)
container.getContainerData();
+ Optional<ContainerProtos.ContainerChecksumInfo> optionalChecksumInfo =
checksumManager.read(containerData);
+ long oldDataChecksum = 0;
+ long dataChecksum = 0;
+ ContainerProtos.ContainerChecksumInfo checksumInfo;
+
+ if (optionalChecksumInfo.isPresent()) {
+ checksumInfo = optionalChecksumInfo.get();
+ oldDataChecksum =
checksumInfo.getContainerMerkleTree().getDataChecksum();
+ } else {
+ // Try creating the checksum info from RocksDB metadata if it is not
present.
+ checksumInfo = createContainerMerkleTree(container);
+ if (checksumInfo == null) {
+ LOG.error("Failed to reconcile container {} as checksum info is not
available",
+ containerData.getContainerID());
+ return;
+ }
+ oldDataChecksum =
checksumInfo.getContainerMerkleTree().getDataChecksum();
+ }
+
+ for (DatanodeDetails peer : peers) {
+ Instant start = Instant.now();
+ ContainerProtos.ContainerChecksumInfo peerChecksumInfo =
dnClient.getContainerChecksumInfo(
+ containerData.getContainerID(), peer);
+ if (peerChecksumInfo == null) {
+ LOG.warn("Cannot reconcile container {} with peer {} which has not yet
generated a checksum",
+ containerData.getContainerID(), peer);
+ continue;
+ }
+
+ ContainerDiffReport diffReport = checksumManager.diff(checksumInfo,
peerChecksumInfo);
+ TokenHelper tokenHelper = dnClient.getTokenHelper();
+ XceiverClientSpi xceiverClient = dnClient.getXceiverClientManager()
+ .acquireClient(createSingleNodePipeline(peer));
+
+ try {
+ // Handle missing blocks
+ for (ContainerProtos.BlockMerkleTree missingBlock :
diffReport.getMissingBlocks()) {
+ try {
+ handleMissingBlock(kvContainer, containerData, tokenHelper,
xceiverClient, missingBlock);
+ } catch (IOException e) {
+ LOG.error("Error while reconciling missing block for block {} in
container {}", missingBlock.getBlockID(),
+ containerData.getContainerID(), e);
+ }
+ }
+
+ // Handle missing chunks
+ for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry :
diffReport.getMissingChunks().entrySet()) {
+ try {
+ reconcileChunksPerBlock(kvContainer, containerData, tokenHelper,
xceiverClient, entry.getKey(),
+ entry.getValue());
+ } catch (IOException e) {
+ LOG.error("Error while reconciling missing chunk for block {} in
container {}", entry.getKey(),
+ containerData.getContainerID(), e);
+ }
+ }
+
+ // Handle corrupt chunks
+ for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry :
diffReport.getCorruptChunks().entrySet()) {
+ try {
+ reconcileChunksPerBlock(kvContainer, containerData, tokenHelper,
xceiverClient, entry.getKey(),
+ entry.getValue());
+ } catch (IOException e) {
+ LOG.error("Error while reconciling corrupt chunk for block {} in
container {}", entry.getKey(),
+ containerData.getContainerID(), e);
+ }
+ }
+ // Update checksum based on RocksDB metadata
+ ContainerProtos.ContainerChecksumInfo updatedChecksumInfo =
updateContainerChecksum(containerData);
+ dataChecksum =
updatedChecksumInfo.getContainerMerkleTree().getDataChecksum();
+
+ long duration = Duration.between(start, Instant.now()).toMillis();
+ if (dataChecksum == oldDataChecksum) {
+ metrics.incContainerReconciledWithoutChanges();
+ LOG.info("Container {} reconciled without changes, Current checksum
{}. Time taken {} ms",
+ containerData.getContainerID(), checksumToString(dataChecksum),
duration);
+ } else {
+ metrics.incContainerReconciledWithChanges();
+ LOG.warn("Container {} reconciled, Checksum updated from {} to {}.
Time taken {} ms",
+ containerData.getContainerID(),
checksumToString(oldDataChecksum),
+ checksumToString(dataChecksum), duration);
+ }
+ ContainerLogger.logReconciled(container.getContainerData(),
oldDataChecksum, peer);
+ } finally {
+ dnClient.getXceiverClientManager().releaseClient(xceiverClient, false);
+ }
+ }
+
+ // Trigger manual on demand scanner
Review Comment:
```suggestion
// Trigger an on-demand scan of the container to check our work after
reconciling with all peers.
```
##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java:
##########
@@ -1483,21 +1489,249 @@ public void deleteContainer(Container container,
boolean force)
@Override
public void reconcileContainer(DNContainerOperationClient dnClient,
Container<?> container,
Set<DatanodeDetails> peers) throws
IOException {
- // TODO Just a deterministic placeholder hash for testing until actual
implementation is finished.
- ContainerData data = container.getContainerData();
- long id = data.getContainerID();
- ByteBuffer byteBuffer = ByteBuffer.allocate(Long.BYTES)
- .putLong(id)
- .asReadOnlyBuffer();
- byteBuffer.rewind();
- ChecksumByteBuffer checksumImpl = ChecksumByteBufferFactory.crc32CImpl();
- checksumImpl.update(byteBuffer);
- long dataChecksum = checksumImpl.getValue();
- LOG.info("Generated data checksum of container {} for testing: {}", id,
dataChecksum);
- data.setDataChecksum(dataChecksum);
+ KeyValueContainer kvContainer = (KeyValueContainer) container;
+ KeyValueContainerData containerData = (KeyValueContainerData)
container.getContainerData();
+ Optional<ContainerProtos.ContainerChecksumInfo> optionalChecksumInfo =
checksumManager.read(containerData);
+ long oldDataChecksum = 0;
+ long dataChecksum = 0;
+ ContainerProtos.ContainerChecksumInfo checksumInfo;
+
+ if (optionalChecksumInfo.isPresent()) {
+ checksumInfo = optionalChecksumInfo.get();
+ oldDataChecksum =
checksumInfo.getContainerMerkleTree().getDataChecksum();
+ } else {
+ // Try creating the checksum info from RocksDB metadata if it is not
present.
+ checksumInfo = createContainerMerkleTree(container);
+ if (checksumInfo == null) {
+ LOG.error("Failed to reconcile container {} as checksum info is not
available",
+ containerData.getContainerID());
+ return;
+ }
+ oldDataChecksum =
checksumInfo.getContainerMerkleTree().getDataChecksum();
+ }
+
+ for (DatanodeDetails peer : peers) {
+ Instant start = Instant.now();
+ ContainerProtos.ContainerChecksumInfo peerChecksumInfo =
dnClient.getContainerChecksumInfo(
+ containerData.getContainerID(), peer);
+ if (peerChecksumInfo == null) {
+ LOG.warn("Cannot reconcile container {} with peer {} which has not yet
generated a checksum",
+ containerData.getContainerID(), peer);
+ continue;
+ }
+
+ ContainerDiffReport diffReport = checksumManager.diff(checksumInfo,
peerChecksumInfo);
+ TokenHelper tokenHelper = dnClient.getTokenHelper();
+ XceiverClientSpi xceiverClient = dnClient.getXceiverClientManager()
+ .acquireClient(createSingleNodePipeline(peer));
+
+ try {
+ // Handle missing blocks
+ for (ContainerProtos.BlockMerkleTree missingBlock :
diffReport.getMissingBlocks()) {
+ try {
+ handleMissingBlock(kvContainer, containerData, tokenHelper,
xceiverClient, missingBlock);
+ } catch (IOException e) {
+ LOG.error("Error while reconciling missing block for block {} in
container {}", missingBlock.getBlockID(),
+ containerData.getContainerID(), e);
+ }
+ }
+
+ // Handle missing chunks
+ for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry :
diffReport.getMissingChunks().entrySet()) {
+ try {
+ reconcileChunksPerBlock(kvContainer, containerData, tokenHelper,
xceiverClient, entry.getKey(),
+ entry.getValue());
+ } catch (IOException e) {
+ LOG.error("Error while reconciling missing chunk for block {} in
container {}", entry.getKey(),
+ containerData.getContainerID(), e);
+ }
+ }
+
+ // Handle corrupt chunks
+ for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry :
diffReport.getCorruptChunks().entrySet()) {
+ try {
+ reconcileChunksPerBlock(kvContainer, containerData, tokenHelper,
xceiverClient, entry.getKey(),
+ entry.getValue());
+ } catch (IOException e) {
+ LOG.error("Error while reconciling corrupt chunk for block {} in
container {}", entry.getKey(),
+ containerData.getContainerID(), e);
+ }
+ }
+ // Update checksum based on RocksDB metadata
+ ContainerProtos.ContainerChecksumInfo updatedChecksumInfo =
updateContainerChecksum(containerData);
+ dataChecksum =
updatedChecksumInfo.getContainerMerkleTree().getDataChecksum();
+
+ long duration = Duration.between(start, Instant.now()).toMillis();
+ if (dataChecksum == oldDataChecksum) {
+ metrics.incContainerReconciledWithoutChanges();
+ LOG.info("Container {} reconciled without changes, Current checksum
{}. Time taken {} ms",
+ containerData.getContainerID(), checksumToString(dataChecksum),
duration);
+ } else {
+ metrics.incContainerReconciledWithChanges();
+ LOG.warn("Container {} reconciled, Checksum updated from {} to {}.
Time taken {} ms",
+ containerData.getContainerID(),
checksumToString(oldDataChecksum),
+ checksumToString(dataChecksum), duration);
+ }
+ ContainerLogger.logReconciled(container.getContainerData(),
oldDataChecksum, peer);
+ } finally {
+ dnClient.getXceiverClientManager().releaseClient(xceiverClient, false);
+ }
+ }
+
+ // Trigger manual on demand scanner
+ OnDemandContainerDataScanner.scanContainer(container);
sendICR(container);
}
+ // Return the entire tree instead of just the checksum
+ private ContainerProtos.ContainerChecksumInfo
updateContainerChecksum(KeyValueContainerData containerData)
+ throws IOException {
+ ContainerMerkleTreeWriter merkleTree = new ContainerMerkleTreeWriter();
+ try (DBHandle dbHandle = BlockUtils.getDB(containerData, conf);
+ BlockIterator<BlockData> blockIterator = dbHandle.getStore().
+ getBlockIterator(containerData.getContainerID())) {
+ while (blockIterator.hasNext()) {
+ BlockData blockData = blockIterator.nextBlock();
+ List<ContainerProtos.ChunkInfo> chunkInfos = blockData.getChunks();
+ merkleTree.addChunks(blockData.getLocalID(), chunkInfos);
+ }
+ }
+ ContainerProtos.ContainerChecksumInfo checksumInfo = checksumManager
+ .writeContainerDataTree(containerData, merkleTree);
+
containerData.setDataChecksum(checksumInfo.getContainerMerkleTree().getDataChecksum());
+ return checksumInfo;
+ }
+
+ /**
+ * Handle missing block. It reads the missing block data from the peer
datanode and writes it to the local container.
+ * If the block write fails, the block commit sequence id is not updated.
+ */
+ private void handleMissingBlock(KeyValueContainer container, ContainerData
containerData, TokenHelper tokenHelper,
+ XceiverClientSpi xceiverClient,
ContainerProtos.BlockMerkleTree missingBlock)
+ throws IOException {
+ BlockID blockID = new BlockID(containerData.getContainerID(),
missingBlock.getBlockID());
+ // The length of the block is not known, so instead of passing the default
block length we pass 0. As the length
+ // is not used to validate the token for getBlock call.
+ Token<OzoneBlockTokenIdentifier> blockToken =
tokenHelper.getBlockToken(blockID, 0L);
+ // TODO: Re-use the blockResponse for the same block again.
+ ContainerProtos.GetBlockResponseProto blockResponse =
ContainerProtocolCalls.getBlock(xceiverClient, blockID,
+ blockToken, new HashMap<>());
+ ContainerProtos.BlockData peerBlockData = blockResponse.getBlockData();
+ long bcsId = getBlockManager().blockExists(container, blockID) ?
Review Comment:
Since we think we are missing the block, this case would only happen if the
merkle tree had stale information saying we didn't have a block and now we do,
correct? In that case I think we should just abort this method.
##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java:
##########
@@ -1483,21 +1489,249 @@ public void deleteContainer(Container container,
boolean force)
@Override
public void reconcileContainer(DNContainerOperationClient dnClient,
Container<?> container,
Set<DatanodeDetails> peers) throws
IOException {
- // TODO Just a deterministic placeholder hash for testing until actual
implementation is finished.
- ContainerData data = container.getContainerData();
- long id = data.getContainerID();
- ByteBuffer byteBuffer = ByteBuffer.allocate(Long.BYTES)
- .putLong(id)
- .asReadOnlyBuffer();
- byteBuffer.rewind();
- ChecksumByteBuffer checksumImpl = ChecksumByteBufferFactory.crc32CImpl();
- checksumImpl.update(byteBuffer);
- long dataChecksum = checksumImpl.getValue();
- LOG.info("Generated data checksum of container {} for testing: {}", id,
dataChecksum);
- data.setDataChecksum(dataChecksum);
+ KeyValueContainer kvContainer = (KeyValueContainer) container;
+ KeyValueContainerData containerData = (KeyValueContainerData)
container.getContainerData();
+ Optional<ContainerProtos.ContainerChecksumInfo> optionalChecksumInfo =
checksumManager.read(containerData);
+ long oldDataChecksum = 0;
+ long dataChecksum = 0;
+ ContainerProtos.ContainerChecksumInfo checksumInfo;
+
+ if (optionalChecksumInfo.isPresent()) {
+ checksumInfo = optionalChecksumInfo.get();
+ oldDataChecksum =
checksumInfo.getContainerMerkleTree().getDataChecksum();
+ } else {
+ // Try creating the checksum info from RocksDB metadata if it is not
present.
+ checksumInfo = createContainerMerkleTree(container);
+ if (checksumInfo == null) {
+ LOG.error("Failed to reconcile container {} as checksum info is not
available",
+ containerData.getContainerID());
+ return;
+ }
+ oldDataChecksum =
checksumInfo.getContainerMerkleTree().getDataChecksum();
+ }
+
+ for (DatanodeDetails peer : peers) {
+ Instant start = Instant.now();
+ ContainerProtos.ContainerChecksumInfo peerChecksumInfo =
dnClient.getContainerChecksumInfo(
+ containerData.getContainerID(), peer);
+ if (peerChecksumInfo == null) {
+ LOG.warn("Cannot reconcile container {} with peer {} which has not yet
generated a checksum",
+ containerData.getContainerID(), peer);
+ continue;
+ }
+
+ ContainerDiffReport diffReport = checksumManager.diff(checksumInfo,
peerChecksumInfo);
+ TokenHelper tokenHelper = dnClient.getTokenHelper();
+ XceiverClientSpi xceiverClient = dnClient.getXceiverClientManager()
+ .acquireClient(createSingleNodePipeline(peer));
+
+ try {
+ // Handle missing blocks
+ for (ContainerProtos.BlockMerkleTree missingBlock :
diffReport.getMissingBlocks()) {
+ try {
+ handleMissingBlock(kvContainer, containerData, tokenHelper,
xceiverClient, missingBlock);
+ } catch (IOException e) {
+ LOG.error("Error while reconciling missing block for block {} in
container {}", missingBlock.getBlockID(),
+ containerData.getContainerID(), e);
+ }
+ }
+
+ // Handle missing chunks
+ for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry :
diffReport.getMissingChunks().entrySet()) {
+ try {
+ reconcileChunksPerBlock(kvContainer, containerData, tokenHelper,
xceiverClient, entry.getKey(),
+ entry.getValue());
+ } catch (IOException e) {
+ LOG.error("Error while reconciling missing chunk for block {} in
container {}", entry.getKey(),
+ containerData.getContainerID(), e);
+ }
+ }
+
+ // Handle corrupt chunks
+ for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry :
diffReport.getCorruptChunks().entrySet()) {
+ try {
+ reconcileChunksPerBlock(kvContainer, containerData, tokenHelper,
xceiverClient, entry.getKey(),
+ entry.getValue());
+ } catch (IOException e) {
+ LOG.error("Error while reconciling corrupt chunk for block {} in
container {}", entry.getKey(),
+ containerData.getContainerID(), e);
+ }
+ }
+ // Update checksum based on RocksDB metadata
+ ContainerProtos.ContainerChecksumInfo updatedChecksumInfo =
updateContainerChecksum(containerData);
+ dataChecksum =
updatedChecksumInfo.getContainerMerkleTree().getDataChecksum();
+
+ long duration = Duration.between(start, Instant.now()).toMillis();
+ if (dataChecksum == oldDataChecksum) {
+ metrics.incContainerReconciledWithoutChanges();
+ LOG.info("Container {} reconciled without changes, Current checksum
{}. Time taken {} ms",
Review Comment:
```suggestion
LOG.info("Container {} reconciled without changes. Current
checksum {}. Time taken {} ms",
```
##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java:
##########
@@ -1483,21 +1489,249 @@ public void deleteContainer(Container container,
boolean force)
@Override
public void reconcileContainer(DNContainerOperationClient dnClient,
Container<?> container,
Set<DatanodeDetails> peers) throws
IOException {
- // TODO Just a deterministic placeholder hash for testing until actual
implementation is finished.
- ContainerData data = container.getContainerData();
- long id = data.getContainerID();
- ByteBuffer byteBuffer = ByteBuffer.allocate(Long.BYTES)
- .putLong(id)
- .asReadOnlyBuffer();
- byteBuffer.rewind();
- ChecksumByteBuffer checksumImpl = ChecksumByteBufferFactory.crc32CImpl();
- checksumImpl.update(byteBuffer);
- long dataChecksum = checksumImpl.getValue();
- LOG.info("Generated data checksum of container {} for testing: {}", id,
dataChecksum);
- data.setDataChecksum(dataChecksum);
+ KeyValueContainer kvContainer = (KeyValueContainer) container;
+ KeyValueContainerData containerData = (KeyValueContainerData)
container.getContainerData();
+ Optional<ContainerProtos.ContainerChecksumInfo> optionalChecksumInfo =
checksumManager.read(containerData);
+ long oldDataChecksum = 0;
+ long dataChecksum = 0;
+ ContainerProtos.ContainerChecksumInfo checksumInfo;
+
+ if (optionalChecksumInfo.isPresent()) {
+ checksumInfo = optionalChecksumInfo.get();
+ oldDataChecksum =
checksumInfo.getContainerMerkleTree().getDataChecksum();
+ } else {
+ // Try creating the checksum info from RocksDB metadata if it is not
present.
+ checksumInfo = createContainerMerkleTree(container);
+ if (checksumInfo == null) {
+ LOG.error("Failed to reconcile container {} as checksum info is not
available",
+ containerData.getContainerID());
+ return;
+ }
+ oldDataChecksum =
checksumInfo.getContainerMerkleTree().getDataChecksum();
+ }
+
+ for (DatanodeDetails peer : peers) {
+ Instant start = Instant.now();
+ ContainerProtos.ContainerChecksumInfo peerChecksumInfo =
dnClient.getContainerChecksumInfo(
+ containerData.getContainerID(), peer);
+ if (peerChecksumInfo == null) {
+ LOG.warn("Cannot reconcile container {} with peer {} which has not yet
generated a checksum",
+ containerData.getContainerID(), peer);
+ continue;
+ }
+
+ ContainerDiffReport diffReport = checksumManager.diff(checksumInfo,
peerChecksumInfo);
+ TokenHelper tokenHelper = dnClient.getTokenHelper();
+ XceiverClientSpi xceiverClient = dnClient.getXceiverClientManager()
+ .acquireClient(createSingleNodePipeline(peer));
+
+ try {
+ // Handle missing blocks
+ for (ContainerProtos.BlockMerkleTree missingBlock :
diffReport.getMissingBlocks()) {
+ try {
+ handleMissingBlock(kvContainer, containerData, tokenHelper,
xceiverClient, missingBlock);
+ } catch (IOException e) {
+ LOG.error("Error while reconciling missing block for block {} in
container {}", missingBlock.getBlockID(),
+ containerData.getContainerID(), e);
+ }
+ }
+
+ // Handle missing chunks
+ for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry :
diffReport.getMissingChunks().entrySet()) {
+ try {
+ reconcileChunksPerBlock(kvContainer, containerData, tokenHelper,
xceiverClient, entry.getKey(),
+ entry.getValue());
+ } catch (IOException e) {
+ LOG.error("Error while reconciling missing chunk for block {} in
container {}", entry.getKey(),
+ containerData.getContainerID(), e);
+ }
+ }
+
+ // Handle corrupt chunks
+ for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry :
diffReport.getCorruptChunks().entrySet()) {
+ try {
+ reconcileChunksPerBlock(kvContainer, containerData, tokenHelper,
xceiverClient, entry.getKey(),
+ entry.getValue());
+ } catch (IOException e) {
+ LOG.error("Error while reconciling corrupt chunk for block {} in
container {}", entry.getKey(),
+ containerData.getContainerID(), e);
+ }
+ }
+ // Update checksum based on RocksDB metadata
+ ContainerProtos.ContainerChecksumInfo updatedChecksumInfo =
updateContainerChecksum(containerData);
+ dataChecksum =
updatedChecksumInfo.getContainerMerkleTree().getDataChecksum();
+
+ long duration = Duration.between(start, Instant.now()).toMillis();
+ if (dataChecksum == oldDataChecksum) {
+ metrics.incContainerReconciledWithoutChanges();
+ LOG.info("Container {} reconciled without changes, Current checksum
{}. Time taken {} ms",
+ containerData.getContainerID(), checksumToString(dataChecksum),
duration);
+ } else {
+ metrics.incContainerReconciledWithChanges();
+ LOG.warn("Container {} reconciled, Checksum updated from {} to {}.
Time taken {} ms",
+ containerData.getContainerID(),
checksumToString(oldDataChecksum),
+ checksumToString(dataChecksum), duration);
+ }
+ ContainerLogger.logReconciled(container.getContainerData(),
oldDataChecksum, peer);
+ } finally {
+ dnClient.getXceiverClientManager().releaseClient(xceiverClient, false);
+ }
+ }
+
+ // Trigger manual on demand scanner
+ OnDemandContainerDataScanner.scanContainer(container);
sendICR(container);
}
+ // Return the entire tree instead of just the checksum
+ private ContainerProtos.ContainerChecksumInfo
updateContainerChecksum(KeyValueContainerData containerData)
+ throws IOException {
+ ContainerMerkleTreeWriter merkleTree = new ContainerMerkleTreeWriter();
+ try (DBHandle dbHandle = BlockUtils.getDB(containerData, conf);
+ BlockIterator<BlockData> blockIterator = dbHandle.getStore().
+ getBlockIterator(containerData.getContainerID())) {
+ while (blockIterator.hasNext()) {
+ BlockData blockData = blockIterator.nextBlock();
+ List<ContainerProtos.ChunkInfo> chunkInfos = blockData.getChunks();
+ merkleTree.addChunks(blockData.getLocalID(), chunkInfos);
+ }
+ }
+ ContainerProtos.ContainerChecksumInfo checksumInfo = checksumManager
+ .writeContainerDataTree(containerData, merkleTree);
+
containerData.setDataChecksum(checksumInfo.getContainerMerkleTree().getDataChecksum());
+ return checksumInfo;
+ }
+
+ /**
+ * Handle missing block. It reads the missing block data from the peer
datanode and writes it to the local container.
+ * If the block write fails, the block commit sequence id is not updated.
+ */
+ private void handleMissingBlock(KeyValueContainer container, ContainerData
containerData, TokenHelper tokenHelper,
+ XceiverClientSpi xceiverClient,
ContainerProtos.BlockMerkleTree missingBlock)
+ throws IOException {
+ BlockID blockID = new BlockID(containerData.getContainerID(),
missingBlock.getBlockID());
+ // The length of the block is not known, so instead of passing the default
block length we pass 0. As the length
+ // is not used to validate the token for getBlock call.
+ Token<OzoneBlockTokenIdentifier> blockToken =
tokenHelper.getBlockToken(blockID, 0L);
+ // TODO: Re-use the blockResponse for the same block again.
+ ContainerProtos.GetBlockResponseProto blockResponse =
ContainerProtocolCalls.getBlock(xceiverClient, blockID,
+ blockToken, new HashMap<>());
+ ContainerProtos.BlockData peerBlockData = blockResponse.getBlockData();
+ long bcsId = getBlockManager().blockExists(container, blockID) ?
+ getBlockManager().getBlock(container,
blockID).getBlockCommitSequenceId() : 0;
+ // Check the local bcsId with the one from the bcsId from the peer
datanode.
+ long maxBcsId =
Math.max(peerBlockData.getBlockID().getBlockCommitSequenceId(), bcsId);
+ List<ContainerProtos.ChunkInfo> peerChunksList =
peerBlockData.getChunksList();
+ List<ContainerProtos.ChunkInfo> successfullChunksList = new ArrayList<>();
+ // Update BcsId only if all chunks are successfully written.
+ boolean overwriteBcsId = true;
+
+ // Don't update bcsId if chunk read fails
+ for (ContainerProtos.ChunkInfo chunkInfoProto : peerChunksList) {
+ try {
+ ByteString chunkData = readChunkData(xceiverClient, chunkInfoProto,
blockID, blockToken);
+ ChunkBuffer chunkBuffer =
ChunkBuffer.wrap(chunkData.asReadOnlyByteBuffer());
+ ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(chunkInfoProto);
+ chunkInfo.addMetadata(OzoneConsts.CHUNK_OVERWRITE, "true");
+ writeChunkForClosedContainer(chunkInfo, blockID, chunkBuffer,
container);
+ successfullChunksList.add(chunkInfoProto);
+ } catch (IOException ex) {
+ overwriteBcsId = false;
+ LOG.error("Error while reconciling missing block {} for offset {} in
container {}",
+ blockID, chunkInfoProto.getOffset(),
containerData.getContainerID(), ex);
+ }
+ }
+
+ BlockData putBlockData = BlockData.getFromProtoBuf(peerBlockData);
+ putBlockData.setChunks(successfullChunksList);
+ putBlockForClosedContainer(container, putBlockData, maxBcsId,
overwriteBcsId);
+ }
+
+ private ByteString readChunkData(XceiverClientSpi xceiverClient,
ContainerProtos.ChunkInfo chunkInfoProto,
+ BlockID blockID,
Token<OzoneBlockTokenIdentifier> blockToken) throws IOException {
+ ContainerProtos.ReadChunkResponseProto response =
+ ContainerProtocolCalls.readChunk(xceiverClient, chunkInfoProto,
blockID.getDatanodeBlockIDProtobuf(),
+ null, blockToken);
+
+ if (response.hasData()) {
+ return response.getData();
+ } else if (response.hasDataBuffers()) {
+ return
BufferUtils.concatByteStrings(response.getDataBuffers().getBuffersList());
+ } else {
+ throw new IOException("Error reading chunk data: No data returned.");
+ }
+ }
+
+ /**
+ * This method reconciles chunks per block. It reads the missing/corrupt
chunk data from the peer
+ * datanode and writes it to the local container. If the chunk write fails,
the block commit sequence
+ * id is not updated.
+ */
+ private void reconcileChunksPerBlock(KeyValueContainer container,
ContainerData containerData,
Review Comment:
We can get the `ContainerData` from the `KeyValueContainer`; there is no need
for two parameters.
##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java:
##########
@@ -1483,21 +1489,249 @@ public void deleteContainer(Container container,
boolean force)
@Override
public void reconcileContainer(DNContainerOperationClient dnClient,
Container<?> container,
Set<DatanodeDetails> peers) throws
IOException {
- // TODO Just a deterministic placeholder hash for testing until actual
implementation is finished.
- ContainerData data = container.getContainerData();
- long id = data.getContainerID();
- ByteBuffer byteBuffer = ByteBuffer.allocate(Long.BYTES)
- .putLong(id)
- .asReadOnlyBuffer();
- byteBuffer.rewind();
- ChecksumByteBuffer checksumImpl = ChecksumByteBufferFactory.crc32CImpl();
- checksumImpl.update(byteBuffer);
- long dataChecksum = checksumImpl.getValue();
- LOG.info("Generated data checksum of container {} for testing: {}", id,
dataChecksum);
- data.setDataChecksum(dataChecksum);
+ KeyValueContainer kvContainer = (KeyValueContainer) container;
+ KeyValueContainerData containerData = (KeyValueContainerData)
container.getContainerData();
+ Optional<ContainerProtos.ContainerChecksumInfo> optionalChecksumInfo =
checksumManager.read(containerData);
+ long oldDataChecksum = 0;
+ long dataChecksum = 0;
+ ContainerProtos.ContainerChecksumInfo checksumInfo;
+
+ if (optionalChecksumInfo.isPresent()) {
+ checksumInfo = optionalChecksumInfo.get();
+ oldDataChecksum =
checksumInfo.getContainerMerkleTree().getDataChecksum();
+ } else {
+ // Try creating the checksum info from RocksDB metadata if it is not
present.
+ checksumInfo = createContainerMerkleTree(container);
+ if (checksumInfo == null) {
+ LOG.error("Failed to reconcile container {} as checksum info is not
available",
+ containerData.getContainerID());
+ return;
+ }
+ oldDataChecksum =
checksumInfo.getContainerMerkleTree().getDataChecksum();
+ }
+
+ for (DatanodeDetails peer : peers) {
+ Instant start = Instant.now();
+ ContainerProtos.ContainerChecksumInfo peerChecksumInfo =
dnClient.getContainerChecksumInfo(
+ containerData.getContainerID(), peer);
+ if (peerChecksumInfo == null) {
+ LOG.warn("Cannot reconcile container {} with peer {} which has not yet
generated a checksum",
+ containerData.getContainerID(), peer);
+ continue;
+ }
+
+ ContainerDiffReport diffReport = checksumManager.diff(checksumInfo,
peerChecksumInfo);
+ TokenHelper tokenHelper = dnClient.getTokenHelper();
+ XceiverClientSpi xceiverClient = dnClient.getXceiverClientManager()
+ .acquireClient(createSingleNodePipeline(peer));
+
+ try {
+ // Handle missing blocks
+ for (ContainerProtos.BlockMerkleTree missingBlock :
diffReport.getMissingBlocks()) {
+ try {
+ handleMissingBlock(kvContainer, containerData, tokenHelper,
xceiverClient, missingBlock);
+ } catch (IOException e) {
+ LOG.error("Error while reconciling missing block for block {} in
container {}", missingBlock.getBlockID(),
+ containerData.getContainerID(), e);
+ }
+ }
+
+ // Handle missing chunks
+ for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry :
diffReport.getMissingChunks().entrySet()) {
+ try {
+ reconcileChunksPerBlock(kvContainer, containerData, tokenHelper,
xceiverClient, entry.getKey(),
+ entry.getValue());
+ } catch (IOException e) {
+ LOG.error("Error while reconciling missing chunk for block {} in
container {}", entry.getKey(),
+ containerData.getContainerID(), e);
+ }
+ }
+
+ // Handle corrupt chunks
+ for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry :
diffReport.getCorruptChunks().entrySet()) {
+ try {
+ reconcileChunksPerBlock(kvContainer, containerData, tokenHelper,
xceiverClient, entry.getKey(),
+ entry.getValue());
+ } catch (IOException e) {
+ LOG.error("Error while reconciling corrupt chunk for block {} in
container {}", entry.getKey(),
+ containerData.getContainerID(), e);
+ }
+ }
+ // Update checksum based on RocksDB metadata
+ ContainerProtos.ContainerChecksumInfo updatedChecksumInfo =
updateContainerChecksum(containerData);
+ dataChecksum =
updatedChecksumInfo.getContainerMerkleTree().getDataChecksum();
+
+ long duration = Duration.between(start, Instant.now()).toMillis();
+ if (dataChecksum == oldDataChecksum) {
+ metrics.incContainerReconciledWithoutChanges();
+ LOG.info("Container {} reconciled without changes, Current checksum
{}. Time taken {} ms",
+ containerData.getContainerID(), checksumToString(dataChecksum),
duration);
+ } else {
+ metrics.incContainerReconciledWithChanges();
+ LOG.warn("Container {} reconciled, Checksum updated from {} to {}.
Time taken {} ms",
+ containerData.getContainerID(),
checksumToString(oldDataChecksum),
+ checksumToString(dataChecksum), duration);
+ }
+ ContainerLogger.logReconciled(container.getContainerData(),
oldDataChecksum, peer);
+ } finally {
+ dnClient.getXceiverClientManager().releaseClient(xceiverClient, false);
+ }
+ }
+
+ // Trigger manual on demand scanner
+ OnDemandContainerDataScanner.scanContainer(container);
sendICR(container);
}
+ // Return the entire tree instead of just the checksum
+ private ContainerProtos.ContainerChecksumInfo
updateContainerChecksum(KeyValueContainerData containerData)
+ throws IOException {
+ ContainerMerkleTreeWriter merkleTree = new ContainerMerkleTreeWriter();
+ try (DBHandle dbHandle = BlockUtils.getDB(containerData, conf);
+ BlockIterator<BlockData> blockIterator = dbHandle.getStore().
+ getBlockIterator(containerData.getContainerID())) {
+ while (blockIterator.hasNext()) {
+ BlockData blockData = blockIterator.nextBlock();
+ List<ContainerProtos.ChunkInfo> chunkInfos = blockData.getChunks();
+ merkleTree.addChunks(blockData.getLocalID(), chunkInfos);
+ }
+ }
+ ContainerProtos.ContainerChecksumInfo checksumInfo = checksumManager
+ .writeContainerDataTree(containerData, merkleTree);
+
containerData.setDataChecksum(checksumInfo.getContainerMerkleTree().getDataChecksum());
+ return checksumInfo;
+ }
+
+ /**
+ * Handle missing block. It reads the missing block data from the peer
datanode and writes it to the local container.
+ * If the block write fails, the block commit sequence id is not updated.
+ */
+ private void handleMissingBlock(KeyValueContainer container, ContainerData
containerData, TokenHelper tokenHelper,
+ XceiverClientSpi xceiverClient,
ContainerProtos.BlockMerkleTree missingBlock)
+ throws IOException {
+ BlockID blockID = new BlockID(containerData.getContainerID(),
missingBlock.getBlockID());
+ // The length of the block is not known, so instead of passing the default
block length we pass 0. As the length
+ // is not used to validate the token for getBlock call.
+ Token<OzoneBlockTokenIdentifier> blockToken =
tokenHelper.getBlockToken(blockID, 0L);
+ // TODO: Re-use the blockResponse for the same block again.
+ ContainerProtos.GetBlockResponseProto blockResponse =
ContainerProtocolCalls.getBlock(xceiverClient, blockID,
+ blockToken, new HashMap<>());
+ ContainerProtos.BlockData peerBlockData = blockResponse.getBlockData();
+ long bcsId = getBlockManager().blockExists(container, blockID) ?
+ getBlockManager().getBlock(container,
blockID).getBlockCommitSequenceId() : 0;
+ // Check the local bcsId with the one from the bcsId from the peer
datanode.
+ long maxBcsId =
Math.max(peerBlockData.getBlockID().getBlockCommitSequenceId(), bcsId);
+ List<ContainerProtos.ChunkInfo> peerChunksList =
peerBlockData.getChunksList();
+ List<ContainerProtos.ChunkInfo> successfullChunksList = new ArrayList<>();
Review Comment:
nit. spelling
```suggestion
List<ContainerProtos.ChunkInfo> successfulChunksList = new ArrayList<>();
```
##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java:
##########
@@ -1483,21 +1489,249 @@ public void deleteContainer(Container container,
boolean force)
@Override
public void reconcileContainer(DNContainerOperationClient dnClient,
Container<?> container,
Set<DatanodeDetails> peers) throws
IOException {
- // TODO Just a deterministic placeholder hash for testing until actual
implementation is finished.
- ContainerData data = container.getContainerData();
- long id = data.getContainerID();
- ByteBuffer byteBuffer = ByteBuffer.allocate(Long.BYTES)
- .putLong(id)
- .asReadOnlyBuffer();
- byteBuffer.rewind();
- ChecksumByteBuffer checksumImpl = ChecksumByteBufferFactory.crc32CImpl();
- checksumImpl.update(byteBuffer);
- long dataChecksum = checksumImpl.getValue();
- LOG.info("Generated data checksum of container {} for testing: {}", id,
dataChecksum);
- data.setDataChecksum(dataChecksum);
+ KeyValueContainer kvContainer = (KeyValueContainer) container;
+ KeyValueContainerData containerData = (KeyValueContainerData)
container.getContainerData();
+ Optional<ContainerProtos.ContainerChecksumInfo> optionalChecksumInfo =
checksumManager.read(containerData);
+ long oldDataChecksum = 0;
+ long dataChecksum = 0;
+ ContainerProtos.ContainerChecksumInfo checksumInfo;
+
+ if (optionalChecksumInfo.isPresent()) {
+ checksumInfo = optionalChecksumInfo.get();
+ oldDataChecksum =
checksumInfo.getContainerMerkleTree().getDataChecksum();
+ } else {
+ // Try creating the checksum info from RocksDB metadata if it is not
present.
+ checksumInfo = createContainerMerkleTree(container);
+ if (checksumInfo == null) {
+ LOG.error("Failed to reconcile container {} as checksum info is not
available",
+ containerData.getContainerID());
+ return;
+ }
+ oldDataChecksum =
checksumInfo.getContainerMerkleTree().getDataChecksum();
+ }
+
+ for (DatanodeDetails peer : peers) {
+ Instant start = Instant.now();
+ ContainerProtos.ContainerChecksumInfo peerChecksumInfo =
dnClient.getContainerChecksumInfo(
+ containerData.getContainerID(), peer);
+ if (peerChecksumInfo == null) {
+ LOG.warn("Cannot reconcile container {} with peer {} which has not yet
generated a checksum",
+ containerData.getContainerID(), peer);
+ continue;
+ }
+
+ ContainerDiffReport diffReport = checksumManager.diff(checksumInfo,
peerChecksumInfo);
+ TokenHelper tokenHelper = dnClient.getTokenHelper();
+ XceiverClientSpi xceiverClient = dnClient.getXceiverClientManager()
+ .acquireClient(createSingleNodePipeline(peer));
+
+ try {
+ // Handle missing blocks
+ for (ContainerProtos.BlockMerkleTree missingBlock :
diffReport.getMissingBlocks()) {
+ try {
+ handleMissingBlock(kvContainer, containerData, tokenHelper,
xceiverClient, missingBlock);
+ } catch (IOException e) {
+ LOG.error("Error while reconciling missing block for block {} in
container {}", missingBlock.getBlockID(),
+ containerData.getContainerID(), e);
+ }
+ }
+
+ // Handle missing chunks
+ for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry :
diffReport.getMissingChunks().entrySet()) {
+ try {
+ reconcileChunksPerBlock(kvContainer, containerData, tokenHelper,
xceiverClient, entry.getKey(),
+ entry.getValue());
+ } catch (IOException e) {
+ LOG.error("Error while reconciling missing chunk for block {} in
container {}", entry.getKey(),
+ containerData.getContainerID(), e);
+ }
+ }
+
+ // Handle corrupt chunks
+ for (Map.Entry<Long, List<ContainerProtos.ChunkMerkleTree>> entry :
diffReport.getCorruptChunks().entrySet()) {
+ try {
+ reconcileChunksPerBlock(kvContainer, containerData, tokenHelper,
xceiverClient, entry.getKey(),
+ entry.getValue());
+ } catch (IOException e) {
+ LOG.error("Error while reconciling corrupt chunk for block {} in
container {}", entry.getKey(),
+ containerData.getContainerID(), e);
+ }
+ }
+ // Update checksum based on RocksDB metadata
+ ContainerProtos.ContainerChecksumInfo updatedChecksumInfo =
updateContainerChecksum(containerData);
+ dataChecksum =
updatedChecksumInfo.getContainerMerkleTree().getDataChecksum();
+
+ long duration = Duration.between(start, Instant.now()).toMillis();
+ if (dataChecksum == oldDataChecksum) {
+ metrics.incContainerReconciledWithoutChanges();
+ LOG.info("Container {} reconciled without changes, Current checksum
{}. Time taken {} ms",
+ containerData.getContainerID(), checksumToString(dataChecksum),
duration);
+ } else {
+ metrics.incContainerReconciledWithChanges();
+ LOG.warn("Container {} reconciled, Checksum updated from {} to {}.
Time taken {} ms",
+ containerData.getContainerID(),
checksumToString(oldDataChecksum),
+ checksumToString(dataChecksum), duration);
+ }
+ ContainerLogger.logReconciled(container.getContainerData(),
oldDataChecksum, peer);
+ } finally {
+ dnClient.getXceiverClientManager().releaseClient(xceiverClient, false);
+ }
+ }
+
+ // Trigger manual on demand scanner
+ OnDemandContainerDataScanner.scanContainer(container);
sendICR(container);
}
+ // Return the entire tree instead of just the checksum
+ private ContainerProtos.ContainerChecksumInfo
updateContainerChecksum(KeyValueContainerData containerData)
+ throws IOException {
+ ContainerMerkleTreeWriter merkleTree = new ContainerMerkleTreeWriter();
+ try (DBHandle dbHandle = BlockUtils.getDB(containerData, conf);
+ BlockIterator<BlockData> blockIterator = dbHandle.getStore().
+ getBlockIterator(containerData.getContainerID())) {
+ while (blockIterator.hasNext()) {
+ BlockData blockData = blockIterator.nextBlock();
+ List<ContainerProtos.ChunkInfo> chunkInfos = blockData.getChunks();
+ merkleTree.addChunks(blockData.getLocalID(), chunkInfos);
+ }
+ }
+ ContainerProtos.ContainerChecksumInfo checksumInfo = checksumManager
+ .writeContainerDataTree(containerData, merkleTree);
+
containerData.setDataChecksum(checksumInfo.getContainerMerkleTree().getDataChecksum());
+ return checksumInfo;
+ }
+
+ /**
+ * Handle missing block. It reads the missing block data from the peer
datanode and writes it to the local container.
+ * If the block write fails, the block commit sequence id is not updated.
+ */
+ private void handleMissingBlock(KeyValueContainer container, ContainerData
containerData, TokenHelper tokenHelper,
+ XceiverClientSpi xceiverClient,
ContainerProtos.BlockMerkleTree missingBlock)
+ throws IOException {
+ BlockID blockID = new BlockID(containerData.getContainerID(),
missingBlock.getBlockID());
+ // The length of the block is not known, so instead of passing the default
block length we pass 0. As the length
+ // is not used to validate the token for getBlock call.
+ Token<OzoneBlockTokenIdentifier> blockToken =
tokenHelper.getBlockToken(blockID, 0L);
+ // TODO: Re-use the blockResponse for the same block again.
+ ContainerProtos.GetBlockResponseProto blockResponse =
ContainerProtocolCalls.getBlock(xceiverClient, blockID,
+ blockToken, new HashMap<>());
+ ContainerProtos.BlockData peerBlockData = blockResponse.getBlockData();
+ long bcsId = getBlockManager().blockExists(container, blockID) ?
+ getBlockManager().getBlock(container,
blockID).getBlockCommitSequenceId() : 0;
+ // Check the local bcsId with the one from the bcsId from the peer
datanode.
+ long maxBcsId =
Math.max(peerBlockData.getBlockID().getBlockCommitSequenceId(), bcsId);
+ List<ContainerProtos.ChunkInfo> peerChunksList =
peerBlockData.getChunksList();
+ List<ContainerProtos.ChunkInfo> successfullChunksList = new ArrayList<>();
+ // Update BcsId only if all chunks are successfully written.
+ boolean overwriteBcsId = true;
+
+ // Don't update bcsId if chunk read fails
+ for (ContainerProtos.ChunkInfo chunkInfoProto : peerChunksList) {
+ try {
+ ByteString chunkData = readChunkData(xceiverClient, chunkInfoProto,
blockID, blockToken);
+ ChunkBuffer chunkBuffer =
ChunkBuffer.wrap(chunkData.asReadOnlyByteBuffer());
+ ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(chunkInfoProto);
+ chunkInfo.addMetadata(OzoneConsts.CHUNK_OVERWRITE, "true");
+ writeChunkForClosedContainer(chunkInfo, blockID, chunkBuffer,
container);
+ successfullChunksList.add(chunkInfoProto);
+ } catch (IOException ex) {
+ overwriteBcsId = false;
+ LOG.error("Error while reconciling missing block {} for offset {} in
container {}",
+ blockID, chunkInfoProto.getOffset(),
containerData.getContainerID(), ex);
+ }
+ }
+
+ BlockData putBlockData = BlockData.getFromProtoBuf(peerBlockData);
+ putBlockData.setChunks(successfullChunksList);
+ putBlockForClosedContainer(container, putBlockData, maxBcsId,
overwriteBcsId);
+ }
+
+ private ByteString readChunkData(XceiverClientSpi xceiverClient,
ContainerProtos.ChunkInfo chunkInfoProto,
+ BlockID blockID,
Token<OzoneBlockTokenIdentifier> blockToken) throws IOException {
+ ContainerProtos.ReadChunkResponseProto response =
+ ContainerProtocolCalls.readChunk(xceiverClient, chunkInfoProto,
blockID.getDatanodeBlockIDProtobuf(),
+ null, blockToken);
+
+ if (response.hasData()) {
+ return response.getData();
+ } else if (response.hasDataBuffers()) {
+ return
BufferUtils.concatByteStrings(response.getDataBuffers().getBuffersList());
+ } else {
+ throw new IOException("Error reading chunk data: No data returned.");
+ }
+ }
+
+ /**
+ * This method reconciles chunks per block. It reads the missing/corrupt
chunk data from the peer
+ * datanode and writes it to the local container. If the chunk write fails,
the block commit sequence
+ * id is not updated.
+ */
+ private void reconcileChunksPerBlock(KeyValueContainer container,
ContainerData containerData,
+ TokenHelper tokenHelper,
XceiverClientSpi xceiverClient, long blockId,
+ List<ContainerProtos.ChunkMerkleTree>
chunkList) throws IOException {
+ Set<Long> offsets =
chunkList.stream().map(ContainerProtos.ChunkMerkleTree::getOffset)
+ .collect(Collectors.toSet());
+ BlockID blockID = new BlockID(containerData.getContainerID(), blockId);
+ // The length of the block is not known, so instead of passing the default
block length we pass 0. As the length
+ // is not used to validate the token for getBlock call.
+ Token<OzoneBlockTokenIdentifier> blockToken =
tokenHelper.getBlockToken(blockID, 0L);
+ ContainerProtos.GetBlockResponseProto blockResponse =
ContainerProtocolCalls.getBlock(xceiverClient, blockID,
+ blockToken, new HashMap<>());
+ ContainerProtos.BlockData peerBlockData = blockResponse.getBlockData();
+ BlockData localBlockData = getBlockManager().getBlock(container, blockID);
+ // Check the local bcsId with the one from the bcsId from the peer
datanode.
+ long maxBcsId =
Math.max(peerBlockData.getBlockID().getBlockCommitSequenceId(),
+ localBlockData.getBlockCommitSequenceId());
+ List<ContainerProtos.ChunkInfo> chunksListFromPeer =
peerBlockData.getChunksList();
+
+ SortedMap<Long, ContainerProtos.ChunkInfo> localChunksMap =
localBlockData.getChunks().stream()
+ .collect(Collectors.toMap(ContainerProtos.ChunkInfo::getOffset,
+ Function.identity(), (chunk1, chunk2) -> chunk1,
TreeMap::new));
+ boolean overwriteBcsId = true;
+
+ for (ContainerProtos.ChunkInfo chunkInfoProto : chunksListFromPeer) {
Review Comment:
This negates the benefit of the merkle tree. The loop is
`O(len(chunksListFromPeer))`. We want this iteration to be `O(len(chunkList))`,
where `chunkList` is this method's parameter containing only the chunks
that we are missing.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]