This is an automated email from the ASF dual-hosted git repository.
shenghang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 6baa27ee56 [Fix][Zeta] Fix `seatunnel.sh - j` command not working
(#9832)
6baa27ee56 is described below
commit 6baa27ee5613abafb051265c9ce2d0027413be2e
Author: limin <[email protected]>
AuthorDate: Tue Sep 9 16:50:30 2025 +0800
[Fix][Zeta] Fix `seatunnel.sh - j` command not working (#9832)
Co-authored-by: limin <[email protected]>
---
.../client/SeaTunnelEngineClusterRoleTest.java | 127 +++++++++++++++++++++
.../operation/GetJobDetailStatusOperation.java | 2 +-
2 files changed, 128 insertions(+), 1 deletion(-)
diff --git
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelEngineClusterRoleTest.java
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelEngineClusterRoleTest.java
index a4f8ab4cc4..7ff91eca38 100644
---
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelEngineClusterRoleTest.java
+++
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelEngineClusterRoleTest.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.config.DeployMode;
import org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment;
import org.apache.seatunnel.engine.client.job.ClientJobProxy;
+import org.apache.seatunnel.engine.client.job.JobClient;
import org.apache.seatunnel.engine.common.config.ConfigProvider;
import org.apache.seatunnel.engine.common.config.EngineConfig;
import org.apache.seatunnel.engine.common.config.JobConfig;
@@ -37,14 +38,22 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.api.condition.OS;
+import com.hazelcast.client.HazelcastClient;
import com.hazelcast.client.config.ClientConfig;
+import com.hazelcast.client.impl.clientside.HazelcastClientInstanceImpl;
+import com.hazelcast.client.impl.clientside.HazelcastClientProxy;
+import com.hazelcast.cluster.Member;
import com.hazelcast.config.Config;
import com.hazelcast.instance.impl.HazelcastInstanceImpl;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import static org.awaitility.Awaitility.await;
@@ -353,6 +362,124 @@ public class SeaTunnelEngineClusterRoleTest {
});
}
+ @SneakyThrows
+ @Test
+ public void testWorkerIsFirstMemberThenGetJobDetailStatus() {
+ HazelcastInstanceImpl workerNode1 = null;
+ HazelcastInstanceImpl workerNode2 = null;
+ HazelcastInstanceImpl masterNode1 = null;
+ HazelcastInstanceImpl masterNode2 = null;
+ SeaTunnelClient seatunnelClient = null;
+ HazelcastClientInstanceImpl hazelcastClient = null;
+ String testClusterName =
"Test_testWorkerIsFirstMemberThenGetJobDetailStatus";
+ SeaTunnelConfig seaTunnelConfig =
ConfigProvider.locateAndGetSeaTunnelConfig();
+ seaTunnelConfig
+ .getHazelcastConfig()
+
.setClusterName(ContentFormatUtilTest.getClusterName(testClusterName));
+ try {
+ // master node must start first in ci
+ masterNode1 =
SeaTunnelServerStarter.createMasterHazelcastInstance(seaTunnelConfig);
+ HazelcastInstanceImpl finalMasterNode1 = masterNode1;
+ Awaitility.await()
+ .atMost(10000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () ->
+ Assertions.assertEquals(
+ 1,
finalMasterNode1.getCluster().getMembers().size()));
+ // start two worker nodes
+ workerNode1 =
SeaTunnelServerStarter.createWorkerHazelcastInstance(seaTunnelConfig);
+ workerNode2 =
SeaTunnelServerStarter.createWorkerHazelcastInstance(seaTunnelConfig);
+ // start another master node
+ SeaTunnelConfig seaTunnelConfig2 =
ConfigProvider.locateAndGetSeaTunnelConfig();
+ seaTunnelConfig2
+ .getHazelcastConfig()
+
.setClusterName(ContentFormatUtilTest.getClusterName(testClusterName));
+ masterNode2 =
SeaTunnelServerStarter.createMasterHazelcastInstance(seaTunnelConfig2);
+ HazelcastInstanceImpl finalWorkerNode = workerNode1;
+ Awaitility.await()
+ .atMost(10000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () ->
+ Assertions.assertEquals(
+ 4,
finalWorkerNode.getCluster().getMembers().size()));
+ masterNode1.shutdown();
+ Awaitility.await()
+ .atMost(10000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () ->
+ Assertions.assertEquals(
+ 3,
finalWorkerNode.getCluster().getMembers().size()));
+ Set<Member> members = workerNode1.getCluster().getMembers();
+ Map<UUID, Member> memberMap =
+ members.stream()
+ .collect(
+ Collectors.toMap(
+ Member::getUuid, member -> member,
(a, b) -> b));
+ // get master member
+ ClientConfig clientConfig =
ConfigProvider.locateAndGetClientConfig();
+
clientConfig.setClusterName(ContentFormatUtilTest.getClusterName(testClusterName));
+ hazelcastClient =
+ ((HazelcastClientProxy)
HazelcastClient.newHazelcastClient(clientConfig))
+ .client;
+ HazelcastClientInstanceImpl finalHazelcastClient = hazelcastClient;
+ Awaitility.await()
+ .atMost(10000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> {
+ UUID masterUuid =
+ finalHazelcastClient
+ .getClientClusterService()
+ .getMasterMember()
+ .getUuid();
+
Assertions.assertTrue(memberMap.get(masterUuid).isLiteMember());
+ });
+ // start client job
+ Common.setDeployMode(DeployMode.CLIENT);
+ String filePath =
ContentFormatUtilTest.getResource("/streaming_fake_to_console.conf");
+ JobConfig jobConfig = new JobConfig();
+ jobConfig.setName("testGetJobState");
+ seatunnelClient = createSeaTunnelClient(testClusterName);
+ JobClient jobClient = seatunnelClient.getJobClient();
+ ClientJobExecutionEnvironment jobExecutionEnv =
+ seatunnelClient.createExecutionContext(filePath,
jobConfig, seaTunnelConfig);
+ final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
+ long jobId = clientJobProxy.getJobId();
+ await().atMost(30000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () ->
+ Assertions.assertTrue(
+
jobClient.getJobDetailStatus(jobId).contains("RUNNING")
+ && jobClient
+
.listJobStatus(true)
+
.contains("RUNNING")));
+ jobClient.cancelJob(jobId);
+ await().atMost(30000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () ->
+ Assertions.assertEquals(
+ "CANCELED",
jobClient.getJobStatus(jobId)));
+ } finally {
+ if (hazelcastClient != null) {
+ hazelcastClient.shutdown();
+ }
+ if (seatunnelClient != null) {
+ seatunnelClient.close();
+ }
+ if (workerNode1 != null) {
+ workerNode1.shutdown();
+ }
+ if (workerNode2 != null) {
+ workerNode2.shutdown();
+ }
+ if (masterNode1 != null) {
+ masterNode1.shutdown();
+ }
+ if (masterNode2 != null) {
+ masterNode2.shutdown();
+ }
+ }
+ }
+
private String getMulticastConfig() {
return "hazelcast:\n"
+ " network:\n"
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobDetailStatusOperation.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobDetailStatusOperation.java
index 90700888b5..19ba448957 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobDetailStatusOperation.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobDetailStatusOperation.java
@@ -50,7 +50,7 @@ public class GetJobDetailStatusOperation extends Operation
@Override
public int getClassId() {
- return
ClientToServerOperationDataSerializerHook.PRINT_MESSAGE_OPERATOR;
+ return
ClientToServerOperationDataSerializerHook.GET_JOB_STATE_OPERATION;
}
@Override