This is an automated email from the ASF dual-hosted git repository.
maobaolong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new b2d060ac2 [MINOR] fix(spark-client) Close coordinator client in time
(#2328)
b2d060ac2 is described below
commit b2d060ac2d2b3bf60e5a9cda47b3d8d1e47c5de6
Author: maobaolong <[email protected]>
AuthorDate: Tue Jan 7 14:37:51 2025 +0800
[MINOR] fix(spark-client) Close coordinator client in time (#2328)
### What changes were proposed in this pull request?
Close coordinator client in time.
### Why are the changes needed?
Otherwise, the gRPC retry logic keeps throwing exceptions indefinitely:
```
[16:44:57:437] [grpc-default-executor-0] WARN
org.apache.uniffle.shaded.io.grpc.internal.ManagedChannelImpl.handleErrorInSyncContext:1866
- [Channel<1>: (a:10000)] Failed to resolve name.
status=Status{code=UNAVAILABLE, description=Unable to resolve host a,
cause=java.lang.RuntimeException: java.net.UnknownHostException: a: 未知的名称或服务
at
org.apache.uniffle.shaded.io.grpc.internal.DnsNameResolver.resolveAddresses(DnsNameResolver.java:223)
at
org.apache.uniffle.shaded.io.grpc.internal.DnsNameResolver.doResolve(DnsNameResolver.java:282)
at
org.apache.uniffle.shaded.io.grpc.internal.DnsNameResolver$Resolve.run(DnsNameResolver.java:318)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.net.UnknownHostException: a: 未知的名称或服务
at java.net.Inet4AddressImpl.lookupAllHostAddr(Native Method)
at java.net.InetAddress$2.lookupAllHostAddr(InetAddress.java:867)
at
java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1302)
at java.net.InetAddress$NameServiceAddresses.get(InetAddress.java:815)
at java.net.InetAddress.getAllByName0(InetAddress.java:1291)
at java.net.InetAddress.getAllByName(InetAddress.java:1144)
at java.net.InetAddress.getAllByName(InetAddress.java:1065)
at
org.apache.uniffle.shaded.io.grpc.internal.DnsNameResolver$JdkAddressResolver.resolveAddress(DnsNameResolver.java:632)
at
org.apache.uniffle.shaded.io.grpc.internal.DnsNameResolver.resolveAddresses(DnsNameResolver.java:219)
... 5 more
```
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing UTs.
---
.../shuffle/DelegationRssShuffleManagerUtils.java | 39 ++++++++++++++++++++++
.../apache/spark/shuffle/RssSparkShuffleUtils.java | 25 +++++++++++---
.../spark/shuffle/DelegationRssShuffleManager.java | 32 ++++++++----------
.../shuffle/DelegationRssShuffleManagerTest.java | 10 +++---
.../spark/shuffle/DelegationRssShuffleManager.java | 34 +++++++------------
.../spark/shuffle/RssShuffleManagerTestBase.java | 2 +-
.../uniffle/client/api/CoordinatorClient.java | 2 +-
.../client/factory/CoordinatorClientFactory.java | 6 ++++
.../impl/grpc/CoordinatorGrpcRetryableClient.java | 14 ++++++--
9 files changed, 108 insertions(+), 56 deletions(-)
diff --git
a/client-spark/common/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManagerUtils.java
b/client-spark/common/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManagerUtils.java
new file mode 100644
index 000000000..00c227b4e
--- /dev/null
+++
b/client-spark/common/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManagerUtils.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.spark.SparkConf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DelegationRssShuffleManagerUtils {
+ private static final Logger LOG =
LoggerFactory.getLogger(DelegationRssShuffleManagerUtils.class);
+
+ public static String acquireAccessId(SparkConf sparkConf) {
+ String accessId = sparkConf.get(RssSparkConfig.RSS_ACCESS_ID.key(),
"").trim();
+ if (StringUtils.isEmpty(accessId)) {
+ String providerKey =
sparkConf.get(RssSparkConfig.RSS_ACCESS_ID_PROVIDER_KEY.key(), "");
+ if (StringUtils.isNotEmpty(providerKey)) {
+ accessId = sparkConf.get(providerKey, "");
+ LOG.info("Get access id {} from provider key: {}", accessId,
providerKey);
+ }
+ }
+ return accessId;
+ }
+}
diff --git
a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkShuffleUtils.java
b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkShuffleUtils.java
index e47a655c7..91ce4de50 100644
---
a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkShuffleUtils.java
+++
b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkShuffleUtils.java
@@ -103,15 +103,30 @@ public class RssSparkShuffleUtils {
return instance;
}
- public static CoordinatorGrpcRetryableClient
createCoordinatorClients(SparkConf sparkConf) {
+ public static CoordinatorGrpcRetryableClient
createCoordinatorClientsWithoutHeartbeat(
+ SparkConf sparkConf) {
String clientType = sparkConf.get(RssSparkConfig.RSS_CLIENT_TYPE);
- String coordinators = sparkConf.get(RssSparkConfig.RSS_COORDINATOR_QUORUM);
+ String coordinators = getCoordinatorQuorumStr(sparkConf);
long retryIntervalMs =
sparkConf.get(RssSparkConfig.RSS_CLIENT_RETRY_INTERVAL_MAX);
int retryTimes = sparkConf.get(RssSparkConfig.RSS_CLIENT_RETRY_MAX);
- int heartbeatThread =
sparkConf.get(RssSparkConfig.RSS_CLIENT_HEARTBEAT_THREAD_NUM);
CoordinatorClientFactory coordinatorClientFactory =
CoordinatorClientFactory.getInstance();
- return coordinatorClientFactory.createCoordinatorClient(
- ClientType.valueOf(clientType), coordinators, retryIntervalMs,
retryTimes, heartbeatThread);
+ return coordinatorClientFactory.createCoordinatorClientWithoutHeartbeat(
+ ClientType.valueOf(clientType), coordinators, retryIntervalMs,
retryTimes);
+ }
+
+ public static CoordinatorGrpcRetryableClient
createCoordinatorClientsForAccessCluster(
+ SparkConf sparkConf) {
+ String clientType = sparkConf.get(RssSparkConfig.RSS_CLIENT_TYPE);
+ String coordinators = getCoordinatorQuorumStr(sparkConf);
+ long retryIntervalMs =
sparkConf.get(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_INTERVAL_MS);
+ int retryTimes =
sparkConf.get(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_TIMES);
+ CoordinatorClientFactory coordinatorClientFactory =
CoordinatorClientFactory.getInstance();
+ return coordinatorClientFactory.createCoordinatorClientWithoutHeartbeat(
+ ClientType.valueOf(clientType), coordinators, retryIntervalMs,
retryTimes);
+ }
+
+ public static String getCoordinatorQuorumStr(SparkConf sparkConf) {
+ return sparkConf.get(RssSparkConfig.RSS_COORDINATOR_QUORUM);
}
public static void applyDynamicClientConf(SparkConf sparkConf, Map<String,
String> confItems) {
diff --git
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
index 7f2af3ef2..3e74b3c48 100644
---
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
+++
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
@@ -22,7 +22,6 @@ import java.util.Map;
import java.util.Set;
import com.google.common.collect.Maps;
-import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.spark.ShuffleDependency;
import org.apache.spark.SparkConf;
@@ -30,7 +29,7 @@ import org.apache.spark.TaskContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.uniffle.client.impl.grpc.CoordinatorGrpcRetryableClient;
+import org.apache.uniffle.client.api.CoordinatorClient;
import org.apache.uniffle.client.request.RssAccessClusterRequest;
import org.apache.uniffle.client.response.RssAccessClusterResponse;
import org.apache.uniffle.common.config.RssClientConf;
@@ -46,7 +45,6 @@ public class DelegationRssShuffleManager implements
ShuffleManager {
private static final Logger LOG =
LoggerFactory.getLogger(DelegationRssShuffleManager.class);
private final ShuffleManager delegate;
- private final CoordinatorGrpcRetryableClient coordinatorClient;
private final int accessTimeoutMs;
private final SparkConf sparkConf;
private String user;
@@ -58,10 +56,11 @@ public class DelegationRssShuffleManager implements
ShuffleManager {
this.sparkConf = sparkConf;
accessTimeoutMs = sparkConf.get(RssSparkConfig.RSS_ACCESS_TIMEOUT_MS);
if (isDriver) {
- coordinatorClient =
RssSparkShuffleUtils.createCoordinatorClients(sparkConf);
- delegate = createShuffleManagerInDriver();
+ try (CoordinatorClient coordinatorClient =
+
RssSparkShuffleUtils.createCoordinatorClientsForAccessCluster(sparkConf)) {
+ delegate = createShuffleManagerInDriver(coordinatorClient);
+ }
} else {
- coordinatorClient = null;
delegate = createShuffleManagerInExecutor();
}
@@ -70,7 +69,8 @@ public class DelegationRssShuffleManager implements
ShuffleManager {
}
}
- private ShuffleManager createShuffleManagerInDriver() throws RssException {
+ private ShuffleManager createShuffleManagerInDriver(CoordinatorClient
coordinatorClient)
+ throws RssException {
ShuffleManager shuffleManager;
user = "user";
try {
@@ -78,7 +78,7 @@ public class DelegationRssShuffleManager implements
ShuffleManager {
} catch (Exception e) {
LOG.error("Error on getting user from ugi." + e);
}
- boolean canAccess = tryAccessCluster();
+ boolean canAccess = tryAccessCluster(coordinatorClient);
if (uuid == null || "".equals(uuid)) {
uuid = String.valueOf(System.currentTimeMillis());
}
@@ -112,14 +112,11 @@ public class DelegationRssShuffleManager implements
ShuffleManager {
return shuffleManager;
}
- private boolean tryAccessCluster() {
- String accessId = sparkConf.get(RssSparkConfig.RSS_ACCESS_ID.key(),
"").trim();
- if (StringUtils.isEmpty(accessId)) {
- String providerKey = sparkConf.get(RssSparkConfig.RSS_ACCESS_ID.key(),
"");
- if (StringUtils.isNotEmpty(accessId)) {
- accessId = sparkConf.get(providerKey, "");
- LOG.info("Get access id {} from provider key: {}", accessId,
providerKey);
- }
+ private boolean tryAccessCluster(CoordinatorClient coordinatorClient) {
+ String accessId =
DelegationRssShuffleManagerUtils.acquireAccessId(sparkConf);
+ if (accessId == null) {
+ LOG.warn("Access id key is null");
+ return false;
}
long retryInterval =
sparkConf.get(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_INTERVAL_MS);
int retryTimes =
sparkConf.get(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_TIMES);
@@ -225,9 +222,6 @@ public class DelegationRssShuffleManager implements
ShuffleManager {
@Override
public void stop() {
delegate.stop();
- if (coordinatorClient != null) {
- coordinatorClient.close();
- }
}
@Override
diff --git
a/client-spark/spark2/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java
b/client-spark/spark2/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java
index 05586cf4e..9f7e6e848 100644
---
a/client-spark/spark2/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java
+++
b/client-spark/spark2/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java
@@ -66,7 +66,7 @@ public class DelegationRssShuffleManagerTest {
List<CoordinatorClient> coordinatorClients = Lists.newArrayList();
coordinatorClients.add(mockCoordinatorClient);
mockedStaticRssShuffleUtils
- .when(() -> RssSparkShuffleUtils.createCoordinatorClients(any()))
+ .when(() ->
RssSparkShuffleUtils.createCoordinatorClientsForAccessCluster(any()))
.thenReturn(new CoordinatorGrpcRetryableClient(coordinatorClients, 0,
0, 1));
SparkConf conf = new SparkConf();
conf.set(RssSparkConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED.key(), "false");
@@ -81,7 +81,7 @@ public class DelegationRssShuffleManagerTest {
List<CoordinatorClient> coordinatorClients = Lists.newArrayList();
coordinatorClients.add(mockCoordinatorClient);
mockedStaticRssShuffleUtils
- .when(() -> RssSparkShuffleUtils.createCoordinatorClients(any()))
+ .when(() ->
RssSparkShuffleUtils.createCoordinatorClientsForAccessCluster(any()))
.thenReturn(new CoordinatorGrpcRetryableClient(coordinatorClients, 0,
0, 1));
SparkConf conf = new SparkConf();
@@ -119,7 +119,7 @@ public class DelegationRssShuffleManagerTest {
List<CoordinatorClient> coordinatorClients = Lists.newArrayList();
coordinatorClients.add(mockCoordinatorClient);
mockedStaticRssShuffleUtils
- .when(() -> RssSparkShuffleUtils.createCoordinatorClients(any()))
+ .when(() ->
RssSparkShuffleUtils.createCoordinatorClientsForAccessCluster(any()))
.thenReturn(new CoordinatorGrpcRetryableClient(coordinatorClients, 0,
0, 1));
SparkConf conf = new SparkConf();
@@ -153,7 +153,7 @@ public class DelegationRssShuffleManagerTest {
List<CoordinatorClient> coordinatorClients = Lists.newArrayList();
coordinatorClients.add(mockDeniedCoordinatorClient);
mockedStaticRssShuffleUtils
- .when(() -> RssSparkShuffleUtils.createCoordinatorClients(any()))
+ .when(() ->
RssSparkShuffleUtils.createCoordinatorClientsForAccessCluster(any()))
.thenReturn(new CoordinatorGrpcRetryableClient(coordinatorClients, 0,
0, 1));
SparkConf conf = new SparkConf();
conf.set(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_INTERVAL_MS, 3000L);
@@ -173,7 +173,7 @@ public class DelegationRssShuffleManagerTest {
List<CoordinatorClient> secondCoordinatorClients = Lists.newArrayList();
secondCoordinatorClients.add(mockCoordinatorClient);
mockedStaticRssShuffleUtils
- .when(() -> RssSparkShuffleUtils.createCoordinatorClients(any()))
+ .when(() ->
RssSparkShuffleUtils.createCoordinatorClientsForAccessCluster(any()))
.thenReturn(new
CoordinatorGrpcRetryableClient(secondCoordinatorClients, 0, 0, 1));
SparkConf secondConf = new SparkConf();
secondConf.set(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_INTERVAL_MS, 3000L);
diff --git
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
index 593c8a29a..a2e14456b 100644
---
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
+++
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
@@ -22,7 +22,6 @@ import java.util.Map;
import java.util.Set;
import com.google.common.collect.Maps;
-import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.spark.ShuffleDependency;
import org.apache.spark.SparkConf;
@@ -30,7 +29,7 @@ import org.apache.spark.TaskContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.uniffle.client.impl.grpc.CoordinatorGrpcRetryableClient;
+import org.apache.uniffle.client.api.CoordinatorClient;
import org.apache.uniffle.client.request.RssAccessClusterRequest;
import org.apache.uniffle.client.response.RssAccessClusterResponse;
import org.apache.uniffle.common.config.RssClientConf;
@@ -46,7 +45,6 @@ public class DelegationRssShuffleManager implements
ShuffleManager {
private static final Logger LOG =
LoggerFactory.getLogger(DelegationRssShuffleManager.class);
private final ShuffleManager delegate;
- private final CoordinatorGrpcRetryableClient coordinatorClient;
private final int accessTimeoutMs;
private final SparkConf sparkConf;
private String user;
@@ -58,10 +56,11 @@ public class DelegationRssShuffleManager implements
ShuffleManager {
this.sparkConf = sparkConf;
accessTimeoutMs = sparkConf.get(RssSparkConfig.RSS_ACCESS_TIMEOUT_MS);
if (isDriver) {
- coordinatorClient =
RssSparkShuffleUtils.createCoordinatorClients(sparkConf);
- delegate = createShuffleManagerInDriver();
+ try (CoordinatorClient coordinatorClient =
+
RssSparkShuffleUtils.createCoordinatorClientsForAccessCluster(sparkConf)) {
+ delegate = createShuffleManagerInDriver(coordinatorClient);
+ }
} else {
- coordinatorClient = null;
delegate = createShuffleManagerInExecutor();
}
@@ -70,7 +69,8 @@ public class DelegationRssShuffleManager implements
ShuffleManager {
}
}
- private ShuffleManager createShuffleManagerInDriver() throws RssException {
+ private ShuffleManager createShuffleManagerInDriver(CoordinatorClient
coordinatorClient)
+ throws RssException {
ShuffleManager shuffleManager;
user = "user";
try {
@@ -78,7 +78,7 @@ public class DelegationRssShuffleManager implements
ShuffleManager {
} catch (Exception e) {
LOG.error("Error on getting user from ugi." + e);
}
- boolean canAccess = tryAccessCluster();
+ boolean canAccess = tryAccessCluster(coordinatorClient);
if (uuid == null || "".equals(uuid)) {
uuid = String.valueOf(System.currentTimeMillis());
}
@@ -112,17 +112,10 @@ public class DelegationRssShuffleManager implements
ShuffleManager {
return shuffleManager;
}
- private boolean tryAccessCluster() {
- String accessId = sparkConf.get(RssSparkConfig.RSS_ACCESS_ID.key(),
"").trim();
- if (StringUtils.isEmpty(accessId)) {
- String providerKey =
sparkConf.get(RssSparkConfig.RSS_ACCESS_ID_PROVIDER_KEY.key(), "");
- if (StringUtils.isNotEmpty(accessId)) {
- accessId = sparkConf.get(providerKey, "");
- LOG.info("Get access id {} from provider key: {}", accessId,
providerKey);
- }
- }
- if (StringUtils.isEmpty(accessId)) {
- LOG.warn("Access id key is empty");
+ private boolean tryAccessCluster(CoordinatorClient coordinatorClient) {
+ String accessId =
DelegationRssShuffleManagerUtils.acquireAccessId(sparkConf);
+ if (accessId == null) {
+ LOG.warn("Access id key is null");
return false;
}
long retryInterval =
sparkConf.get(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_INTERVAL_MS);
@@ -311,9 +304,6 @@ public class DelegationRssShuffleManager implements
ShuffleManager {
@Override
public void stop() {
delegate.stop();
- if (coordinatorClient != null) {
- coordinatorClient.close();
- }
}
@Override
diff --git
a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/RssShuffleManagerTestBase.java
b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/RssShuffleManagerTestBase.java
index 4b7cb6202..0cfcffbed 100644
---
a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/RssShuffleManagerTestBase.java
+++
b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/RssShuffleManagerTestBase.java
@@ -63,7 +63,7 @@ public class RssShuffleManagerTestBase {
CoordinatorGrpcRetryableClient client =
new CoordinatorGrpcRetryableClient(coordinatorClients, 0, 1, 1);
mockedStaticRssShuffleUtils
- .when(() -> RssSparkShuffleUtils.createCoordinatorClients(any()))
+ .when(() ->
RssSparkShuffleUtils.createCoordinatorClientsForAccessCluster(any()))
.thenReturn(client);
return mockCoordinatorClient;
}
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/api/CoordinatorClient.java
b/internal-client/src/main/java/org/apache/uniffle/client/api/CoordinatorClient.java
index 33ed9c8c7..410ea0681 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/api/CoordinatorClient.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/api/CoordinatorClient.java
@@ -32,7 +32,7 @@ import
org.apache.uniffle.client.response.RssFetchRemoteStorageResponse;
import org.apache.uniffle.client.response.RssGetShuffleAssignmentsResponse;
import org.apache.uniffle.client.response.RssSendHeartBeatResponse;
-public interface CoordinatorClient {
+public interface CoordinatorClient extends AutoCloseable {
RssAppHeartBeatResponse
scheduleAtFixedRateToSendAppHeartBeat(RssAppHeartBeatRequest request);
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/factory/CoordinatorClientFactory.java
b/internal-client/src/main/java/org/apache/uniffle/client/factory/CoordinatorClientFactory.java
index e652a31ce..d9ef4e94f 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/factory/CoordinatorClientFactory.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/factory/CoordinatorClientFactory.java
@@ -100,4 +100,10 @@ public class CoordinatorClientFactory {
return new CoordinatorGrpcRetryableClient(
coordinatorClients, retryIntervalMs, retryTimes, heartBeatThreadNum);
}
+
+ public synchronized CoordinatorGrpcRetryableClient
createCoordinatorClientWithoutHeartbeat(
+ ClientType clientType, String coordinators, long retryIntervalMs, int
retryTimes) {
+ List<CoordinatorClient> coordinatorClients =
createCoordinatorClient(clientType, coordinators);
+ return new CoordinatorGrpcRetryableClient(coordinatorClients,
retryIntervalMs, retryTimes, 0);
+ }
}
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcRetryableClient.java
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcRetryableClient.java
index 418575fd4..ef8a0f7ff 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcRetryableClient.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/CoordinatorGrpcRetryableClient.java
@@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
+import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,8 +62,10 @@ public class CoordinatorGrpcRetryableClient implements
CoordinatorClient {
this.coordinatorClients = coordinatorClients;
this.retryIntervalMs = retryIntervalMs;
this.retryTimes = retryTimes;
- this.heartBeatExecutorService =
- ThreadUtils.getDaemonFixedThreadPool(heartBeatThreadNum,
"client-heartbeat");
+ if (heartBeatThreadNum > 0) {
+ this.heartBeatExecutorService =
+ ThreadUtils.getDaemonFixedThreadPool(heartBeatThreadNum,
"client-heartbeat");
+ }
}
@Override
@@ -70,6 +73,7 @@ public class CoordinatorGrpcRetryableClient implements
CoordinatorClient {
RssAppHeartBeatRequest request) {
AtomicReference<RssAppHeartBeatResponse> rssResponse = new
AtomicReference<>();
rssResponse.set(new RssAppHeartBeatResponse(StatusCode.INTERNAL_ERROR));
+ Preconditions.checkNotNull(heartBeatExecutorService);
ThreadUtils.executeTasks(
heartBeatExecutorService,
coordinatorClients,
@@ -97,6 +101,7 @@ public class CoordinatorGrpcRetryableClient implements
CoordinatorClient {
public RssApplicationInfoResponse
registerApplicationInfo(RssApplicationInfoRequest request) {
AtomicReference<RssApplicationInfoResponse> rssResponse = new
AtomicReference<>();
rssResponse.set(new RssApplicationInfoResponse(StatusCode.INTERNAL_ERROR));
+ Preconditions.checkNotNull(heartBeatExecutorService);
ThreadUtils.executeTasks(
heartBeatExecutorService,
coordinatorClients,
@@ -124,6 +129,7 @@ public class CoordinatorGrpcRetryableClient implements
CoordinatorClient {
@Override
public RssSendHeartBeatResponse sendHeartBeat(RssSendHeartBeatRequest
request) {
AtomicBoolean sendSuccessfully = new AtomicBoolean(false);
+ Preconditions.checkNotNull(heartBeatExecutorService);
ThreadUtils.executeTasks(
heartBeatExecutorService,
coordinatorClients,
@@ -280,7 +286,9 @@ public class CoordinatorGrpcRetryableClient implements
CoordinatorClient {
@Override
public void close() {
- heartBeatExecutorService.shutdownNow();
+ if (heartBeatExecutorService != null) {
+ heartBeatExecutorService.shutdownNow();
+ }
coordinatorClients.forEach(CoordinatorClient::close);
}
}