This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 8eca5b77c0 HDDS-10547. Fix shared buffer for datanode checksum calculation (#6402)
8eca5b77c0 is described below

commit 8eca5b77c0ac56f30ab2b897af907b9d6ca09a20
Author: Cyrill <[email protected]>
AuthorDate: Mon Mar 25 22:08:33 2024 +0300

    HDDS-10547. Fix shared buffer for datanode checksum calculation (#6402)
---
 .../ozone/container/keyvalue/KeyValueHandler.java  |   4 +-
 .../container/common/impl/TestHddsDispatcher.java  | 148 +++++++++++++++++++++
 2 files changed, 150 insertions(+), 2 deletions(-)

diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index 14532c550d..01bd4db8a1 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -756,7 +756,7 @@ public class KeyValueHandler extends Handler {
       throws StorageContainerException {
     if (validateChunkChecksumData) {
       try {
-        Checksum.verifyChecksum(data, info.getChecksumData(), 0);
+        Checksum.verifyChecksum(data.duplicate(data.position(), data.limit()), info.getChecksumData(), 0);
       } catch (OzoneChecksumException ex) {
         throw ChunkUtils.wrapInStorageContainerException(ex);
       }
@@ -857,9 +857,9 @@ public class KeyValueHandler extends Handler {
 
       // chunks will be committed as a part of handling putSmallFile
       // here. There is no need to maintain this info in openContainerBlockMap.
+      validateChunkChecksumData(data, chunkInfo);
       chunkManager
           .writeChunk(kvContainer, blockID, chunkInfo, data, dispatcherContext);
-      validateChunkChecksumData(data, chunkInfo);
       chunkManager.finishWriteChunks(kvContainer, blockData);
 
       List<ContainerProtos.ChunkInfo> chunks = new LinkedList<>();
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java
index 1401520231..eaf901c67a 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.ozone.container.common.impl;
 
 import com.google.common.collect.Maps;
 import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.lang3.RandomUtils;
 import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.client.BlockID;
@@ -40,6 +41,8 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerExcep
 import org.apache.hadoop.hdds.security.token.TokenVerifier;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.common.Checksum;
+import org.apache.hadoop.ozone.common.ChecksumData;
+import org.apache.hadoop.ozone.common.OzoneChecksumException;
 import org.apache.hadoop.ozone.common.utils.BufferUtils;
 import org.apache.hadoop.ozone.container.ContainerTestHelper;
 import org.apache.hadoop.ozone.container.common.ContainerTestUtils;
@@ -47,6 +50,7 @@ import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
 import org.apache.hadoop.ozone.container.common.interfaces.Handler;
 import org.apache.hadoop.ozone.container.common.report.IncrementalReportSender;
+import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
 import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext.Op;
@@ -164,6 +168,72 @@ public class TestHddsDispatcher {
     }
   }
 
+  @Test
+  public void testSmallFileChecksum() throws IOException {
+    String testDirPath = testDir.getPath();
+    try {
+      UUID scmId = UUID.randomUUID();
+      OzoneConfiguration conf = new OzoneConfiguration();
+      conf.set(HDDS_DATANODE_DIR_KEY, testDirPath);
+      conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS, testDirPath);
+      DatanodeConfiguration dnConf = conf.getObject(DatanodeConfiguration.class);
+      dnConf.setChunkDataValidationCheck(true);
+      conf.setFromObject(dnConf);
+      DatanodeDetails dd = randomDatanodeDetails();
+      HddsDispatcher hddsDispatcher = createDispatcher(dd, scmId, conf);
+
+      ContainerCommandResponseProto smallFileResponse =
+          hddsDispatcher.dispatch(newPutSmallFile(1L, 1L), null);
+
+      assertEquals(ContainerProtos.Result.SUCCESS, smallFileResponse.getResult());
+    } finally {
+      ContainerMetrics.remove();
+    }
+  }
+
+  @Test
+  public void testWriteChunkChecksum() throws IOException {
+    String testDirPath = testDir.getPath();
+    try {
+      UUID scmId = UUID.randomUUID();
+      OzoneConfiguration conf = new OzoneConfiguration();
+      conf.set(HDDS_DATANODE_DIR_KEY, testDirPath);
+      conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS, testDirPath);
+      DatanodeConfiguration dnConf = conf.getObject(DatanodeConfiguration.class);
+      dnConf.setChunkDataValidationCheck(true);
+      conf.setFromObject(dnConf);
+      DatanodeDetails dd = randomDatanodeDetails();
+      HddsDispatcher hddsDispatcher = createDispatcher(dd, scmId, conf);
+      //Send a few WriteChunkRequests
+      ContainerCommandResponseProto response;
+      ContainerCommandRequestProto writeChunkRequest0 = getWriteChunkRequest0(dd.getUuidString(), 1L, 1L, 0);
+      hddsDispatcher.dispatch(writeChunkRequest0, null);
+      hddsDispatcher.dispatch(getWriteChunkRequest0(dd.getUuidString(), 1L, 1L, 1), null);
+      response = hddsDispatcher.dispatch(getWriteChunkRequest0(dd.getUuidString(), 1L, 1L, 2), null);
+
+      assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
+      // Send Read Chunk request for written chunk.
+      response =
+          hddsDispatcher.dispatch(getReadChunkRequest(writeChunkRequest0), null);
+      assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
+
+      ByteString responseData = BufferUtils.concatByteStrings(
+          response.getReadChunk().getDataBuffers().getBuffersList());
+      assertEquals(writeChunkRequest0.getWriteChunk().getData(),
+          responseData);
+
+      // Test checksum on Read:
+      final DispatcherContext context = DispatcherContext
+          .newBuilder(DispatcherContext.Op.READ_STATE_MACHINE_DATA)
+          .build();
+      response =
+          hddsDispatcher.dispatch(getReadChunkRequest(writeChunkRequest0), context);
+      assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
+    } finally {
+      ContainerMetrics.remove();
+    }
+  }
+
   @ContainerLayoutTestInfo.ContainerTest
   public void testContainerCloseActionWhenVolumeFull(
       ContainerLayoutVersion layoutVersion) throws Exception {
@@ -514,6 +584,84 @@ public class TestHddsDispatcher {
         .build();
   }
 
+  static ChecksumData checksum(ByteString data) {
+    try {
+      return new Checksum(ContainerProtos.ChecksumType.CRC32, 256)
+          .computeChecksum(data.asReadOnlyByteBuffer());
+    } catch (OzoneChecksumException e) {
+      throw new IllegalStateException(e);
+    }
+  }
+
+  private ContainerCommandRequestProto getWriteChunkRequest0(
+      String datanodeId, Long containerId, Long localId, int chunkNum) {
+    final int lenOfBytes = 32;
+    ByteString chunkData = ByteString.copyFrom(RandomUtils.nextBytes(32));
+
+    ContainerProtos.ChunkInfo chunk = ContainerProtos.ChunkInfo
+        .newBuilder()
+        .setChunkName(
+            DigestUtils.md5Hex("dummy-key") + "_stream_"
+                + containerId + "_chunk_" + localId)
+        .setOffset((long) chunkNum * lenOfBytes)
+        .setLen(lenOfBytes)
+        .setChecksumData(checksum(chunkData).getProtoBufMessage())
+        .build();
+
+    WriteChunkRequestProto.Builder writeChunkRequest = WriteChunkRequestProto
+        .newBuilder()
+        .setBlockID(new BlockID(containerId, localId)
+            .getDatanodeBlockIDProtobuf())
+        .setChunkData(chunk)
+        .setData(chunkData);
+
+    return ContainerCommandRequestProto
+        .newBuilder()
+        .setContainerID(containerId)
+        .setCmdType(ContainerProtos.Type.WriteChunk)
+        .setDatanodeUuid(datanodeId)
+        .setWriteChunk(writeChunkRequest)
+        .build();
+  }
+
+  static ContainerCommandRequestProto newPutSmallFile(Long containerId, Long localId) {
+    ByteString chunkData = ByteString.copyFrom(RandomUtils.nextBytes(32));
+    return newPutSmallFile(new BlockID(containerId, localId), chunkData);
+  }
+
+  static ContainerCommandRequestProto newPutSmallFile(
+      BlockID blockID, ByteString data) {
+    final ContainerProtos.BlockData.Builder blockData
+        = ContainerProtos.BlockData.newBuilder()
+        .setBlockID(blockID.getDatanodeBlockIDProtobuf());
+    final ContainerProtos.PutBlockRequestProto.Builder putBlockRequest
+        = ContainerProtos.PutBlockRequestProto.newBuilder()
+        .setBlockData(blockData);
+    final ContainerProtos.KeyValue keyValue = ContainerProtos.KeyValue.newBuilder()
+        .setKey("OverWriteRequested")
+        .setValue("true")
+        .build();
+    final ContainerProtos.ChunkInfo chunk = ContainerProtos.ChunkInfo.newBuilder()
+        .setChunkName(blockID.getLocalID() + "_chunk")
+        .setOffset(0)
+        .setLen(data.size())
+        .addMetadata(keyValue)
+        .setChecksumData(checksum(data).getProtoBufMessage())
+        .build();
+    final ContainerProtos.PutSmallFileRequestProto putSmallFileRequest
+        = ContainerProtos.PutSmallFileRequestProto.newBuilder()
+        .setChunkInfo(chunk)
+        .setBlock(putBlockRequest)
+        .setData(data)
+        .build();
+    return ContainerCommandRequestProto.newBuilder()
+        .setCmdType(ContainerProtos.Type.PutSmallFile)
+        .setContainerID(blockID.getContainerID())
+        .setDatanodeUuid(UUID.randomUUID().toString())
+        .setPutSmallFile(putSmallFileRequest)
+        .build();
+  }
+
   /**
    * Creates container read chunk request using input container write chunk
    * request.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to