This is an automated email from the ASF dual-hosted git repository.

maobaolong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new b2d060ac2 [MINOR] fix(spark-client) Close coordinator client in time 
(#2328)
b2d060ac2 is described below

commit b2d060ac2d2b3bf60e5a9cda47b3d8d1e47c5de6
Author: maobaolong <[email protected]>
AuthorDate: Tue Jan 7 14:37:51 2025 +0800

    [MINOR] fix(spark-client) Close coordinator client in time (#2328)
    
    ### What changes were proposed in this pull request?
    
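    Close the coordinator client as soon as the access-cluster check has finished, instead of keeping it open until the shuffle manager is stopped.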
    
    ### Why are the changes needed?
    
    If the coordinator client is not closed in time, the gRPC retry keeps throwing exceptions continuously, for example:
    
    ```
    [16:44:57:437] [grpc-default-executor-0] WARN  
org.apache.uniffle.shaded.io.grpc.internal.ManagedChannelImpl.handleErrorInSyncContext:1866
 - [Channel<1>: (a:10000)] Failed to resolve name. 
status=Status{code=UNAVAILABLE, description=Unable to resolve host a, 
cause=java.lang.RuntimeException: java.net.UnknownHostException: a: 未知的名称或服务
        at 
org.apache.uniffle.shaded.io.grpc.internal.DnsNameResolver.resolveAddresses(DnsNameResolver.java:223)
        at 
org.apache.uniffle.shaded.io.grpc.internal.DnsNameResolver.doResolve(DnsNameResolver.java:282)
        at 
org.apache.uniffle.shaded.io.grpc.internal.DnsNameResolver$Resolve.run(DnsNameResolver.java:318)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
    Caused by: java.net.UnknownHostException: a: 未知的名称或服务
        at java.net.Inet4AddressImpl.lookupAllHostAddr(Native Method)
        at java.net.InetAddress$2.lookupAllHostAddr(InetAddress.java:867)
        at 
java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1302)
        at java.net.InetAddress$NameServiceAddresses.get(InetAddress.java:815)
        at java.net.InetAddress.getAllByName0(InetAddress.java:1291)
        at java.net.InetAddress.getAllByName(InetAddress.java:1144)
        at java.net.InetAddress.getAllByName(InetAddress.java:1065)
        at 
org.apache.uniffle.shaded.io.grpc.internal.DnsNameResolver$JdkAddressResolver.resolveAddress(DnsNameResolver.java:632)
        at 
org.apache.uniffle.shaded.io.grpc.internal.DnsNameResolver.resolveAddresses(DnsNameResolver.java:219)
        ... 5 more
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing UTs.
---
 .../shuffle/DelegationRssShuffleManagerUtils.java  | 39 ++++++++++++++++++++++
 .../apache/spark/shuffle/RssSparkShuffleUtils.java | 25 +++++++++++---
 .../spark/shuffle/DelegationRssShuffleManager.java | 32 ++++++++----------
 .../shuffle/DelegationRssShuffleManagerTest.java   | 10 +++---
 .../spark/shuffle/DelegationRssShuffleManager.java | 34 +++++++------------
 .../spark/shuffle/RssShuffleManagerTestBase.java   |  2 +-
 .../uniffle/client/api/CoordinatorClient.java      |  2 +-
 .../client/factory/CoordinatorClientFactory.java   |  6 ++++
 .../impl/grpc/CoordinatorGrpcRetryableClient.java  | 14 ++++++--
 9 files changed, 108 insertions(+), 56 deletions(-)

diff --git 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManagerUtils.java
 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManagerUtils.java
new file mode 100644
index 000000000..00c227b4e
--- /dev/null
+++ 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManagerUtils.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.spark.SparkConf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DelegationRssShuffleManagerUtils {
+  private static final Logger LOG = 
LoggerFactory.getLogger(DelegationRssShuffleManagerUtils.class);
+
+  public static String acquireAccessId(SparkConf sparkConf) {
+    String accessId = sparkConf.get(RssSparkConfig.RSS_ACCESS_ID.key(), 
"").trim();
+    if (StringUtils.isEmpty(accessId)) {
+      String providerKey = 
sparkConf.get(RssSparkConfig.RSS_ACCESS_ID_PROVIDER_KEY.key(), "");
+      if (StringUtils.isNotEmpty(providerKey)) {
+        accessId = sparkConf.get(providerKey, "");
+        LOG.info("Get access id {} from provider key: {}", accessId, 
providerKey);
+      }
+    }
+    return accessId;
+  }
+}
diff --git 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkShuffleUtils.java
 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkShuffleUtils.java
index e47a655c7..91ce4de50 100644
--- 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkShuffleUtils.java
+++ 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkShuffleUtils.java
@@ -103,15 +103,30 @@ public class RssSparkShuffleUtils {
     return instance;
   }
 
-  public static CoordinatorGrpcRetryableClient 
createCoordinatorClients(SparkConf sparkConf) {
+  public static CoordinatorGrpcRetryableClient 
createCoordinatorClientsWithoutHeartbeat(
+      SparkConf sparkConf) {
     String clientType = sparkConf.get(RssSparkConfig.RSS_CLIENT_TYPE);
-    String coordinators = sparkConf.get(RssSparkConfig.RSS_COORDINATOR_QUORUM);
+    String coordinators = getCoordinatorQuorumStr(sparkConf);
     long retryIntervalMs = 
sparkConf.get(RssSparkConfig.RSS_CLIENT_RETRY_INTERVAL_MAX);
     int retryTimes = sparkConf.get(RssSparkConfig.RSS_CLIENT_RETRY_MAX);
-    int heartbeatThread = 
sparkConf.get(RssSparkConfig.RSS_CLIENT_HEARTBEAT_THREAD_NUM);
     CoordinatorClientFactory coordinatorClientFactory = 
CoordinatorClientFactory.getInstance();
-    return coordinatorClientFactory.createCoordinatorClient(
-        ClientType.valueOf(clientType), coordinators, retryIntervalMs, 
retryTimes, heartbeatThread);
+    return coordinatorClientFactory.createCoordinatorClientWithoutHeartbeat(
+        ClientType.valueOf(clientType), coordinators, retryIntervalMs, 
retryTimes);
+  }
+
+  public static CoordinatorGrpcRetryableClient 
createCoordinatorClientsForAccessCluster(
+      SparkConf sparkConf) {
+    String clientType = sparkConf.get(RssSparkConfig.RSS_CLIENT_TYPE);
+    String coordinators = getCoordinatorQuorumStr(sparkConf);
+    long retryIntervalMs = 
sparkConf.get(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_INTERVAL_MS);
+    int retryTimes = 
sparkConf.get(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_TIMES);
+    CoordinatorClientFactory coordinatorClientFactory = 
CoordinatorClientFactory.getInstance();
+    return coordinatorClientFactory.createCoordinatorClientWithoutHeartbeat(
+        ClientType.valueOf(clientType), coordinators, retryIntervalMs, 
retryTimes);
+  }
+
+  public static String getCoordinatorQuorumStr(SparkConf sparkConf) {
+    return sparkConf.get(RssSparkConfig.RSS_COORDINATOR_QUORUM);
   }
 
   public static void applyDynamicClientConf(SparkConf sparkConf, Map<String, 
String> confItems) {
diff --git 
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
 
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
index 7f2af3ef2..3e74b3c48 100644
--- 
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
+++ 
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
@@ -22,7 +22,6 @@ import java.util.Map;
 import java.util.Set;
 
 import com.google.common.collect.Maps;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.spark.ShuffleDependency;
 import org.apache.spark.SparkConf;
@@ -30,7 +29,7 @@ import org.apache.spark.TaskContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.uniffle.client.impl.grpc.CoordinatorGrpcRetryableClient;
+import org.apache.uniffle.client.api.CoordinatorClient;
 import org.apache.uniffle.client.request.RssAccessClusterRequest;
 import org.apache.uniffle.client.response.RssAccessClusterResponse;
 import org.apache.uniffle.common.config.RssClientConf;
@@ -46,7 +45,6 @@ public class DelegationRssShuffleManager implements 
ShuffleManager {
   private static final Logger LOG = 
LoggerFactory.getLogger(DelegationRssShuffleManager.class);
 
   private final ShuffleManager delegate;
-  private final CoordinatorGrpcRetryableClient coordinatorClient;
   private final int accessTimeoutMs;
   private final SparkConf sparkConf;
   private String user;
@@ -58,10 +56,11 @@ public class DelegationRssShuffleManager implements 
ShuffleManager {
     this.sparkConf = sparkConf;
     accessTimeoutMs = sparkConf.get(RssSparkConfig.RSS_ACCESS_TIMEOUT_MS);
     if (isDriver) {
-      coordinatorClient = 
RssSparkShuffleUtils.createCoordinatorClients(sparkConf);
-      delegate = createShuffleManagerInDriver();
+      try (CoordinatorClient coordinatorClient =
+          
RssSparkShuffleUtils.createCoordinatorClientsForAccessCluster(sparkConf)) {
+        delegate = createShuffleManagerInDriver(coordinatorClient);
+      }
     } else {
-      coordinatorClient = null;
       delegate = createShuffleManagerInExecutor();
     }
 
@@ -70,7 +69,8 @@ public class DelegationRssShuffleManager implements 
ShuffleManager {
     }
   }
 
-  private ShuffleManager createShuffleManagerInDriver() throws RssException {
+  private ShuffleManager createShuffleManagerInDriver(CoordinatorClient 
coordinatorClient)
+      throws RssException {
     ShuffleManager shuffleManager;
     user = "user";
     try {
@@ -78,7 +78,7 @@ public class DelegationRssShuffleManager implements 
ShuffleManager {
     } catch (Exception e) {
       LOG.error("Error on getting user from ugi." + e);
     }
-    boolean canAccess = tryAccessCluster();
+    boolean canAccess = tryAccessCluster(coordinatorClient);
     if (uuid == null || "".equals(uuid)) {
       uuid = String.valueOf(System.currentTimeMillis());
     }
@@ -112,14 +112,11 @@ public class DelegationRssShuffleManager implements 
ShuffleManager {
     return shuffleManager;
   }
 
-  private boolean tryAccessCluster() {
-    String accessId = sparkConf.get(RssSparkConfig.RSS_ACCESS_ID.key(), 
"").trim();
-    if (StringUtils.isEmpty(accessId)) {
-      String providerKey = sparkConf.get(RssSparkConfig.RSS_ACCESS_ID.key(), 
"");
-      if (StringUtils.isNotEmpty(accessId)) {
-        accessId = sparkConf.get(providerKey, "");
-        LOG.info("Get access id {} from provider key: {}", accessId, 
providerKey);
-      }
+  private boolean tryAccessCluster(CoordinatorClient coordinatorClient) {
+    String accessId = 
DelegationRssShuffleManagerUtils.acquireAccessId(sparkConf);
+    if (accessId == null) {
+      LOG.warn("Access id key is null");
+      return false;
     }
     long retryInterval = 
sparkConf.get(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_INTERVAL_MS);
     int retryTimes = 
sparkConf.get(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_TIMES);
@@ -225,9 +222,6 @@ public class DelegationRssShuffleManager implements 
ShuffleManager {
   @Override
   public void stop() {
     delegate.stop();
-    if (coordinatorClient != null) {
-      coordinatorClient.close();
-    }
   }
 
   @Override
diff --git 
a/client-spark/spark2/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java
 
b/client-spark/spark2/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java
index 05586cf4e..9f7e6e848 100644
--- 
a/client-spark/spark2/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java
+++ 
b/client-spark/spark2/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java
@@ -66,7 +66,7 @@ public class DelegationRssShuffleManagerTest {
     List<CoordinatorClient> coordinatorClients = Lists.newArrayList();
     coordinatorClients.add(mockCoordinatorClient);
     mockedStaticRssShuffleUtils
-        .when(() -> RssSparkShuffleUtils.createCoordinatorClients(any()))
+        .when(() -> 
RssSparkShuffleUtils.createCoordinatorClientsForAccessCluster(any()))
         .thenReturn(new CoordinatorGrpcRetryableClient(coordinatorClients, 0, 
0, 1));
     SparkConf conf = new SparkConf();
     conf.set(RssSparkConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED.key(), "false");
@@ -81,7 +81,7 @@ public class DelegationRssShuffleManagerTest {
     List<CoordinatorClient> coordinatorClients = Lists.newArrayList();
     coordinatorClients.add(mockCoordinatorClient);
     mockedStaticRssShuffleUtils
-        .when(() -> RssSparkShuffleUtils.createCoordinatorClients(any()))
+        .when(() -> 
RssSparkShuffleUtils.createCoordinatorClientsForAccessCluster(any()))
         .thenReturn(new CoordinatorGrpcRetryableClient(coordinatorClients, 0, 
0, 1));
 
     SparkConf conf = new SparkConf();
@@ -119,7 +119,7 @@ public class DelegationRssShuffleManagerTest {
     List<CoordinatorClient> coordinatorClients = Lists.newArrayList();
     coordinatorClients.add(mockCoordinatorClient);
     mockedStaticRssShuffleUtils
-        .when(() -> RssSparkShuffleUtils.createCoordinatorClients(any()))
+        .when(() -> 
RssSparkShuffleUtils.createCoordinatorClientsForAccessCluster(any()))
         .thenReturn(new CoordinatorGrpcRetryableClient(coordinatorClients, 0, 
0, 1));
 
     SparkConf conf = new SparkConf();
@@ -153,7 +153,7 @@ public class DelegationRssShuffleManagerTest {
     List<CoordinatorClient> coordinatorClients = Lists.newArrayList();
     coordinatorClients.add(mockDeniedCoordinatorClient);
     mockedStaticRssShuffleUtils
-        .when(() -> RssSparkShuffleUtils.createCoordinatorClients(any()))
+        .when(() -> 
RssSparkShuffleUtils.createCoordinatorClientsForAccessCluster(any()))
         .thenReturn(new CoordinatorGrpcRetryableClient(coordinatorClients, 0, 
0, 1));
     SparkConf conf = new SparkConf();
     conf.set(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_INTERVAL_MS, 3000L);
@@ -173,7 +173,7 @@ public class DelegationRssShuffleManagerTest {
     List<CoordinatorClient> secondCoordinatorClients = Lists.newArrayList();
     secondCoordinatorClients.add(mockCoordinatorClient);
     mockedStaticRssShuffleUtils
-        .when(() -> RssSparkShuffleUtils.createCoordinatorClients(any()))
+        .when(() -> 
RssSparkShuffleUtils.createCoordinatorClientsForAccessCluster(any()))
         .thenReturn(new 
CoordinatorGrpcRetryableClient(secondCoordinatorClients, 0, 0, 1));
     SparkConf secondConf = new SparkConf();
     secondConf.set(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_INTERVAL_MS, 3000L);
diff --git 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
index 593c8a29a..a2e14456b 100644
--- 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
+++ 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
@@ -22,7 +22,6 @@ import java.util.Map;
 import java.util.Set;
 
 import com.google.common.collect.Maps;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.spark.ShuffleDependency;
 import org.apache.spark.SparkConf;
@@ -30,7 +29,7 @@ import org.apache.spark.TaskContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.uniffle.client.impl.grpc.CoordinatorGrpcRetryableClient;
+import org.apache.uniffle.client.api.CoordinatorClient;
 import org.apache.uniffle.client.request.RssAccessClusterRequest;
 import org.apache.uniffle.client.response.RssAccessClusterResponse;
 import org.apache.uniffle.common.config.RssClientConf;
@@ -46,7 +45,6 @@ public class DelegationRssShuffleManager implements 
ShuffleManager {
   private static final Logger LOG = 
LoggerFactory.getLogger(DelegationRssShuffleManager.class);
 
   private final ShuffleManager delegate;
-  private final CoordinatorGrpcRetryableClient coordinatorClient;
   private final int accessTimeoutMs;
   private final SparkConf sparkConf;
   private String user;
@@ -58,10 +56,11 @@ public class DelegationRssShuffleManager implements 
ShuffleManager {
     this.sparkConf = sparkConf;
     accessTimeoutMs = sparkConf.get(RssSparkConfig.RSS_ACCESS_TIMEOUT_MS);
     if (isDriver) {
-      coordinatorClient = 
RssSparkShuffleUtils.createCoordinatorClients(sparkConf);
-      delegate = createShuffleManagerInDriver();
+      try (CoordinatorClient coordinatorClient =
+          
RssSparkShuffleUtils.createCoordinatorClientsForAccessCluster(sparkConf)) {
+        delegate = createShuffleManagerInDriver(coordinatorClient);
+      }
     } else {
-      coordinatorClient = null;
       delegate = createShuffleManagerInExecutor();
     }
 
@@ -70,7 +69,8 @@ public class DelegationRssShuffleManager implements 
ShuffleManager {
     }
   }
 
-  private ShuffleManager createShuffleManagerInDriver() throws RssException {
+  private ShuffleManager createShuffleManagerInDriver(CoordinatorClient 
coordinatorClient)
+      throws RssException {
     ShuffleManager shuffleManager;
     user = "user";
     try {
@@ -78,7 +78,7 @@ public class DelegationRssShuffleManager implements 
ShuffleManager {
     } catch (Exception e) {
       LOG.error("Error on getting user from ugi." + e);
     }
-    boolean canAccess = tryAccessCluster();
+    boolean canAccess = tryAccessCluster(coordinatorClient);
     if (uuid == null || "".equals(uuid)) {
       uuid = String.valueOf(System.currentTimeMillis());
     }
@@ -112,17 +112,10 @@ public class DelegationRssShuffleManager implements 
ShuffleManager {
     return shuffleManager;
   }
 
-  private boolean tryAccessCluster() {
-    String accessId = sparkConf.get(RssSparkConfig.RSS_ACCESS_ID.key(), 
"").trim();
-    if (StringUtils.isEmpty(accessId)) {
-      String providerKey = 
sparkConf.get(RssSparkConfig.RSS_ACCESS_ID_PROVIDER_KEY.key(), "");
-      if (StringUtils.isNotEmpty(accessId)) {
-        accessId = sparkConf.get(providerKey, "");
-        LOG.info("Get access id {} from provider key: {}", accessId, 
providerKey);
-      }
-    }
-    if (StringUtils.isEmpty(accessId)) {
-      LOG.warn("Access id key is empty");
+  private boolean tryAccessCluster(CoordinatorClient coordinatorClient) {
+    String accessId = 
DelegationRssShuffleManagerUtils.acquireAccessId(sparkConf);
+    if (accessId == null) {
+      LOG.warn("Access id key is null");
       return false;
     }
     long retryInterval = 
sparkConf.get(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_INTERVAL_MS);
@@ -311,9 +304,6 @@ public class DelegationRssShuffleManager implements 
ShuffleManager {
   @Override
   public void stop() {
     delegate.stop();
-    if (coordinatorClient != null) {
-      coordinatorClient.close();
-    }
   }
 
   @Override
diff --git 
a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/RssShuffleManagerTestBase.java
 
b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/RssShuffleManagerTestBase.java
index 4b7cb6202..0cfcffbed 100644
--- 
a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/RssShuffleManagerTestBase.java
+++ 
b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/RssShuffleManagerTestBase.java
@@ -63,7 +63,7 @@ public class RssShuffleManagerTestBase {
     CoordinatorGrpcRetryableClient client =
         new CoordinatorGrpcRetryableClient(coordinatorClients, 0, 1, 1);
     mockedStaticRssShuffleUtils
-        .when(() -> RssSparkShuffleUtils.createCoordinatorClients(any()))
+        .when(() -> 
RssSparkShuffleUtils.createCoordinatorClientsForAccessCluster(any()))
         .thenReturn(client);
     return mockCoordinatorClient;
   }
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/api/CoordinatorClient.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/api/CoordinatorClient.java
index 33ed9c8c7..410ea0681 100644
--- 
a/internal-client/src/main/java/org/apache/uniffle/client/api/CoordinatorClient.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/api/CoordinatorClient.java
@@ -32,7 +32,7 @@ import 
org.apache.uniffle.client.response.RssFetchRemoteStorageResponse;
 import org.apache.uniffle.client.response.RssGetShuffleAssignmentsResponse;
 import org.apache.uniffle.client.response.RssSendHeartBeatResponse;
 
-public interface CoordinatorClient {
+public interface CoordinatorClient extends AutoCloseable {
 
   RssAppHeartBeatResponse 
scheduleAtFixedRateToSendAppHeartBeat(RssAppHeartBeatRequest request);
 
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/factory/CoordinatorClientFactory.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/factory/CoordinatorClientFactory.java
index e652a31ce..d9ef4e94f 100644
--- 
a/internal-client/src/main/java/org/apache/uniffle/client/factory/CoordinatorClientFactory.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/factory/CoordinatorClientFactory.java
@@ -100,4 +100,10 @@ public class CoordinatorClientFactory {
     return new CoordinatorGrpcRetryableClient(
         coordinatorClients, retryIntervalMs, retryTimes, heartBeatThreadNum);
   }
+
+  public synchronized CoordinatorGrpcRetryableClient 
createCoordinatorClientWithoutHeartbeat(
+      ClientType clientType, String coordinators, long retryIntervalMs, int 
retryTimes) {
+    List<CoordinatorClient> coordinatorClients = 
createCoordinatorClient(clientType, coordinators);
+    return new CoordinatorGrpcRetryableClient(coordinatorClients, 
retryIntervalMs, retryTimes, 0);
+  }
 }
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcRetryableClient.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcRetryableClient.java
index 418575fd4..ef8a0f7ff 100644
--- 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcRetryableClient.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcRetryableClient.java
@@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
+import com.google.common.base.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -61,8 +62,10 @@ public class CoordinatorGrpcRetryableClient implements 
CoordinatorClient {
     this.coordinatorClients = coordinatorClients;
     this.retryIntervalMs = retryIntervalMs;
     this.retryTimes = retryTimes;
-    this.heartBeatExecutorService =
-        ThreadUtils.getDaemonFixedThreadPool(heartBeatThreadNum, 
"client-heartbeat");
+    if (heartBeatThreadNum > 0) {
+      this.heartBeatExecutorService =
+          ThreadUtils.getDaemonFixedThreadPool(heartBeatThreadNum, 
"client-heartbeat");
+    }
   }
 
   @Override
@@ -70,6 +73,7 @@ public class CoordinatorGrpcRetryableClient implements 
CoordinatorClient {
       RssAppHeartBeatRequest request) {
     AtomicReference<RssAppHeartBeatResponse> rssResponse = new 
AtomicReference<>();
     rssResponse.set(new RssAppHeartBeatResponse(StatusCode.INTERNAL_ERROR));
+    Preconditions.checkNotNull(heartBeatExecutorService);
     ThreadUtils.executeTasks(
         heartBeatExecutorService,
         coordinatorClients,
@@ -97,6 +101,7 @@ public class CoordinatorGrpcRetryableClient implements 
CoordinatorClient {
   public RssApplicationInfoResponse 
registerApplicationInfo(RssApplicationInfoRequest request) {
     AtomicReference<RssApplicationInfoResponse> rssResponse = new 
AtomicReference<>();
     rssResponse.set(new RssApplicationInfoResponse(StatusCode.INTERNAL_ERROR));
+    Preconditions.checkNotNull(heartBeatExecutorService);
     ThreadUtils.executeTasks(
         heartBeatExecutorService,
         coordinatorClients,
@@ -124,6 +129,7 @@ public class CoordinatorGrpcRetryableClient implements 
CoordinatorClient {
   @Override
   public RssSendHeartBeatResponse sendHeartBeat(RssSendHeartBeatRequest 
request) {
     AtomicBoolean sendSuccessfully = new AtomicBoolean(false);
+    Preconditions.checkNotNull(heartBeatExecutorService);
     ThreadUtils.executeTasks(
         heartBeatExecutorService,
         coordinatorClients,
@@ -280,7 +286,9 @@ public class CoordinatorGrpcRetryableClient implements 
CoordinatorClient {
 
   @Override
   public void close() {
-    heartBeatExecutorService.shutdownNow();
+    if (heartBeatExecutorService != null) {
+      heartBeatExecutorService.shutdownNow();
+    }
     coordinatorClients.forEach(CoordinatorClient::close);
   }
 }

Reply via email to