This is an automated email from the ASF dual-hosted git repository.

sumitagrawal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 60215826fd HDDS-9883. Recon - Improve the performance of processing 
IncrementalContainerReport from DN (#5793)
60215826fd is described below

commit 60215826fd082563ae74a526f04c24eb8d3865b4
Author: Devesh Kumar Singh <[email protected]>
AuthorDate: Thu Jan 4 20:27:52 2024 +0530

    HDDS-9883. Recon - Improve the performance of processing 
IncrementalContainerReport from DN (#5793)
---
 .../org/apache/hadoop/ozone/OzoneConfigKeys.java   |  8 ++++
 .../common/src/main/resources/ozone-default.xml    | 26 ++++++++++++
 .../hdds/scm/server/ContainerReportQueue.java      |  8 ++++
 .../scm/server/SCMDatanodeHeartbeatDispatcher.java | 13 ++++++
 .../hadoop/ozone/TestOzoneConfigurationFields.java |  5 ++-
 .../hadoop/ozone/recon/ReconServerConfigKeys.java  | 17 ++++++++
 .../org/apache/hadoop/ozone/recon/ReconUtils.java  | 29 +++++++++++++
 .../ozone/recon/scm/ReconContainerManager.java     |  4 ++
 .../ozone/recon/scm/ReconContainerReportQueue.java | 47 ++++++++++++++++++++++
 .../ReconIncrementalContainerReportHandler.java    | 33 +++++++++------
 .../scm/ReconStorageContainerManagerFacade.java    | 29 ++++++++++++-
 ...TestReconIncrementalContainerReportHandler.java | 15 ++++++-
 12 files changed, 218 insertions(+), 16 deletions(-)

diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index f124e24141..21c89cc3c8 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -668,6 +668,14 @@ public final class OzoneConfigKeys {
   public static final String OZONE_SCM_CLOSE_CONTAINER_WAIT_DURATION =
       "ozone.scm.close.container.wait.duration";
 
+  public static final String HDDS_SCM_CLIENT_RPC_TIME_OUT =
+      "hdds.scmclient.rpc.timeout";
+  public static final String HDDS_SCM_CLIENT_MAX_RETRY_TIMEOUT =
+      "hdds.scmclient.max.retry.timeout";
+  public static final String HDDS_SCM_CLIENT_FAILOVER_MAX_RETRY =
+      "hdds.scmclient.failover.max.retry";
+
+
   /**
    * There is no need to instantiate this class.
    */
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml 
b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index c70a9630f0..079362f916 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -3118,6 +3118,32 @@
       SCM snapshot.
     </description>
   </property>
+  <property>
+    <name>ozone.recon.scmclient.rpc.timeout</name>
+    <value>1m</value>
+    <tag>OZONE, RECON, SCM</tag>
+    <description>
      RpcClient timeout while waiting for a response from SCM when Recon 
connects to SCM.
+    </description>
+  </property>
+  <property>
+    <name>ozone.recon.scmclient.max.retry.timeout</name>
+    <value>6s</value>
+    <tag>OZONE, RECON, SCM</tag>
+    <description>
      Max retry timeout for the SCM Client when Recon connects to SCM. This 
config is used to
      dynamically compute the max retry count for the SCM Client when failover 
happens. See the
      getRetryCount method of the SCMClientConfig class.
+    </description>
+  </property>
+  <property>
+    <name>ozone.recon.scmclient.failover.max.retry</name>
+    <value>3</value>
+    <tag>OZONE, RECON, SCM</tag>
+    <description>
+      Max retry count for SCM Client when failover happens.
+    </description>
+  </property>
   <property>
     <name>ozone.recon.om.socket.timeout</name>
     <value>5s</value>
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ContainerReportQueue.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ContainerReportQueue.java
index b08b525a86..bffddff87b 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ContainerReportQueue.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ContainerReportQueue.java
@@ -112,6 +112,9 @@ public class ContainerReportQueue
 
       // 2. Add ICR report or merge to previous ICR
       List<ContainerReport> dataList = dataMap.get(uuidString);
+      if (mergeIcr(val, dataList)) {
+        return true;
+      }
       dataList.add(val);
       ++capacity;
       orderingQueue.add(uuidString);
@@ -375,4 +378,9 @@ public class ContainerReportQueue
     }
     return 0;
   }
+
+  protected boolean mergeIcr(ContainerReport val,
+                             List<ContainerReport> dataList) {
+    return false;
+  }
 }
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
index aaadbbbcb9..38db618ef5 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
@@ -279,6 +279,7 @@ public final class SCMDatanodeHeartbeatDispatcher {
   public interface ContainerReport {
     DatanodeDetails getDatanodeDetails();
     ContainerReportType getType();
+    void mergeReport(ContainerReport val);
   }
 
   /**
@@ -334,6 +335,9 @@ public final class SCMDatanodeHeartbeatDispatcher {
       return getDatanodeDetails().toString() + ", {type: " + getType()
           + ", size: " + getReport().getReportsList().size() + "}";
     }
+
+    @Override
+    public void mergeReport(ContainerReport nextReport) { }
   }
 
   /**
@@ -374,6 +378,15 @@ public final class SCMDatanodeHeartbeatDispatcher {
       return getDatanodeDetails().toString() + ", {type: " + getType()
           + ", size: " + getReport().getReportList().size() + "}";
     }
+
+    @Override
+    public void mergeReport(ContainerReport nextReport) {
+      if (nextReport.getType() == ContainerReportType.ICR) {
+        getReport().getReportList().addAll(
+            ((ReportFromDatanode<IncrementalContainerReportProto>) nextReport)
+                .getReport().getReportList());
+      }
+    }
   }
 
   /**
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
index cb29d61e1a..1a437be813 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
@@ -141,7 +141,10 @@ public class TestOzoneConfigurationFields extends 
TestConfigurationFieldsBase {
         ScmConfigKeys.OZONE_SCM_PIPELINE_PLACEMENT_IMPL_KEY,
         ScmConfigKeys.OZONE_SCM_HA_PREFIX,
         S3GatewayConfigKeys.OZONE_S3G_FSO_DIRECTORY_CREATION_ENABLED,
-        HddsConfigKeys.HDDS_DATANODE_VOLUME_MIN_FREE_SPACE_PERCENT
+        HddsConfigKeys.HDDS_DATANODE_VOLUME_MIN_FREE_SPACE_PERCENT,
+        OzoneConfigKeys.HDDS_SCM_CLIENT_RPC_TIME_OUT,
+        OzoneConfigKeys.HDDS_SCM_CLIENT_MAX_RETRY_TIMEOUT,
+        OzoneConfigKeys.HDDS_SCM_CLIENT_FAILOVER_MAX_RETRY
     ));
   }
 }
diff --git 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java
 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java
index b3c601c4c1..ab87bda441 100644
--- 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java
+++ 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java
@@ -168,6 +168,23 @@ public final class  ReconServerConfigKeys {
 
   public static final String
       OZONE_RECON_SCM_SNAPSHOT_TASK_INITIAL_DELAY_DEFAULT = "1m";
+
+  public static final String OZONE_RECON_SCM_CLIENT_RPC_TIME_OUT_KEY =
+      "ozone.recon.scmclient.rpc.timeout";
+
+  public static final String OZONE_RECON_SCM_CLIENT_RPC_TIME_OUT_DEFAULT = 
"1m";
+
+  public static final String OZONE_RECON_SCM_CLIENT_MAX_RETRY_TIMEOUT_KEY =
+      "ozone.recon.scmclient.max.retry.timeout";
+
+  public static final String OZONE_RECON_SCM_CLIENT_MAX_RETRY_TIMEOUT_DEFAULT =
+      "6s";
+
+  public static final String OZONE_RECON_SCM_CLIENT_FAILOVER_MAX_RETRY_KEY =
+      "ozone.recon.scmclient.failover.max.retry";
+
+  public static final int
+      OZONE_RECON_SCM_CLIENT_FAILOVER_MAX_RETRY_DEFAULT = 3;
   /**
    * Private constructor for utility class.
    */
diff --git 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconUtils.java
 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconUtils.java
index 0d0c57fbe3..c548561073 100644
--- 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconUtils.java
+++ 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconUtils.java
@@ -29,6 +29,9 @@ import java.net.URL;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
 
 import com.google.common.base.Preconditions;
 import com.google.inject.Singleton;
@@ -36,7 +39,9 @@ import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.HddsUtils;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.ScmUtils;
 import org.apache.hadoop.hdds.scm.ha.SCMNodeDetails;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher;
 import org.apache.hadoop.hdds.utils.HddsServerUtil;
 import org.apache.hadoop.hdfs.web.URLConnectionFactory;
 import org.apache.hadoop.io.IOUtils;
@@ -44,6 +49,9 @@ import org.apache.hadoop.io.IOUtils;
 import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
 import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
 import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
+
+import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_EVENT_CONTAINER_REPORT_QUEUE_SIZE_DEFAULT;
+import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_EVENT_THREAD_POOL_SIZE_DEFAULT;
 import static org.apache.hadoop.hdds.server.ServerUtils.getDirectoryFromConfig;
 import static org.apache.hadoop.hdds.server.ServerUtils.getOzoneMetaDirPath;
 import static 
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_DB_DIR;
@@ -51,9 +59,11 @@ import static org.jooq.impl.DSL.currentTimestamp;
 import static org.jooq.impl.DSL.select;
 import static org.jooq.impl.DSL.using;
 
+import org.apache.hadoop.ozone.recon.scm.ReconContainerReportQueue;
 import 
org.apache.hadoop.security.authentication.client.AuthenticationException;
 import org.hadoop.ozone.recon.schema.tables.daos.GlobalStatsDao;
 import org.hadoop.ozone.recon.schema.tables.pojos.GlobalStats;
+import org.jetbrains.annotations.NotNull;
 import org.jooq.Configuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -76,6 +86,25 @@ public class ReconUtils {
     return new ReconUtils().getReconDbDir(conf, OZONE_RECON_SCM_DB_DIR);
   }
 
+  @NotNull
+  public static List<BlockingQueue<SCMDatanodeHeartbeatDispatcher
+      .ContainerReport>> initContainerReportQueue(
+      OzoneConfiguration configuration) {
+    int threadPoolSize =
+        configuration.getInt(ScmUtils.getContainerReportConfPrefix()
+                + ".thread.pool.size",
+            OZONE_SCM_EVENT_THREAD_POOL_SIZE_DEFAULT);
+    int queueSize = configuration.getInt(
+        ScmUtils.getContainerReportConfPrefix() + ".queue.size",
+        OZONE_SCM_EVENT_CONTAINER_REPORT_QUEUE_SIZE_DEFAULT);
+    List<BlockingQueue<SCMDatanodeHeartbeatDispatcher.ContainerReport>> queues 
=
+        new ArrayList<>();
+    for (int i = 0; i < threadPoolSize; ++i) {
+      queues.add(new ReconContainerReportQueue(queueSize));
+    }
+    return queues;
+  }
+
   /**
    * Get configured Recon DB directory value based on config. If not present,
    * fallback to ozone.metadata.dirs
diff --git 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerManager.java
 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerManager.java
index e4108c2834..9d7c88dfc4 100644
--- 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerManager.java
+++ 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerManager.java
@@ -459,4 +459,8 @@ public class ReconContainerManager extends 
ContainerManagerImpl {
     return pipelineToOpenContainer;
   }
 
+  @VisibleForTesting
+  public StorageContainerServiceProvider getScmClient() {
+    return scmClient;
+  }
 }
diff --git 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerReportQueue.java
 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerReportQueue.java
new file mode 100644
index 0000000000..8d5f92eda4
--- /dev/null
+++ 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerReportQueue.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.recon.scm;
+
+import org.apache.hadoop.hdds.scm.server.ContainerReportQueue;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher;
+import 
org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReport;
+
+import java.util.List;
+
+/**
+ * Customized queue to handle multiple ICR reports together.
+ */
+public class ReconContainerReportQueue extends ContainerReportQueue {
+
+  public ReconContainerReportQueue(int queueSize) {
+    super(queueSize);
+  }
+
+  @Override
+  protected boolean mergeIcr(ContainerReport val,
+                             List<ContainerReport> dataList) {
+    if (!dataList.isEmpty()) {
+      if (SCMDatanodeHeartbeatDispatcher.ContainerReportType.ICR
+          == dataList.get(dataList.size() - 1).getType()) {
+        dataList.get(dataList.size() - 1).mergeReport(val);
+        return true;
+      }
+    }
+    return false;
+  }
+}
diff --git 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconIncrementalContainerReportHandler.java
 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconIncrementalContainerReportHandler.java
index 18d995d053..1f2b1d5cf2 100644
--- 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconIncrementalContainerReportHandler.java
+++ 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconIncrementalContainerReportHandler.java
@@ -24,8 +24,8 @@ import java.util.concurrent.TimeoutException;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerManager;
-import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
 import org.apache.hadoop.hdds.scm.container.IncrementalContainerReportHandler;
 import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
@@ -69,24 +69,33 @@ public class ReconIncrementalContainerReportHandler
 
     ReconContainerManager containerManager =
         (ReconContainerManager) getContainerManager();
+    try {
+      containerManager.checkAndAddNewContainerBatch(
+          report.getReport().getReportList());
+    } catch (Exception ioEx) {
+      LOG.error("Exception while checking and adding new container.", ioEx);
+      return;
+    }
     boolean success = true;
     for (ContainerReplicaProto replicaProto :
         report.getReport().getReportList()) {
+      ContainerID id = ContainerID.valueOf(replicaProto.getContainerID());
+      ContainerInfo container = null;
       try {
-        final ContainerID id = ContainerID.valueOf(
-            replicaProto.getContainerID());
         try {
-          containerManager.checkAndAddNewContainer(id, replicaProto.getState(),
-              report.getDatanodeDetails());
-        } catch (Exception ioEx) {
-          LOG.error("Exception while checking and adding new container.", 
ioEx);
-          return;
+          container = getContainerManager().getContainer(id);
+          // Ensure we reuse the same ContainerID instance in containerInfo
+          id = container.containerID();
+        } finally {
+          if (replicaProto.getState().equals(
+              ContainerReplicaProto.State.DELETED)) {
+            getNodeManager().removeContainer(dd, id);
+          } else {
+            getNodeManager().addContainer(dd, id);
+          }
         }
-        getNodeManager().addContainer(dd, id);
         processContainerReplica(dd, replicaProto, publisher);
-      } catch (ContainerNotFoundException e) {
-        success = false;
-        LOG.warn("Container {} not found!", replicaProto.getContainerID());
+        success = true;
       } catch (NodeNotFoundException ex) {
         success = false;
         LOG.error("Received ICR from unknown datanode {}.",
diff --git 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
index 464ec1a5ee..556c619419 100644
--- 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
+++ 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
@@ -99,11 +99,21 @@ import 
org.apache.hadoop.ozone.recon.spi.StorageContainerServiceProvider;
 import org.apache.hadoop.ozone.recon.tasks.ContainerSizeCountTask;
 import org.apache.hadoop.ozone.recon.tasks.ReconTaskConfig;
 import com.google.inject.Inject;
+
 import static 
org.apache.hadoop.hdds.recon.ReconConfigKeys.RECON_SCM_CONFIG_PREFIX;
 import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_EVENT_REPORT_EXEC_WAIT_THRESHOLD_DEFAULT;
 import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_EVENT_REPORT_QUEUE_WAIT_THRESHOLD_DEFAULT;
 import static 
org.apache.hadoop.hdds.scm.server.StorageContainerManager.buildRpcServerStartMessage;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.HDDS_SCM_CLIENT_FAILOVER_MAX_RETRY;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.HDDS_SCM_CLIENT_MAX_RETRY_TIMEOUT;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.HDDS_SCM_CLIENT_RPC_TIME_OUT;
 import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
+import static 
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_CLIENT_FAILOVER_MAX_RETRY_DEFAULT;
+import static 
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_CLIENT_FAILOVER_MAX_RETRY_KEY;
+import static 
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_CLIENT_MAX_RETRY_TIMEOUT_DEFAULT;
+import static 
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_CLIENT_MAX_RETRY_TIMEOUT_KEY;
+import static 
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_CLIENT_RPC_TIME_OUT_DEFAULT;
+import static 
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_CLIENT_RPC_TIME_OUT_KEY;
 import static 
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_SNAPSHOT_TASK_INITIAL_DELAY;
 import static 
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_SNAPSHOT_TASK_INITIAL_DELAY_DEFAULT;
 import static 
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_SNAPSHOT_TASK_INTERVAL_DEFAULT;
@@ -182,6 +192,23 @@ public class ReconStorageContainerManagerFacade
         .setSCM(this)
         .build();
     this.ozoneConfiguration = getReconScmConfiguration(conf);
+    long scmClientRPCTimeOut = conf.getTimeDuration(
+        OZONE_RECON_SCM_CLIENT_RPC_TIME_OUT_KEY,
+        OZONE_RECON_SCM_CLIENT_RPC_TIME_OUT_DEFAULT,
+        TimeUnit.MILLISECONDS);
+    long scmClientMaxRetryTimeOut = conf.getTimeDuration(
+        OZONE_RECON_SCM_CLIENT_MAX_RETRY_TIMEOUT_KEY,
+        OZONE_RECON_SCM_CLIENT_MAX_RETRY_TIMEOUT_DEFAULT,
+        TimeUnit.MILLISECONDS);
+    int scmClientFailOverMaxRetryCount = conf.getInt(
+        OZONE_RECON_SCM_CLIENT_FAILOVER_MAX_RETRY_KEY,
+        OZONE_RECON_SCM_CLIENT_FAILOVER_MAX_RETRY_DEFAULT);
+
+    conf.setLong(HDDS_SCM_CLIENT_RPC_TIME_OUT, scmClientRPCTimeOut);
+    conf.setLong(HDDS_SCM_CLIENT_MAX_RETRY_TIMEOUT, scmClientMaxRetryTimeOut);
+    conf.setLong(HDDS_SCM_CLIENT_FAILOVER_MAX_RETRY,
+        scmClientFailOverMaxRetryCount);
+
     this.scmStorageConfig = new ReconStorageConfig(conf, reconUtils);
     this.clusterMap = new NetworkTopologyImpl(conf);
     this.dbStore = DBStoreBuilder
@@ -283,7 +310,7 @@ public class ReconStorageContainerManagerFacade
         ScmUtils.getContainerReportConfPrefix() + ".execute.wait.threshold",
         OZONE_SCM_EVENT_REPORT_EXEC_WAIT_THRESHOLD_DEFAULT);
     List<BlockingQueue<ContainerReport>> queues
-        = ScmUtils.initContainerReportQueue(ozoneConfiguration);
+        = ReconUtils.initContainerReportQueue(ozoneConfiguration);
     List<ThreadPoolExecutor> executors
         = FixedThreadPoolWithAffinityExecutor.initializeExecutorPool(
         threadNamePrefix, queues);
diff --git 
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconIncrementalContainerReportHandler.java
 
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconIncrementalContainerReportHandler.java
index e37226f965..00e0a56aab 100644
--- 
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconIncrementalContainerReportHandler.java
+++ 
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconIncrementalContainerReportHandler.java
@@ -30,7 +30,9 @@ import static org.mockito.Mockito.when;
 import java.io.IOException;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.TimeoutException;
 
@@ -65,18 +67,28 @@ public class TestReconIncrementalContainerReportHandler
   private HDDSLayoutVersionManager versionManager;
 
   @Test
-  public void testProcessICR() throws IOException, NodeNotFoundException {
+  public void testProcessICR()
+      throws IOException, NodeNotFoundException, TimeoutException {
 
     ContainerID containerID = ContainerID.valueOf(100L);
     DatanodeDetails datanodeDetails = randomDatanodeDetails();
     IncrementalContainerReportFromDatanode reportMock =
         mock(IncrementalContainerReportFromDatanode.class);
     when(reportMock.getDatanodeDetails()).thenReturn(datanodeDetails);
+
+    ContainerWithPipeline containerWithPipeline = getTestContainer(
+        containerID.getId(), OPEN);
+    List<ContainerWithPipeline> containerWithPipelineList = new ArrayList<>();
+    containerWithPipelineList.add(containerWithPipeline);
+    ReconContainerManager containerManager = getContainerManager();
     IncrementalContainerReportProto containerReport =
         getIncrementalContainerReportProto(containerID,
             State.OPEN,
             datanodeDetails.getUuidString());
     when(reportMock.getReport()).thenReturn(containerReport);
+    when(getContainerManager().getScmClient()
+        .getExistContainerWithPipelinesInBatch(any(
+            ArrayList.class))).thenReturn(containerWithPipelineList);
 
     final String path =
         GenericTestUtils.getTempPath(UUID.randomUUID().toString());
@@ -97,7 +109,6 @@ public class TestReconIncrementalContainerReportHandler
 
     nodeManager.register(datanodeDetails, null, null);
 
-    ReconContainerManager containerManager = getContainerManager();
     ReconIncrementalContainerReportHandler reconIcr =
         new ReconIncrementalContainerReportHandler(nodeManager,
             containerManager, SCMContext.emptyContext());


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to