elek commented on a change in pull request #1600:
URL: https://github.com/apache/ozone/pull/1600#discussion_r555138010



##########
File path: 
hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/DatanodeChunkGenerator.java
##########
@@ -75,70 +82,120 @@
       description = "Pipeline to use. By default the first RATIS/THREE "
           + "pipeline will be used.",
       defaultValue = "")
-  private String pipelineId;
+  private String pipelineIds;
 
-  private XceiverClientSpi xceiverClientSpi;
+  @Option(names = {"-d", "--datanodes"},
+      description = "Datanodes to use. ",
+      defaultValue = "")
+  private String datanodes;
+
+  private XceiverClientManager xceiverClientManager;
+  private List<XceiverClientSpi> xceiverClients;
 
   private Timer timer;
 
   private ByteString dataToWrite;
   private ChecksumData checksumProtobuf;
 
+
   @Override
   public Void call() throws Exception {
 
-    init();
 
     OzoneConfiguration ozoneConf = createOzoneConfiguration();
+    xceiverClientManager =
+        new XceiverClientManager(ozoneConf);
     if (OzoneSecurityUtil.isSecurityEnabled(ozoneConf)) {
       throw new IllegalArgumentException(
           "Datanode chunk generator is not supported in secure environment");
     }
 
-    try (StorageContainerLocationProtocol scmLocationClient =
-        createStorageContainerLocationClient(ozoneConf)) {
-      List<Pipeline> pipelines = scmLocationClient.listPipelines();
-      Pipeline pipeline;
-      if (pipelineId != null && pipelineId.length() > 0) {
-        pipeline = pipelines.stream()
-            .filter(p -> p.getId().toString().equals(pipelineId))
-            .findFirst()
-            .orElseThrow(() -> new IllegalArgumentException(
-                "Pipeline ID is defined, but there is no such pipeline: "
-                    + pipelineId));
+    List<String> pipelinesFromCmd = Arrays.asList(pipelineIds.split(","));
 
+    List<String> datanodeHosts = Arrays.asList(this.datanodes.split(","));
+
+    Set<Pipeline> pipelines;
+
+    try (StorageContainerLocationProtocol scmLocationClient =
+               createStorageContainerLocationClient(ozoneConf)) {
+      List<Pipeline> pipelinesFromSCM = scmLocationClient.listPipelines();
+      Pipeline firstPipeline;
+      init();
+      if (!arePipelinesOrDatanodesProvided()) {
+        //default behaviour if no arguments provided
+        firstPipeline = pipelinesFromSCM.stream()
+              .filter(p -> p.getFactor() == ReplicationFactor.THREE)
+              .findFirst()
+              .orElseThrow(() -> new IllegalArgumentException(
+                  "Pipeline ID is NOT defined, and no pipeline " +
+                      "has been found with factor=THREE"));
+        XceiverClientSpi xceiverClientSpi = xceiverClientManager
+            .acquireClient(firstPipeline);
+        xceiverClients = new ArrayList<>();
+        xceiverClients.add(xceiverClientSpi);
+        LOG.info("Using pipeline {}", firstPipeline.getId());

Review comment:
       ```suggestion
   ```
   
   You don't need this line as you log the same information in the loop below.

##########
File path: 
hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/DatanodeChunkGenerator.java
##########
@@ -75,70 +82,120 @@
       description = "Pipeline to use. By default the first RATIS/THREE "
           + "pipeline will be used.",
       defaultValue = "")
-  private String pipelineId;
+  private String pipelineIds;
 
-  private XceiverClientSpi xceiverClientSpi;
+  @Option(names = {"-d", "--datanodes"},
+      description = "Datanodes to use. ",

Review comment:
       ```suggestion
         description = "Datanodes to use. Test will write to all the existing pipelines which this datanode is member of.",
   ```

##########
File path: 
hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/DatanodeChunkGenerator.java
##########
@@ -75,70 +82,120 @@
       description = "Pipeline to use. By default the first RATIS/THREE "
           + "pipeline will be used.",
       defaultValue = "")
-  private String pipelineId;
+  private String pipelineIds;
 
-  private XceiverClientSpi xceiverClientSpi;
+  @Option(names = {"-d", "--datanodes"},
+      description = "Datanodes to use. ",
+      defaultValue = "")
+  private String datanodes;
+
+  private XceiverClientManager xceiverClientManager;
+  private List<XceiverClientSpi> xceiverClients;
 
   private Timer timer;
 
   private ByteString dataToWrite;
   private ChecksumData checksumProtobuf;
 
+
   @Override
   public Void call() throws Exception {
 
-    init();
 
     OzoneConfiguration ozoneConf = createOzoneConfiguration();
+    xceiverClientManager =
+        new XceiverClientManager(ozoneConf);
     if (OzoneSecurityUtil.isSecurityEnabled(ozoneConf)) {
       throw new IllegalArgumentException(
           "Datanode chunk generator is not supported in secure environment");
     }
 
-    try (StorageContainerLocationProtocol scmLocationClient =
-        createStorageContainerLocationClient(ozoneConf)) {
-      List<Pipeline> pipelines = scmLocationClient.listPipelines();
-      Pipeline pipeline;
-      if (pipelineId != null && pipelineId.length() > 0) {
-        pipeline = pipelines.stream()
-            .filter(p -> p.getId().toString().equals(pipelineId))
-            .findFirst()
-            .orElseThrow(() -> new IllegalArgumentException(
-                "Pipeline ID is defined, but there is no such pipeline: "
-                    + pipelineId));
+    List<String> pipelinesFromCmd = Arrays.asList(pipelineIds.split(","));
 
+    List<String> datanodeHosts = Arrays.asList(this.datanodes.split(","));
+
+    Set<Pipeline> pipelines;
+
+    try (StorageContainerLocationProtocol scmLocationClient =
+               createStorageContainerLocationClient(ozoneConf)) {
+      List<Pipeline> pipelinesFromSCM = scmLocationClient.listPipelines();
+      Pipeline firstPipeline;
+      init();
+      if (!arePipelinesOrDatanodesProvided()) {
+        //default behaviour if no arguments provided
+        firstPipeline = pipelinesFromSCM.stream()
+              .filter(p -> p.getFactor() == ReplicationFactor.THREE)
+              .findFirst()
+              .orElseThrow(() -> new IllegalArgumentException(
+                  "Pipeline ID is NOT defined, and no pipeline " +
+                      "has been found with factor=THREE"));
+        XceiverClientSpi xceiverClientSpi = xceiverClientManager
+            .acquireClient(firstPipeline);
+        xceiverClients = new ArrayList<>();
+        xceiverClients.add(xceiverClientSpi);
+        LOG.info("Using pipeline {}", firstPipeline.getId());
+        runTest();
       } else {
-        pipeline = pipelines.stream()
-            .filter(p -> p.getFactor() == ReplicationFactor.THREE)
-            .findFirst()
-            .orElseThrow(() -> new IllegalArgumentException(
-                "Pipeline ID is NOT defined, and no pipeline " +
-                    "has been found with factor=THREE"));
-        LOG.info("Using pipeline {}", pipeline.getId());
+        xceiverClients = new ArrayList<>();
+        pipelines = new HashSet<>();
+        for(String pipelineId:pipelinesFromCmd){
+          List<Pipeline> selectedPipelines =  pipelinesFromSCM.stream()
+              .filter((p -> p.getId().toString()
+                  .equals("PipelineID=" + pipelineId)
+                  || pipelineContainsDatanode(p, datanodeHosts)))
+               .collect(Collectors.toList());
+          pipelines.addAll(selectedPipelines);
+        }
+        for (Pipeline p:pipelines){
+          LOG.info("Writing to pipeline: " + p.getId());
+          xceiverClients.add(xceiverClientManager.acquireClient(p));
+        }
+        if (pipelines.isEmpty()){
+          throw new IllegalArgumentException(
+              "Coudln't find the any/the selected pipeline");

Review comment:
       ```suggestion
                 "Couldn't find the any/the selected pipeline");
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to