This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
     new 8eca5b77c0  HDDS-10547. Fix shared buffer for datanode checksum calculation (#6402)
8eca5b77c0 is described below
commit 8eca5b77c0ac56f30ab2b897af907b9d6ca09a20
Author: Cyrill <[email protected]>
AuthorDate: Mon Mar 25 22:08:33 2024 +0300
HDDS-10547. Fix shared buffer for datanode checksum calculation (#6402)
---
.../ozone/container/keyvalue/KeyValueHandler.java | 4 +-
.../container/common/impl/TestHddsDispatcher.java | 148 +++++++++++++++++++++
2 files changed, 150 insertions(+), 2 deletions(-)
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index 14532c550d..01bd4db8a1 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -756,7 +756,7 @@ public class KeyValueHandler extends Handler {
       throws StorageContainerException {
     if (validateChunkChecksumData) {
       try {
-        Checksum.verifyChecksum(data, info.getChecksumData(), 0);
+        Checksum.verifyChecksum(data.duplicate(data.position(), data.limit()), info.getChecksumData(), 0);
       } catch (OzoneChecksumException ex) {
        throw ChunkUtils.wrapInStorageContainerException(ex);
       }
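The hunk above deserves a word of explanation. Checksum verification reads the chunk bytes out of a buffer that is shared with the write path, and reading a java.nio-style buffer advances its position. The fix hands the verifier a duplicate, a view that shares the underlying bytes but keeps its own position and limit, so the write path still sees the full chunk afterwards. Here is a minimal sketch of the failure mode, using plain java.nio.ByteBuffer rather than Ozone's ChunkBuffer API (the names below are illustrative only):

    import java.nio.ByteBuffer;

    public class SharedBufferSketch {
      // Stands in for a checksum pass: it consumes the buffer it reads.
      static int consumeAndSum(ByteBuffer buf) {
        int sum = 0;
        while (buf.hasRemaining()) {
          sum += buf.get();                     // get() advances buf.position()
        }
        return sum;
      }

      public static void main(String[] args) {
        ByteBuffer shared = ByteBuffer.wrap(new byte[] {1, 2, 3, 4});

        // Safe: duplicate() shares content but has independent position/limit.
        consumeAndSum(shared.duplicate());
        System.out.println(shared.remaining()); // 4 -- writer still sees all bytes

        // Unsafe: reading the shared buffer directly drains it.
        consumeAndSum(shared);
        System.out.println(shared.remaining()); // 0 -- nothing left for the writer
      }
    }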
@@ -857,9 +857,9 @@ public class KeyValueHandler extends Handler {
       // chunks will be committed as a part of handling putSmallFile
       // here. There is no need to maintain this info in openContainerBlockMap.
+      validateChunkChecksumData(data, chunkInfo);
       chunkManager
           .writeChunk(kvContainer, blockID, chunkInfo, data,
               dispatcherContext);
-      validateChunkChecksumData(data, chunkInfo);
       chunkManager.finishWriteChunks(kvContainer, blockData);
       List<ContainerProtos.ChunkInfo> chunks = new LinkedList<>();
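The second hunk attacks the same hazard from the other direction: on the putSmallFile path the chunk was validated only after chunkManager.writeChunk() had already consumed the shared buffer, so the verifier could end up reading an empty or partially-consumed region and fail on valid data. Moving validateChunkChecksumData() before the write ensures it sees the bytes exactly as the client sent them; together with the duplicate() above, neither reader disturbs the other.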
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java
index 1401520231..eaf901c67a 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.ozone.container.common.impl;
 import com.google.common.collect.Maps;
 import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.lang3.RandomUtils;
 import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.client.BlockID;
@@ -40,6 +41,8 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerExcep
 import org.apache.hadoop.hdds.security.token.TokenVerifier;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.common.Checksum;
+import org.apache.hadoop.ozone.common.ChecksumData;
+import org.apache.hadoop.ozone.common.OzoneChecksumException;
 import org.apache.hadoop.ozone.common.utils.BufferUtils;
 import org.apache.hadoop.ozone.container.ContainerTestHelper;
 import org.apache.hadoop.ozone.container.common.ContainerTestUtils;
@@ -47,6 +50,7 @@ import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
 import org.apache.hadoop.ozone.container.common.interfaces.Handler;
 import org.apache.hadoop.ozone.container.common.report.IncrementalReportSender;
+import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
 import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext.Op;
@@ -164,6 +168,72 @@ public class TestHddsDispatcher {
     }
   }
+  @Test
+  public void testSmallFileChecksum() throws IOException {
+    String testDirPath = testDir.getPath();
+    try {
+      UUID scmId = UUID.randomUUID();
+      OzoneConfiguration conf = new OzoneConfiguration();
+      conf.set(HDDS_DATANODE_DIR_KEY, testDirPath);
+      conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS, testDirPath);
+      DatanodeConfiguration dnConf =
+          conf.getObject(DatanodeConfiguration.class);
+      dnConf.setChunkDataValidationCheck(true);
+      conf.setFromObject(dnConf);
+      DatanodeDetails dd = randomDatanodeDetails();
+      HddsDispatcher hddsDispatcher = createDispatcher(dd, scmId, conf);
+
+      ContainerCommandResponseProto smallFileResponse =
+          hddsDispatcher.dispatch(newPutSmallFile(1L, 1L), null);
+
+      assertEquals(ContainerProtos.Result.SUCCESS,
+          smallFileResponse.getResult());
+    } finally {
+      ContainerMetrics.remove();
+    }
+  }
+
+  @Test
+  public void testWriteChunkChecksum() throws IOException {
+    String testDirPath = testDir.getPath();
+    try {
+      UUID scmId = UUID.randomUUID();
+      OzoneConfiguration conf = new OzoneConfiguration();
+      conf.set(HDDS_DATANODE_DIR_KEY, testDirPath);
+      conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS, testDirPath);
+      DatanodeConfiguration dnConf =
+          conf.getObject(DatanodeConfiguration.class);
+      dnConf.setChunkDataValidationCheck(true);
+      conf.setFromObject(dnConf);
+      DatanodeDetails dd = randomDatanodeDetails();
+      HddsDispatcher hddsDispatcher = createDispatcher(dd, scmId, conf);
+      // Send a few WriteChunk requests.
+      ContainerCommandResponseProto response;
+      ContainerCommandRequestProto writeChunkRequest0 =
+          getWriteChunkRequest0(dd.getUuidString(), 1L, 1L, 0);
+      hddsDispatcher.dispatch(writeChunkRequest0, null);
+      hddsDispatcher.dispatch(
+          getWriteChunkRequest0(dd.getUuidString(), 1L, 1L, 1), null);
+      response = hddsDispatcher.dispatch(
+          getWriteChunkRequest0(dd.getUuidString(), 1L, 1L, 2), null);
+
+      assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
+      // Send a ReadChunk request for the written chunk.
+      response = hddsDispatcher.dispatch(
+          getReadChunkRequest(writeChunkRequest0), null);
+      assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
+
+      ByteString responseData = BufferUtils.concatByteStrings(
+          response.getReadChunk().getDataBuffers().getBuffersList());
+      assertEquals(writeChunkRequest0.getWriteChunk().getData(),
+          responseData);
+
+      // Verify the checksum on read:
+      final DispatcherContext context = DispatcherContext
+          .newBuilder(DispatcherContext.Op.READ_STATE_MACHINE_DATA)
+          .build();
+      response = hddsDispatcher.dispatch(
+          getReadChunkRequest(writeChunkRequest0), context);
+      assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
+    } finally {
+      ContainerMetrics.remove();
+    }
+  }
+
   @ContainerLayoutTestInfo.ContainerTest
   public void testContainerCloseActionWhenVolumeFull(
       ContainerLayoutVersion layoutVersion) throws Exception {
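Worth noting for anyone extending these tests: both of them opt in to chunk data validation via dnConf.setChunkDataValidationCheck(true). That check appears to be disabled by default, which is presumably why the shared-buffer regression went unnoticed; with validation off, the datanode never calls verifyChecksum on the write path at all.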
@@ -514,6 +584,84 @@ public class TestHddsDispatcher {
         .build();
   }
+  static ChecksumData checksum(ByteString data) {
+    try {
+      return new Checksum(ContainerProtos.ChecksumType.CRC32, 256)
+          .computeChecksum(data.asReadOnlyByteBuffer());
+    } catch (OzoneChecksumException e) {
+      throw new IllegalStateException(e);
+    }
+  }
+
+  private ContainerCommandRequestProto getWriteChunkRequest0(
+      String datanodeId, Long containerId, Long localId, int chunkNum) {
+    final int lenOfBytes = 32;
+    ByteString chunkData =
+        ByteString.copyFrom(RandomUtils.nextBytes(lenOfBytes));
+
+    ContainerProtos.ChunkInfo chunk = ContainerProtos.ChunkInfo
+        .newBuilder()
+        .setChunkName(
+            DigestUtils.md5Hex("dummy-key") + "_stream_"
+                + containerId + "_chunk_" + localId)
+        .setOffset((long) chunkNum * lenOfBytes)
+        .setLen(lenOfBytes)
+        .setChecksumData(checksum(chunkData).getProtoBufMessage())
+        .build();
+
+    WriteChunkRequestProto.Builder writeChunkRequest = WriteChunkRequestProto
+        .newBuilder()
+        .setBlockID(new BlockID(containerId, localId)
+            .getDatanodeBlockIDProtobuf())
+        .setChunkData(chunk)
+        .setData(chunkData);
+
+    return ContainerCommandRequestProto
+        .newBuilder()
+        .setContainerID(containerId)
+        .setCmdType(ContainerProtos.Type.WriteChunk)
+        .setDatanodeUuid(datanodeId)
+        .setWriteChunk(writeChunkRequest)
+        .build();
+  }
+
+  static ContainerCommandRequestProto newPutSmallFile(
+      Long containerId, Long localId) {
+    ByteString chunkData = ByteString.copyFrom(RandomUtils.nextBytes(32));
+    return newPutSmallFile(new BlockID(containerId, localId), chunkData);
+  }
+
+  static ContainerCommandRequestProto newPutSmallFile(
+      BlockID blockID, ByteString data) {
+    final ContainerProtos.BlockData.Builder blockData
+        = ContainerProtos.BlockData.newBuilder()
+        .setBlockID(blockID.getDatanodeBlockIDProtobuf());
+    final ContainerProtos.PutBlockRequestProto.Builder putBlockRequest
+        = ContainerProtos.PutBlockRequestProto.newBuilder()
+        .setBlockData(blockData);
+    final ContainerProtos.KeyValue keyValue = ContainerProtos.KeyValue
+        .newBuilder()
+        .setKey("OverWriteRequested")
+        .setValue("true")
+        .build();
+    final ContainerProtos.ChunkInfo chunk = ContainerProtos.ChunkInfo
+        .newBuilder()
+        .setChunkName(blockID.getLocalID() + "_chunk")
+        .setOffset(0)
+        .setLen(data.size())
+        .addMetadata(keyValue)
+        .setChecksumData(checksum(data).getProtoBufMessage())
+        .build();
+    final ContainerProtos.PutSmallFileRequestProto putSmallFileRequest
+        = ContainerProtos.PutSmallFileRequestProto.newBuilder()
+        .setChunkInfo(chunk)
+        .setBlock(putBlockRequest)
+        .setData(data)
+        .build();
+    return ContainerCommandRequestProto.newBuilder()
+        .setCmdType(ContainerProtos.Type.PutSmallFile)
+        .setContainerID(blockID.getContainerID())
+        .setDatanodeUuid(UUID.randomUUID().toString())
+        .setPutSmallFile(putSmallFileRequest)
+        .build();
+  }
+
   /**
    * Creates container read chunk request using input container write chunk
    * request.