This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 39d7a409df [Improve][Zeta] Make sure the local mode CompletableFuture
behavior same as server mode (#8476)
39d7a409df is described below
commit 39d7a409df0be2ed5d1efd061425caa57c483acd
Author: Jia Fan <[email protected]>
AuthorDate: Thu Jan 9 20:37:11 2025 +0800
[Improve][Zeta] Make sure the local mode CompletableFuture behavior same as
server mode (#8476)
---
.../core/starter/seatunnel/command/ClientExecuteCommand.java | 7 ++++++-
1 file changed, 6 insertions(+), 1 deletion(-)
diff --git
a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
index ced60e6426..bb7fe67a87 100644
---
a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
+++
b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
@@ -38,6 +38,7 @@ import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
import org.apache.seatunnel.engine.common.runtime.ExecutionMode;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
import org.apache.seatunnel.engine.core.job.JobResult;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.server.SeaTunnelNodeContext;
@@ -47,6 +48,7 @@ import org.apache.commons.lang3.StringUtils;
import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.instance.impl.HazelcastInstanceFactory;
+import com.hazelcast.internal.util.ConcurrencyUtil;
import lombok.extern.slf4j.Slf4j;
import java.nio.file.Path;
@@ -55,7 +57,6 @@ import java.time.LocalDateTime;
import java.util.Collections;
import java.util.List;
import java.util.Random;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -276,6 +277,10 @@ public class ClientExecuteCommand implements
Command<ClientCommandArgs> {
// set local mode
seaTunnelConfig.getEngineConfig().setMode(ExecutionMode.LOCAL);
seaTunnelConfig.getHazelcastConfig().getNetworkConfig().setPortAutoIncrement(true);
+
+ // set the default async executor for Hazelcast InvocationFuture
+ ConcurrencyUtil.setDefaultAsyncExecutor(CompletableFuture.EXECUTOR);
+
return HazelcastInstanceFactory.newHazelcastInstance(
seaTunnelConfig.getHazelcastConfig(),
Thread.currentThread().getName(),