This is an automated email from the ASF dual-hosted git repository.

ritesh pushed a commit to branch HDDS-7593
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-7593 by this push:
     new 509c970054 HDDS-10442. [hsync] Add a Freon tool to measure client to 
DataNode round-trip latency (#6297)
509c970054 is described below

commit 509c970054886c15cd7dd425104be9d0e4e59723
Author: Wei-Chiu Chuang <[email protected]>
AuthorDate: Wed Mar 27 11:28:54 2024 -0700

    HDDS-10442. [hsync] Add a Freon tool to measure client to DataNode 
round-trip latency (#6297)
---
 .../java/org/apache/hadoop/hdds/HddsUtils.java     |   1 +
 .../ContainerCommandResponseBuilders.java          |  28 ++++
 .../hdds/scm/storage/ContainerProtocolCalls.java   |  38 +++++
 .../org/apache/hadoop/ozone/audit/DNAction.java    |   3 +-
 .../container/common/impl/HddsDispatcher.java      |   1 +
 .../ozone/container/keyvalue/KeyValueHandler.java  |   8 +
 .../src/main/proto/DatanodeClientProtocol.proto    |  13 ++
 .../hdds/scm/cli/ContainerOperationClient.java     |   2 +-
 .../src/main/smoketest/freon/echoRPCLoad.robot     |  21 +++
 .../hadoop/hdds/scm/TestContainerSmallFile.java    |  19 +++
 .../hadoop/ozone/freon/TestDNRPCLoadGenerator.java | 111 +++++++++++++
 .../hadoop/ozone/freon/DNRPCLoadGenerator.java     | 178 +++++++++++++++++++++
 .../java/org/apache/hadoop/ozone/freon/Freon.java  |   3 +-
 .../hadoop/ozone/freon/OmRPCLoadGenerator.java     |   6 +-
 14 files changed, 426 insertions(+), 6 deletions(-)

diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
index ee1c9669a1..e1188f1cd1 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
@@ -424,6 +424,7 @@ public final class HddsUtils {
     case ListContainer:
     case ListChunk:
     case GetCommittedBlockLength:
+    case Echo:
       return true;
     case CloseContainer:
     case WriteChunk:
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ContainerCommandResponseBuilders.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ContainerCommandResponseBuilders.java
index 9acb0e5c33..86336e9bc7 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ContainerCommandResponseBuilders.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ContainerCommandResponseBuilders.java
@@ -21,6 +21,8 @@ import com.google.common.base.Preconditions;
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.function.Function;
+
+import org.apache.commons.lang3.RandomUtils;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
@@ -42,6 +44,7 @@ import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
 import org.apache.hadoop.ozone.common.ChunkBuffer;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.thirdparty.com.google.protobuf.UnsafeByteOperations;
 
 import static 
org.apache.hadoop.hdds.scm.utils.ClientCommandsUtils.getReadChunkVersion;
 
@@ -319,6 +322,31 @@ public final class ContainerCommandResponseBuilders {
         .build();
   }
 
+  public static ContainerCommandResponseProto getEchoResponse(
+      ContainerCommandRequestProto msg) {
+
+    ContainerProtos.EchoRequestProto echoRequest = msg.getEcho();
+    int responsePayload = echoRequest.getPayloadSizeResp();
+
+    int sleepTimeMs = echoRequest.getSleepTimeMs();
+    try {
+      if (sleepTimeMs > 0) {
+        Thread.sleep(sleepTimeMs);
+      }
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    }
+
+    ContainerProtos.EchoResponseProto.Builder echo =
+        ContainerProtos.EchoResponseProto
+            .newBuilder()
+            
.setPayload(UnsafeByteOperations.unsafeWrap(RandomUtils.nextBytes(responsePayload)));
+
+    return getSuccessResponseBuilder(msg)
+        .setEcho(echo)
+        .build();
+  }
+
   private ContainerCommandResponseBuilders() {
     throw new UnsupportedOperationException("no instances");
   }
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
index 58bb326eb0..1453ae56b4 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
@@ -57,6 +57,8 @@ import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadContai
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.WriteChunkRequestProto;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.FinalizeBlockRequestProto;
+import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.EchoRequestProto;
+import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.EchoResponseProto;
 import org.apache.hadoop.hdds.scm.XceiverClientReply;
 import org.apache.hadoop.hdds.scm.XceiverClientSpi;
 import org.apache.hadoop.hdds.scm.XceiverClientSpi.Validator;
@@ -65,6 +67,7 @@ import 
org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenExcep
 import 
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
+import org.apache.hadoop.hdds.tracing.TracingUtil;
 import org.apache.hadoop.ozone.common.Checksum;
 import org.apache.hadoop.ozone.common.ChecksumData;
 import org.apache.hadoop.security.token.Token;
@@ -661,6 +664,41 @@ public final class ContainerProtocolCalls  {
     return response.getGetSmallFile();
   }
 
+  /**
+   * Send an echo to DataNode.
+   *
+   * @return EchoResponseProto
+   */
+  public static EchoResponseProto echo(XceiverClientSpi client, String 
encodedContainerID,
+      long containerID, ByteString payloadReqBytes, int payloadRespSizeKB, int 
sleepTimeMs) throws IOException {
+    ContainerProtos.EchoRequestProto getEcho =
+        EchoRequestProto
+            .newBuilder()
+            .setPayload(payloadReqBytes)
+            .setPayloadSizeResp(payloadRespSizeKB)
+            .setSleepTimeMs(sleepTimeMs)
+            .build();
+    String id = client.getPipeline().getClosestNode().getUuidString();
+
+    ContainerCommandRequestProto.Builder builder = ContainerCommandRequestProto
+        .newBuilder()
+        .setCmdType(Type.Echo)
+        .setContainerID(containerID)
+        .setDatanodeUuid(id)
+        .setEcho(getEcho);
+    if (!encodedContainerID.isEmpty()) {
+      builder.setEncodedToken(encodedContainerID);
+    }
+    String traceId = TracingUtil.exportCurrentSpan();
+    if (traceId != null) {
+      builder.setTraceID(traceId);
+    }
+    ContainerCommandRequestProto request = builder.build();
+    ContainerCommandResponseProto response =
+        client.sendCommand(request, getValidatorList());
+    return response.getEcho();
+  }
+
   /**
    * Validates a response from a container protocol call.  Any non-successful
    * return code is mapped to a corresponding exception and thrown.
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/audit/DNAction.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/audit/DNAction.java
index d271e7d5d4..f7a38e3dec 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/audit/DNAction.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/audit/DNAction.java
@@ -40,7 +40,8 @@ public enum DNAction implements AuditAction {
   CLOSE_CONTAINER,
   GET_COMMITTED_BLOCK_LENGTH,
   STREAM_INIT,
-  FINALIZE_BLOCK;
+  FINALIZE_BLOCK,
+  ECHO;
 
   @Override
   public String getAction() {
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
index f20615d23f..8e68eeac53 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
@@ -807,6 +807,7 @@ public class HddsDispatcher implements ContainerDispatcher, 
Auditor {
     case GetCommittedBlockLength : return DNAction.GET_COMMITTED_BLOCK_LENGTH;
     case StreamInit       : return DNAction.STREAM_INIT;
     case FinalizeBlock    : return DNAction.FINALIZE_BLOCK;
+    case Echo             : return DNAction.ECHO;
     default :
       LOG.debug("Invalid command type - {}", cmdType);
       return null;
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index 3a945c2212..3d9214c917 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -103,6 +103,7 @@ import static 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Res
 import static 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.PUT_SMALL_FILE_ERROR;
 import static 
org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getBlockDataResponse;
 import static 
org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getBlockLengthResponse;
+import static 
org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getEchoResponse;
 import static 
org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getFinalizeBlockResponse;
 import static 
org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getGetSmallFileResponseSuccess;
 import static 
org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getListBlockResponse;
@@ -279,6 +280,8 @@ public class KeyValueHandler extends Handler {
       return handler.handleGetCommittedBlockLength(request, kvContainer);
     case FinalizeBlock:
       return handler.handleFinalizeBlock(request, kvContainer);
+    case Echo:
+      return handler.handleEcho(request, kvContainer);
     default:
       return null;
     }
@@ -611,6 +614,11 @@ public class KeyValueHandler extends Handler {
     return getFinalizeBlockResponse(request, responseData);
   }
 
+  ContainerCommandResponseProto handleEcho(
+      ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
+    return getEchoResponse(request);
+  }
+
   /**
    * Handle Get Block operation. Calls BlockManager to process the request.
    */
diff --git 
a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto 
b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
index 0206a8ea71..ccde261de0 100644
--- a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
+++ b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
@@ -107,6 +107,7 @@ enum Type {
   StreamWrite = 20;
 
   FinalizeBlock = 21;
+  Echo = 22;
 }
 
 
@@ -215,6 +216,7 @@ message ContainerCommandRequestProto {
   optional   uint32 version = 24;
 
   optional   FinalizeBlockRequestProto finalizeBlock = 25;
+  optional   EchoRequestProto echo = 26;
 }
 
 message ContainerCommandResponseProto {
@@ -247,6 +249,7 @@ message ContainerCommandResponseProto {
   optional   GetCommittedBlockLengthResponseProto getCommittedBlockLength = 21;
 
   optional   FinalizeBlockResponseProto finalizeBlock = 22;
+  optional   EchoResponseProto echo = 23;
 }
 
 message ContainerDataProto {
@@ -390,6 +393,16 @@ message  ListBlockResponseProto {
   repeated BlockData blockData = 1;
 }
 
+message  EchoRequestProto {
+  optional bytes payload = 1;
+  optional int32 payloadSizeResp = 2;
+  optional int32 sleepTimeMs = 3;
+}
+
+message  EchoResponseProto {
+  optional bytes payload = 1;
+}
+
 // Chunk Operations
 
 message ChunkInfo {
diff --git 
a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java
 
b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java
index 499d58b1ff..7898ed76b1 100644
--- 
a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java
+++ 
b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java
@@ -184,7 +184,7 @@ public class ContainerOperationClient implements ScmClient {
     }
   }
 
-  private String getEncodedContainerToken(long containerId) throws IOException 
{
+  public String getEncodedContainerToken(long containerId) throws IOException {
     if (!containerTokenEnabled) {
       return "";
     }
diff --git a/hadoop-ozone/dist/src/main/smoketest/freon/echoRPCLoad.robot 
b/hadoop-ozone/dist/src/main/smoketest/freon/echoRPCLoad.robot
index 32456af488..c6ea4e6346 100644
--- a/hadoop-ozone/dist/src/main/smoketest/freon/echoRPCLoad.robot
+++ b/hadoop-ozone/dist/src/main/smoketest/freon/echoRPCLoad.robot
@@ -23,6 +23,27 @@ ${PREFIX}    ${EMPTY}
 ${n}    1
 
 *** Test Cases ***
+Get Container ID
+    ${result} =          Execute        ozone admin container create
+    ${containerID} =     Execute        ozone admin container list --count 1 
--state=OPEN | grep -o '"containerID" *: *[^,}]*' | awk -F'[:,]' '{print $2}' | 
tr -d '" '
+    Set Suite Variable   ${containerID}
+
+[Read] Ozone DataNode Echo RPC Load Generator with request payload and 
response payload
+    ${result} =        Execute          ozone freon dne -t=1 -n=${n} 
--payload-req=1 --payload-resp=1 --container-id=${containerID}
+                       Should contain   ${result}   Successful executions: ${n}
+
+[Read] Ozone DataNode Echo RPC Load Generator with request payload and empty 
response payload
+    ${result} =        Execute          ozone freon dne -t=1 -n=${n} 
--payload-req=1 --container-id=${containerID}
+                       Should contain   ${result}   Successful executions: ${n}
+
+[Read] Ozone DataNode Echo RPC Load Generator with empty request payload and 
response payload
+    ${result} =        Execute          ozone freon dne -t=1 -n=${n} 
--payload-resp=1 --container-id=${containerID}
+                       Should contain   ${result}   Successful executions: ${n}
+
+[Read] Ozone DataNode Echo RPC Load Generator with empty request payload and 
empty response payload no sleep time one xceiver client
+    ${result} =        Execute          ozone freon dne -t=1 -n=${n} 
--sleep-time-ms=0 --clients=1 --container-id=${containerID}
+                       Should contain   ${result}   Successful executions: ${n}
+
 [Read] Ozone Echo RPC Load Generator with request payload and response payload
     ${result} =        Execute          ozone freon ome -t=1 -n=${n} 
--payload-req=1 --payload-resp=1
                        Should contain   ${result}   Successful executions: ${n}
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestContainerSmallFile.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestContainerSmallFile.java
index 30c4e4cd5b..5dab271d9e 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestContainerSmallFile.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestContainerSmallFile.java
@@ -31,6 +31,8 @@ import 
org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.container.ContainerTestHelper;
 import org.apache.hadoop.ozone.container.common.SCMTestUtils;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.thirdparty.com.google.protobuf.UnsafeByteOperations;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
@@ -188,6 +190,23 @@ public class TestContainerSmallFile {
     assertEquals("data123", readData);
     xceiverClientManager.releaseClient(client, false);
   }
+
+  @Test
+  public void testEcho() throws Exception {
+    ContainerWithPipeline container =
+        storageContainerLocationClient.allocateContainer(
+            SCMTestUtils.getReplicationType(ozoneConfig),
+            HddsProtos.ReplicationFactor.ONE, OzoneConsts.OZONE);
+    XceiverClientSpi client = xceiverClientManager
+        .acquireClient(container.getPipeline());
+    ContainerProtocolCalls.createContainer(client,
+        container.getContainerInfo().getContainerID(), null);
+    ByteString byteString = UnsafeByteOperations.unsafeWrap(new byte[0]);
+    ContainerProtos.EchoResponseProto response =
+        ContainerProtocolCalls.echo(client, "", 
container.getContainerInfo().getContainerID(), byteString, 1, 0);
+    assertEquals(1, response.getPayload().size());
+    xceiverClientManager.releaseClient(client, false);
+  }
 }
 
 
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestDNRPCLoadGenerator.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestDNRPCLoadGenerator.java
new file mode 100644
index 0000000000..d049a7e320
--- /dev/null
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestDNRPCLoadGenerator.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.freon;
+
+import org.apache.hadoop.hdds.conf.DatanodeRatisServerConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.ratis.conf.RatisClientConfig;
+import org.apache.hadoop.hdds.scm.XceiverClientManager;
+import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import 
org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
+import 
org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
+import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.container.common.SCMTestUtils;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import picocli.CommandLine;
+
+import java.time.Duration;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Tests the Freon DataNode echo RPC load generator against a MiniOzoneCluster.
+ */
+public class TestDNRPCLoadGenerator {
+
+  private static MiniOzoneCluster cluster = null;
+  private static ContainerWithPipeline container;
+
+  private static void startCluster(OzoneConfiguration conf) throws Exception {
+    DatanodeRatisServerConfig ratisServerConfig =
+        conf.getObject(DatanodeRatisServerConfig.class);
+    ratisServerConfig.setRequestTimeOut(Duration.ofSeconds(3));
+    ratisServerConfig.setWatchTimeOut(Duration.ofSeconds(10));
+    conf.setFromObject(ratisServerConfig);
+
+    RatisClientConfig.RaftConfig raftClientConfig =
+        conf.getObject(RatisClientConfig.RaftConfig.class);
+    raftClientConfig.setRpcRequestTimeout(Duration.ofSeconds(3));
+    raftClientConfig.setRpcWatchRequestTimeout(Duration.ofSeconds(10));
+    conf.setFromObject(raftClientConfig);
+
+    cluster = MiniOzoneCluster.newBuilder(conf)
+        .setNumDatanodes(5).build();
+    cluster.waitForClusterToBeReady();
+    cluster.waitForPipelineTobeReady(HddsProtos.ReplicationFactor.THREE,
+            180000);
+
+    StorageContainerLocationProtocolClientSideTranslatorPB
+        storageContainerLocationClient = cluster
+        .getStorageContainerLocationClient();
+    container =
+        storageContainerLocationClient.allocateContainer(
+            SCMTestUtils.getReplicationType(conf),
+            HddsProtos.ReplicationFactor.ONE, OzoneConsts.OZONE);
+    XceiverClientManager xceiverClientManager = new XceiverClientManager(conf);
+    XceiverClientSpi client = xceiverClientManager
+        .acquireClient(container.getPipeline());
+    ContainerProtocolCalls.createContainer(client,
+        container.getContainerInfo().getContainerID(), null);
+  }
+
+  static void shutdownCluster() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @BeforeAll
+  public static void init() throws Exception {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    startCluster(conf);
+  }
+
+  @AfterAll
+  public static void shutdown() {
+    shutdownCluster();
+  }
+
+  @Test
+  public void test() {
+    DNRPCLoadGenerator randomKeyGenerator =
+        new DNRPCLoadGenerator(cluster.getConf());
+    CommandLine cmd = new CommandLine(randomKeyGenerator);
+    int exitCode = cmd.execute(
+        "--container-id", 
Long.toString(container.getContainerInfo().getContainerID()),
+        "--clients", "5",
+        "-t", "10");
+    assertEquals(0, exitCode);
+  }
+}
diff --git 
a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/DNRPCLoadGenerator.java
 
b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/DNRPCLoadGenerator.java
new file mode 100644
index 0000000000..1d1b898a7d
--- /dev/null
+++ 
b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/DNRPCLoadGenerator.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.freon;
+
+import com.codahale.metrics.Timer;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.scm.client.ClientTrustManager;
+import 
org.apache.hadoop.hdds.security.x509.certificate.client.CACertificateProvider;
+import org.apache.hadoop.hdds.utils.HAUtils;
+import org.apache.hadoop.ozone.OzoneSecurityUtil;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.hadoop.hdds.cli.HddsVersionProvider;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.XceiverClientManager;
+import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.apache.hadoop.hdds.scm.cli.ContainerOperationClient;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
+import org.apache.ratis.thirdparty.com.google.protobuf.UnsafeByteOperations;
+import picocli.CommandLine;
+import picocli.CommandLine.Command;
+import picocli.CommandLine.Option;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+
+import static org.apache.hadoop.ozone.common.PayloadUtils.generatePayloadBytes;
+
+/**
+ * Utility to generate echo RPC requests to a DataNode.
+ */
+@Command(name = "dn-echo",
+        aliases = "dne",
+        description =
+                "Generate echo RPC request to DataNode",
+        versionProvider = HddsVersionProvider.class,
+        mixinStandardHelpOptions = true,
+        showDefaultValues = true)
+public class DNRPCLoadGenerator extends BaseFreonGenerator
+        implements Callable<Void> {
+
+  private static final int RPC_PAYLOAD_MULTIPLICATION_FACTOR = 1024;
+  private static final int MAX_SIZE_KB = 2097151;
+  private Timer timer;
+  private OzoneConfiguration configuration;
+  private ByteString payloadReqBytes;
+  private int payloadRespSize;
+  private List<XceiverClientSpi> clients;
+  private String encodedContainerToken;
+  @Option(names = {"--payload-req"},
+          description =
+                  "Specifies the size of payload in KB in RPC request. ",
+          defaultValue = "0")
+  private int payloadReqSizeKB = 0;
+
+  @Option(names = {"--payload-resp"},
+          description =
+                  "Specifies the size of payload in KB in RPC response. ",
+          defaultValue = "0")
+  private int payloadRespSizeKB = 0;
+
+  @Option(names = {"--container-id"},
+      description = "Send echo to DataNodes associated with this container")
+  private long containerID;
+
+  @Option(names = {"--sleep-time-ms"},
+      description = "Let DataNode to pause for a duration (in milliseconds) 
for each request",
+      defaultValue = "0")
+  private int sleepTimeMs = 0;
+
+  @Option(names = {"--clients"},
+      description = "number of xceiver clients",
+      defaultValue = "1")
+  private int numClients = 1;
+
+  @CommandLine.ParentCommand
+  private Freon freon;
+
+  // empty constructor for picocli
+  DNRPCLoadGenerator() {
+  }
+
+  @VisibleForTesting
+  DNRPCLoadGenerator(OzoneConfiguration ozoneConfiguration) {
+    this.configuration = ozoneConfiguration;
+  }
+
+  @Override
+  public Void call() throws Exception {
+    Preconditions.checkArgument(payloadReqSizeKB >= 0,
+            "OM echo request payload size should be positive value or zero.");
+    Preconditions.checkArgument(payloadRespSizeKB >= 0,
+            "OM echo response payload size should be positive value or zero.");
+
+    if (configuration == null) {
+      configuration = freon.createOzoneConfiguration();
+    }
+    ContainerOperationClient scmClient = new 
ContainerOperationClient(configuration);
+    ContainerInfo containerInfo = scmClient.getContainer(containerID);
+
+    List<Pipeline> pipelineList = scmClient.listPipelines();
+    Pipeline pipeline = pipelineList.stream()
+        .filter(p -> p.getId().equals(containerInfo.getPipelineID()))
+        .findFirst()
+        .orElse(null);
+    encodedContainerToken = scmClient.getEncodedContainerToken(containerID);
+    XceiverClientFactory xceiverClientManager;
+    if (OzoneSecurityUtil.isSecurityEnabled(configuration)) {
+      CACertificateProvider caCerts = () -> HAUtils.buildCAX509List(null, 
configuration);
+      xceiverClientManager = new XceiverClientManager(configuration,
+          configuration.getObject(XceiverClientManager.ScmClientConfig.class),
+          new ClientTrustManager(caCerts, null));
+    } else {
+      xceiverClientManager = new XceiverClientManager(configuration);
+    }
+    clients = new ArrayList<>(numClients);
+    for (int i = 0; i < numClients; i++) {
+      clients.add(xceiverClientManager.acquireClient(pipeline));
+    }
+
+    init();
+    payloadReqBytes = 
UnsafeByteOperations.unsafeWrap(generatePayloadBytes(payloadReqSizeKB));
+    payloadRespSize = calculateMaxPayloadSize(payloadRespSizeKB);
+    timer = getMetrics().timer("rpc-payload");
+    try {
+      runTests(this::sendRPCReq);
+    } finally {
+      for (XceiverClientSpi client : clients) {
+        xceiverClientManager.releaseClient(client, false);
+      }
+      xceiverClientManager.close();
+      scmClient.close();
+    }
+    return null;
+  }
+
+  private int calculateMaxPayloadSize(int payloadSizeKB) {
+    if (payloadSizeKB > 0) {
+      return Math.min(
+              Math.toIntExact((long)payloadSizeKB *
+                      RPC_PAYLOAD_MULTIPLICATION_FACTOR),
+              MAX_SIZE_KB);
+    }
+    return 0;
+  }
+
+  private void sendRPCReq(long l) throws Exception {
+    timer.time(() -> {
+      int clientIndex = (numClients == 1) ? 0 : (int)l % numClients;
+      ContainerProtos.EchoResponseProto response =
+          ContainerProtocolCalls.echo(clients.get(clientIndex), 
encodedContainerToken,
+              containerID, payloadReqBytes, payloadRespSize, sleepTimeMs);
+      return null;
+    });
+  }
+}
+
+
diff --git 
a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/Freon.java 
b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/Freon.java
index bd5510695f..349887a776 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/Freon.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/Freon.java
@@ -73,7 +73,8 @@ import static 
org.apache.hadoop.hdds.server.http.HttpServer2.setHttpBaseDir;
         OzoneClientKeyReadWriteListOps.class,
         RangeKeysGenerator.class,
         DatanodeSimulator.class,
-        OmMetadataGenerator.class
+        OmMetadataGenerator.class,
+        DNRPCLoadGenerator.class
     },
     versionProvider = HddsVersionProvider.class,
     mixinStandardHelpOptions = true)
diff --git 
a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/OmRPCLoadGenerator.java
 
b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/OmRPCLoadGenerator.java
index 958df4c11a..90807a0e6f 100644
--- 
a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/OmRPCLoadGenerator.java
+++ 
b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/OmRPCLoadGenerator.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.ozone.freon;
 
 import com.codahale.metrics.Timer;
 import com.google.common.base.Preconditions;
-import org.apache.commons.lang3.RandomUtils;
 import org.apache.hadoop.hdds.cli.HddsVersionProvider;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import 
org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolClientSideTranslatorPB;
@@ -27,6 +26,8 @@ import java.util.concurrent.Callable;
 import picocli.CommandLine.Command;
 import picocli.CommandLine.Option;
 
+import static org.apache.hadoop.ozone.common.PayloadUtils.generatePayloadBytes;
+
 /**
  * Utility to generate RPC request to OM with or without payload.
  */
@@ -88,8 +89,7 @@ public class OmRPCLoadGenerator extends BaseFreonGenerator
     }
 
     init();
-    payloadReqBytes = RandomUtils.nextBytes(
-            calculateMaxPayloadSize(payloadReqSizeKB));
+    payloadReqBytes = generatePayloadBytes(payloadReqSizeKB);
     payloadRespSize = calculateMaxPayloadSize(payloadRespSizeKB);
     timer = getMetrics().timer("rpc-payload");
     try {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to