This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 39d7a409df [Improve][Zeta] Make sure the local mode CompletableFuture behavior same as server mode (#8476)
39d7a409df is described below

commit 39d7a409df0be2ed5d1efd061425caa57c483acd
Author: Jia Fan <[email protected]>
AuthorDate: Thu Jan 9 20:37:11 2025 +0800

    [Improve][Zeta] Make sure the local mode CompletableFuture behavior same as server mode (#8476)
---
 .../core/starter/seatunnel/command/ClientExecuteCommand.java       | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
index ced60e6426..bb7fe67a87 100644
--- a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
+++ b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
@@ -38,6 +38,7 @@ import org.apache.seatunnel.engine.common.config.JobConfig;
 import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
 import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
 import org.apache.seatunnel.engine.common.runtime.ExecutionMode;
+import org.apache.seatunnel.engine.common.utils.concurrent.CompletableFuture;
 import org.apache.seatunnel.engine.core.job.JobResult;
 import org.apache.seatunnel.engine.core.job.JobStatus;
 import org.apache.seatunnel.engine.server.SeaTunnelNodeContext;
@@ -47,6 +48,7 @@ import org.apache.commons.lang3.StringUtils;
 import com.hazelcast.client.config.ClientConfig;
 import com.hazelcast.core.HazelcastInstance;
 import com.hazelcast.instance.impl.HazelcastInstanceFactory;
+import com.hazelcast.internal.util.ConcurrencyUtil;
 import lombok.extern.slf4j.Slf4j;
 
 import java.nio.file.Path;
@@ -55,7 +57,6 @@ import java.time.LocalDateTime;
 import java.util.Collections;
 import java.util.List;
 import java.util.Random;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -276,6 +277,10 @@ public class ClientExecuteCommand implements Command<ClientCommandArgs> {
         // set local mode
         seaTunnelConfig.getEngineConfig().setMode(ExecutionMode.LOCAL);
         seaTunnelConfig.getHazelcastConfig().getNetworkConfig().setPortAutoIncrement(true);
+
+        // set the default async executor for Hazelcast InvocationFuture
+        ConcurrencyUtil.setDefaultAsyncExecutor(CompletableFuture.EXECUTOR);
+
         return HazelcastInstanceFactory.newHazelcastInstance(
                 seaTunnelConfig.getHazelcastConfig(),
                 Thread.currentThread().getName(),

Reply via email to