This is an automated email from the ASF dual-hosted git repository.

shenghang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 6baa27ee56 [Fix][Zeta] Fix `seatunnel.sh - j` command not working (#9832)
6baa27ee56 is described below

commit 6baa27ee5613abafb051265c9ce2d0027413be2e
Author: limin <[email protected]>
AuthorDate: Tue Sep 9 16:50:30 2025 +0800

    [Fix][Zeta] Fix `seatunnel.sh - j` command not working (#9832)
    
    Co-authored-by: limin <[email protected]>
---
 .../client/SeaTunnelEngineClusterRoleTest.java     | 127 +++++++++++++++++++++
 .../operation/GetJobDetailStatusOperation.java     |   2 +-
 2 files changed, 128 insertions(+), 1 deletion(-)

diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelEngineClusterRoleTest.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelEngineClusterRoleTest.java
index a4f8ab4cc4..7ff91eca38 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelEngineClusterRoleTest.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelEngineClusterRoleTest.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.common.config.Common;
 import org.apache.seatunnel.common.config.DeployMode;
 import org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment;
 import org.apache.seatunnel.engine.client.job.ClientJobProxy;
+import org.apache.seatunnel.engine.client.job.JobClient;
 import org.apache.seatunnel.engine.common.config.ConfigProvider;
 import org.apache.seatunnel.engine.common.config.EngineConfig;
 import org.apache.seatunnel.engine.common.config.JobConfig;
@@ -37,14 +38,22 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.condition.DisabledOnOs;
 import org.junit.jupiter.api.condition.OS;
 
+import com.hazelcast.client.HazelcastClient;
 import com.hazelcast.client.config.ClientConfig;
+import com.hazelcast.client.impl.clientside.HazelcastClientInstanceImpl;
+import com.hazelcast.client.impl.clientside.HazelcastClientProxy;
+import com.hazelcast.cluster.Member;
 import com.hazelcast.config.Config;
 import com.hazelcast.instance.impl.HazelcastInstanceImpl;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import static org.awaitility.Awaitility.await;
 
@@ -353,6 +362,124 @@ public class SeaTunnelEngineClusterRoleTest {
                 });
     }
 
+    @SneakyThrows
+    @Test
+    public void testWorkerIsFirstMemberThenGetJobDetailStatus() {
+        HazelcastInstanceImpl workerNode1 = null;
+        HazelcastInstanceImpl workerNode2 = null;
+        HazelcastInstanceImpl masterNode1 = null;
+        HazelcastInstanceImpl masterNode2 = null;
+        SeaTunnelClient seatunnelClient = null;
+        HazelcastClientInstanceImpl hazelcastClient = null;
+        String testClusterName = "Test_testWorkerIsFirstMemberThenGetJobDetailStatus";
+        SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
+        seaTunnelConfig
+                .getHazelcastConfig()
+                .setClusterName(ContentFormatUtilTest.getClusterName(testClusterName));
+        try {
+            // master node must start first in ci
+            masterNode1 = SeaTunnelServerStarter.createMasterHazelcastInstance(seaTunnelConfig);
+            HazelcastInstanceImpl finalMasterNode1 = masterNode1;
+            Awaitility.await()
+                    .atMost(10000, TimeUnit.MILLISECONDS)
+                    .untilAsserted(
+                            () ->
+                                    Assertions.assertEquals(
+                                            1, finalMasterNode1.getCluster().getMembers().size()));
+            // start two worker nodes
+            workerNode1 = SeaTunnelServerStarter.createWorkerHazelcastInstance(seaTunnelConfig);
+            workerNode2 = SeaTunnelServerStarter.createWorkerHazelcastInstance(seaTunnelConfig);
+            // start another master node
+            SeaTunnelConfig seaTunnelConfig2 = ConfigProvider.locateAndGetSeaTunnelConfig();
+            seaTunnelConfig2
+                    .getHazelcastConfig()
+                    .setClusterName(ContentFormatUtilTest.getClusterName(testClusterName));
+            masterNode2 = SeaTunnelServerStarter.createMasterHazelcastInstance(seaTunnelConfig2);
+            HazelcastInstanceImpl finalWorkerNode = workerNode1;
+            Awaitility.await()
+                    .atMost(10000, TimeUnit.MILLISECONDS)
+                    .untilAsserted(
+                            () ->
+                                    Assertions.assertEquals(
+                                            4, finalWorkerNode.getCluster().getMembers().size()));
+            masterNode1.shutdown();
+            Awaitility.await()
+                    .atMost(10000, TimeUnit.MILLISECONDS)
+                    .untilAsserted(
+                            () ->
+                                    Assertions.assertEquals(
+                                            3, finalWorkerNode.getCluster().getMembers().size()));
+            Set<Member> members = workerNode1.getCluster().getMembers();
+            Map<UUID, Member> memberMap =
+                    members.stream()
+                            .collect(
+                                    Collectors.toMap(
+                                            Member::getUuid, member -> member, (a, b) -> b));
+            // get master member
+            ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
+            clientConfig.setClusterName(ContentFormatUtilTest.getClusterName(testClusterName));
+            hazelcastClient =
+                    ((HazelcastClientProxy) HazelcastClient.newHazelcastClient(clientConfig))
+                            .client;
+            HazelcastClientInstanceImpl finalHazelcastClient = hazelcastClient;
+            Awaitility.await()
+                    .atMost(10000, TimeUnit.MILLISECONDS)
+                    .untilAsserted(
+                            () -> {
+                                UUID masterUuid =
+                                        finalHazelcastClient
+                                                .getClientClusterService()
+                                                .getMasterMember()
+                                                .getUuid();
+                                Assertions.assertTrue(memberMap.get(masterUuid).isLiteMember());
+                            });
+            // start client job
+            Common.setDeployMode(DeployMode.CLIENT);
+            String filePath = ContentFormatUtilTest.getResource("/streaming_fake_to_console.conf");
+            JobConfig jobConfig = new JobConfig();
+            jobConfig.setName("testGetJobState");
+            seatunnelClient = createSeaTunnelClient(testClusterName);
+            JobClient jobClient = seatunnelClient.getJobClient();
+            ClientJobExecutionEnvironment jobExecutionEnv =
+                    seatunnelClient.createExecutionContext(filePath, jobConfig, seaTunnelConfig);
+            final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
+            long jobId = clientJobProxy.getJobId();
+            await().atMost(30000, TimeUnit.MILLISECONDS)
+                    .untilAsserted(
+                            () ->
+                                    Assertions.assertTrue(
+                                            jobClient.getJobDetailStatus(jobId).contains("RUNNING")
+                                                    && jobClient
+                                                            .listJobStatus(true)
+                                                            .contains("RUNNING")));
+            jobClient.cancelJob(jobId);
+            await().atMost(30000, TimeUnit.MILLISECONDS)
+                    .untilAsserted(
+                            () ->
+                                    Assertions.assertEquals(
+                                            "CANCELED", jobClient.getJobStatus(jobId)));
+        } finally {
+            if (hazelcastClient != null) {
+                hazelcastClient.shutdown();
+            }
+            if (seatunnelClient != null) {
+                seatunnelClient.close();
+            }
+            if (workerNode1 != null) {
+                workerNode1.shutdown();
+            }
+            if (workerNode2 != null) {
+                workerNode2.shutdown();
+            }
+            if (masterNode1 != null) {
+                masterNode1.shutdown();
+            }
+            if (masterNode2 != null) {
+                masterNode2.shutdown();
+            }
+        }
+    }
+
     private String getMulticastConfig() {
         return "hazelcast:\n"
                 + "  network:\n"
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobDetailStatusOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobDetailStatusOperation.java
index 90700888b5..19ba448957 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobDetailStatusOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobDetailStatusOperation.java
@@ -50,7 +50,7 @@ public class GetJobDetailStatusOperation extends Operation
 
     @Override
     public int getClassId() {
-        return ClientToServerOperationDataSerializerHook.PRINT_MESSAGE_OPERATOR;
+        return ClientToServerOperationDataSerializerHook.GET_JOB_STATE_OPERATION;
     }
 
     @Override

Reply via email to