Copilot commented on code in PR #10364:
URL: https://github.com/apache/seatunnel/pull/10364#discussion_r2707307124
##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java:
##########
@@ -374,4 +402,56 @@ public void setPreApplyResourceFutures(
Map<TaskGroupLocation, CompletableFuture<SlotProfile>>
preApplyResourceFutures) {
this.preApplyResourceFutures = preApplyResourceFutures;
}
+
+ /** Start next pipelines according to concurrency limit */
+ private synchronized void startNextPipelines() {
+ while (runningPipelineCount.get() < pipelineConcurrency) {
+ int nextIndex = nextPipelineIndex.getAndIncrement();
+ if (nextIndex >= pipelineList.size()) {
+ // No more pipelines to start
+ break;
+ }
+
+ SubPlan nextPipeline = pipelineList.get(nextIndex);
+ if (PipelineStatus.CREATED.equals(nextPipeline.getCurrPipelineStatus())) {
+ runningPipelineCount.incrementAndGet();
+ int completed = finishedPipelineNum.get();
+ int total = pipelineList.size();
+ int running = runningPipelineCount.get();
+ int queued = total - completed - running;
+
+ if (pipelineConcurrency == 1) {
+ log.info(
+ "[Serial Mode] Starting pipeline {}/{}: {} | Running: {} | Queued: {} | Completed: {}",
+ nextIndex + 1,
+ total,
+ nextPipeline.getPipelineFullName(),
+ running,
+ queued,
+ completed);
+ } else if (pipelineConcurrency < Integer.MAX_VALUE) {
+ log.info(
+ "[Limited Concurrency={}] Starting pipeline {}/{}: {} | Running: {} | Queued: {} | Completed: {}",
+ pipelineConcurrency,
+ nextIndex + 1,
+ total,
+ nextPipeline.getPipelineFullName(),
+ running,
+ queued,
+ completed);
+ } else {
+ log.info(
+ "[Concurrent Mode] Starting pipeline {}/{}: {} | Running: {} | Queued: {} | Completed: {}",
+ nextIndex + 1,
+ total,
+ nextPipeline.getPipelineFullName(),
+ running,
+ queued,
+ completed);
Review Comment:
In unlimited concurrency mode (pipelineConcurrency = Integer.MAX_VALUE), the
while loop condition at line 408 will always be true until all pipelines are
started. Combined with the synchronized method, this means startNextPipelines()
will start ALL pipelines in a single call during the SCHEDULED state transition
(line 362). While this matches the original behavior, the logging messages at
lines 443-450 claiming "[Concurrent Mode]" will be misleading since all
pipelines start at once rather than progressively as slots become available.
The counts are misleading at startup too: "Completed" is always 0, and
"Running"/"Queued" merely count iterations of the start-up loop.
```suggestion
// In unlimited concurrency mode, all pipelines are started in a single pass.
// Avoid logging running/queued/completed counts here to prevent misleading output.
log.info(
"[Unlimited Concurrency Mode] Starting pipeline {}/{}: {}",
nextIndex + 1,
total,
nextPipeline.getPipelineFullName());
```
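The three `log.info` branches differ only in their prefix. A minimal sketch, assuming a hypothetical helper named `PipelineLogPrefix` (not part of the PR), that derives the prefix once and uses the "[Unlimited Concurrency Mode]" label from the suggestion:

```java
// Hypothetical helper (not in the PR): computes the log prefix once so that the
// three near-identical log.info branches can collapse into a single call.
final class PipelineLogPrefix {
    private PipelineLogPrefix() {}

    static String of(int pipelineConcurrency) {
        if (pipelineConcurrency == 1) {
            return "[Serial Mode]";
        }
        if (pipelineConcurrency < Integer.MAX_VALUE) {
            return "[Limited Concurrency=" + pipelineConcurrency + "]";
        }
        // Integer.MAX_VALUE means "no limit", mirroring the PR's branch condition.
        return "[Unlimited Concurrency Mode]";
    }
}
```

The call site could then become `log.info("{} Starting pipeline {}/{}: {}", PipelineLogPrefix.of(pipelineConcurrency), nextIndex + 1, total, nextPipeline.getPipelineFullName());`, which still emits the "[Serial Mode]" and "[Limited Concurrency=2]" substrings that PipelineConcurrencyIT asserts on.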
##########
seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/JobConfig.java:
##########
@@ -51,13 +52,15 @@ public int getClassId() {
public void writeData(ObjectDataOutput out) throws IOException {
out.writeString(name);
out.writeObject(jobContext);
+ out.writeInt(pipelineConcurrency);
out.writeObject(envOptions);
}
@Override
public void readData(ObjectDataInput in) throws IOException {
this.name = in.readString();
this.jobContext = in.readObject();
+ this.pipelineConcurrency = in.readInt();
Review Comment:
The serialization format change in JobConfig introduces a backward
compatibility issue. Adding a new field (pipelineConcurrency) between existing
fields in the serialization order means that older clients/servers won't be
able to deserialize JobConfig objects from newer versions, and vice versa. This
breaks rolling upgrades and cluster compatibility. Consider: 1) Adding version
handling in the serialization logic, 2) Adding the field at the end after
envOptions, or 3) Using a versioned serialization approach with conditional
field reading/writing.
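Option 2 can be sketched as follows. This assumes plain `java.io` streams as a stand-in for Hazelcast's `ObjectDataOutput`/`ObjectDataInput` and a hypothetical `JobConfigSketch` with simplified fields. The new field is written last, and a reader that reaches end-of-stream falls back to an assumed default of `Integer.MAX_VALUE` (unlimited). This only works when the JobConfig bytes end the stream; if JobConfig is embedded in a larger payload, the reader cannot detect the missing field this way, and explicit version handling (option 1 or 3) is needed.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for JobConfig; java.io streams replace Hazelcast's
// ObjectDataOutput/ObjectDataInput purely for illustration.
final class JobConfigSketch {
    // Assumed default when the field is absent: unlimited concurrency.
    static final int DEFAULT_PIPELINE_CONCURRENCY = Integer.MAX_VALUE;

    String name = "";
    Map<String, String> envOptions = new HashMap<>();
    int pipelineConcurrency = DEFAULT_PIPELINE_CONCURRENCY;

    // writeNewField=false simulates an older writer that predates the field.
    void writeData(DataOutputStream out, boolean writeNewField) throws IOException {
        out.writeUTF(name);
        out.writeInt(envOptions.size());
        for (Map.Entry<String, String> e : envOptions.entrySet()) {
            out.writeUTF(e.getKey());
            out.writeUTF(e.getValue());
        }
        if (writeNewField) {
            // Appended after all pre-existing fields, so the old prefix is unchanged.
            out.writeInt(pipelineConcurrency);
        }
    }

    void readData(DataInputStream in) throws IOException {
        name = in.readUTF();
        int size = in.readInt();
        envOptions = new HashMap<>();
        for (int i = 0; i < size; i++) {
            String key = in.readUTF();
            envOptions.put(key, in.readUTF());
        }
        try {
            pipelineConcurrency = in.readInt();
        } catch (EOFException olderPayload) {
            // Nothing left to read: the payload came from an older writer.
            pipelineConcurrency = DEFAULT_PIPELINE_CONCURRENCY;
        }
    }

    // Serializes src and reads it back into a fresh instance.
    static JobConfigSketch roundTrip(JobConfigSketch src, boolean writeNewField) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                src.writeData(out, writeNewField);
            }
            JobConfigSketch dst = new JobConfigSketch();
            try (DataInputStream in =
                    new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                dst.readData(in);
            }
            return dst;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```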
##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java:
##########
@@ -163,6 +175,26 @@ public void addPipelineEndCallback(SubPlan subPlan) {
jobFullName));
updateJobState(JobStatus.FAILING);
}
+ } else if (PipelineStatus.FINISHED.equals(
+ pipelineState.getPipelineStatus())) {
+ // Pipeline finished successfully, decrease running count and try to
+ // start next pipeline
+ int currentRunning = runningPipelineCount.decrementAndGet();
+ int completed =
+ finishedPipelineNum
+ .incrementAndGet(); // Will be incremented below
+ int total = pipelineList.size();
+ int queued = total - completed - currentRunning;
+
+ log.info(
+ "Pipeline completed: {} | Progress: {}/{} | Running: {} | Queued: {}",
+ subPlan.getPipelineFullName(),
+ completed,
+ total,
+ currentRunning,
+ queued);
+
+ startNextPipelines();
Review Comment:
The addPipelineEndCallback method registers a callback that runs
asynchronously via thenAcceptAsync. When a pipeline finishes, it calls
startNextPipelines() at line 197. However, startNextPipelines() is synchronized
and checks pipeline status, but the pipeline status change and the callback
execution happen asynchronously. This creates a potential race condition where
multiple callbacks could be triggered simultaneously, especially if pipelines
complete at nearly the same time. While the synchronized keyword on
startNextPipelines() provides some protection, runningPipelineCount is
decremented and finishedPipelineNum is incremented outside that method, as two
separate atomic operations. Overlapping callbacks can therefore observe one
counter updated and not the other, so the logged Running/Queued values (and, in
edge cases, the count itself) can become inconsistent with the actual running
state.
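One way to remove the split between the two counters is to keep all bookkeeping under a single monitor. A minimal sketch with a hypothetical `LockedPipelineScheduler`, where an `IntConsumer` stands in for `startSubPlanStateProcess()` and a thread pool simulates end callbacks arriving concurrently (as `thenAcceptAsync` callbacks might):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntConsumer;

// Hypothetical scheduler: all counters are plain ints guarded by one monitor, so
// "release a slot, record completion, start the next pipeline" is a single step.
final class LockedPipelineScheduler {
    private final int total;
    private final int concurrency;
    private final IntConsumer starter; // stand-in for startSubPlanStateProcess()
    private int nextIndex;
    private int running;
    private int finished;
    private int peakRunning;

    LockedPipelineScheduler(int total, int concurrency, IntConsumer starter) {
        this.total = total;
        this.concurrency = concurrency;
        this.starter = starter;
    }

    synchronized void startNextPipelines() {
        while (running < concurrency && nextIndex < total) {
            running++;
            peakRunning = Math.max(peakRunning, running);
            starter.accept(nextIndex++);
        }
    }

    // Pipeline-end callback, safe to invoke from any thread. Returns true for
    // exactly one call: the one that finishes the last pipeline.
    synchronized boolean onPipelineFinished() {
        running--;
        finished++;
        startNextPipelines(); // re-entrant: this thread already holds the monitor
        return finished == total;
    }

    // Fires one end callback per pipeline from a pool of threads.
    // Returns {started, peakRunning, finished, jobDoneSignals}.
    static int[] simulate(int total, int concurrency, int threads) {
        List<Integer> started = Collections.synchronizedList(new ArrayList<>());
        LockedPipelineScheduler s =
                new LockedPipelineScheduler(total, concurrency, i -> started.add(i));
        s.startNextPipelines();
        AtomicInteger doneSignals = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < total; i++) {
            pool.execute(() -> {
                if (s.onPipelineFinished()) {
                    doneSignals.incrementAndGet();
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("simulation timed out");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        synchronized (s) {
            return new int[] {started.size(), s.peakRunning, s.finished, doneSignals.get()};
        }
    }
}
```

The trade-off is that the start action runs while the monitor is held; if the real start is slow, other end callbacks wait on it, so a production version might collect the pipelines to start under the lock and launch them after releasing it.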
##########
seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/PipelineConcurrencyIT.java:
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.e2e;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+
+/**
+ * E2E test for pipeline concurrency control feature. Tests serial, limited, and unlimited
+ * concurrency modes.
+ */
+@Slf4j
+public class PipelineConcurrencyIT extends SeaTunnelEngineContainer {
+
+ @Test
+ public void testSerialPipelineExecution() throws IOException, InterruptedException {
+ Container.ExecResult execResult = executeSeaTunnelJob("/pipeline_concurrency_serial.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+
+ String logs = execResult.getStdout();
+
+ // Verify serial mode logging
+ Assertions.assertTrue(
+ logs.contains("Parsed pipeline_concurrency from config: 1"),
+ "Should parse pipeline_concurrency = 1");
+ Assertions.assertTrue(
+ logs.contains("Pipeline concurrency: 1"), "Should show pipeline concurrency as 1");
+ Assertions.assertTrue(logs.contains("[Serial Mode]"), "Should execute in serial mode");
+
+ log.info("✅ Serial pipeline execution test passed");
+ }
+
+ @Test
+ public void testLimitedPipelineConcurrency() throws IOException, InterruptedException {
+ Container.ExecResult execResult = executeSeaTunnelJob("/pipeline_concurrency_limited.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+
+ String logs = execResult.getStdout();
+
+ // Verify limited concurrency logging
+ Assertions.assertTrue(
+ logs.contains("Parsed pipeline_concurrency from config: 2"),
+ "Should parse pipeline_concurrency = 2");
+ Assertions.assertTrue(
+ logs.contains("Pipeline concurrency: 2"), "Should show pipeline concurrency as 2");
+ Assertions.assertTrue(
+ logs.contains("[Limited Concurrency=2]"),
+ "Should execute in limited concurrency mode");
+
+ log.info("✅ Limited pipeline concurrency test passed");
+ }
Review Comment:
Similar to the serial test, this test only verifies log messages but doesn't
verify that the concurrency limit is actually enforced. The test should verify
that at most 2 pipelines execute concurrently by checking execution timestamps
or other behavioral evidence.
##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java:
##########
@@ -163,6 +175,26 @@ public void addPipelineEndCallback(SubPlan subPlan) {
jobFullName));
updateJobState(JobStatus.FAILING);
}
+ } else if (PipelineStatus.FINISHED.equals(
+ pipelineState.getPipelineStatus())) {
+ // Pipeline finished successfully, decrease
running count and try to
+ // start next pipeline
+ int currentRunning = runningPipelineCount.decrementAndGet();
+ int completed =
+ finishedPipelineNum
+ .incrementAndGet(); // Will be incremented below
Review Comment:
The finishedPipelineNum counter is being incremented twice for finished
pipelines. It's incremented once at line 185 (inside the FINISHED status block)
and again at line 200 (for all pipeline completions). This causes the finished
pipeline count to be double-counted for successful pipelines, which will break
the job completion check at line 200 and potentially cause jobs to hang or
complete prematurely.
```suggestion
// Compute the expected completed count without mutating the counter;
// the actual increment happens in the common completion block below.
int completed = finishedPipelineNum.get() + 1;
```
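An alternative to reading `finishedPipelineNum.get() + 1` is to keep one `incrementAndGet()` in the common completion path and reuse its return value for both the progress log and the completion check. Because each call gets a distinct value, two overlapping callbacks cannot both log the same progress number, which `get() + 1` allows when both read before either increments. A minimal sketch with a hypothetical `CompletionTracker` class:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical tracker: the only place finishedPipelineNum is incremented.
final class CompletionTracker {
    private final AtomicInteger finishedPipelineNum = new AtomicInteger();
    private final int totalPipelines;

    CompletionTracker(int totalPipelines) {
        this.totalPipelines = totalPipelines;
    }

    // Called once per pipeline completion, whatever its final status.
    // The returned value is unique per call, so it is safe to log and compare.
    int onPipelineEnd() {
        return finishedPipelineNum.incrementAndGet();
    }

    boolean isJobDone(int completed) {
        return completed == totalPipelines;
    }
}
```

The FINISHED branch would then log the value returned by the common block instead of incrementing a second time.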
##########
seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/PipelineConcurrencyIT.java:
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.e2e;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+
+/**
+ * E2E test for pipeline concurrency control feature. Tests serial, limited, and unlimited
+ * concurrency modes.
+ */
+@Slf4j
+public class PipelineConcurrencyIT extends SeaTunnelEngineContainer {
+
+ @Test
+ public void testSerialPipelineExecution() throws IOException, InterruptedException {
+ Container.ExecResult execResult = executeSeaTunnelJob("/pipeline_concurrency_serial.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+
+ String logs = execResult.getStdout();
+
+ // Verify serial mode logging
+ Assertions.assertTrue(
+ logs.contains("Parsed pipeline_concurrency from config: 1"),
+ "Should parse pipeline_concurrency = 1");
+ Assertions.assertTrue(
+ logs.contains("Pipeline concurrency: 1"), "Should show pipeline concurrency as 1");
+ Assertions.assertTrue(logs.contains("[Serial Mode]"), "Should execute in serial mode");
+
+ log.info("✅ Serial pipeline execution test passed");
+ }
Review Comment:
The test only verifies that specific log messages appear in the output, but
doesn't actually verify the concurrency control behavior. For example,
testSerialPipelineExecution should verify that pipelines actually execute one
at a time by checking timestamps or ensuring overlapping execution times don't
occur. The current test would pass even if the concurrency control is broken,
as long as the logging is correct.
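To check behavior rather than log text, the tests could record a start and end time per pipeline and assert on the peak overlap: at most 1 for the serial config and at most 2 for the limited one. A minimal sketch of the overlap computation, assuming the intervals have already been extracted (for example from the timestamps of the "Starting pipeline" and "Pipeline completed" log lines; that parsing is not shown). `PipelineOverlap` is a hypothetical helper:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical test helper: given one [start, end) interval per pipeline,
// returns the peak number of pipelines that were running at the same time.
final class PipelineOverlap {
    private PipelineOverlap() {}

    static int maxConcurrent(long[][] intervals) {
        List<long[]> events = new ArrayList<>();
        for (long[] iv : intervals) {
            events.add(new long[] {iv[0], +1}); // pipeline starts
            events.add(new long[] {iv[1], -1}); // pipeline ends
        }
        // Sort by time; at equal timestamps process ends (-1) before starts (+1),
        // so back-to-back pipelines in serial mode do not count as overlapping.
        events.sort((a, b) -> a[0] != b[0] ? Long.compare(a[0], b[0]) : Long.compare(a[1], b[1]));
        int current = 0;
        int peak = 0;
        for (long[] e : events) {
            current += (int) e[1];
            peak = Math.max(peak, current);
        }
        return peak;
    }
}
```

The serial test would then assert `maxConcurrent(intervals) <= 1` and the limited test `<= 2`, which fails if the scheduler starts too many pipelines even when the log prefixes are correct.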
##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java:
##########
@@ -374,4 +402,56 @@ public void setPreApplyResourceFutures(
Map<TaskGroupLocation, CompletableFuture<SlotProfile>>
preApplyResourceFutures) {
this.preApplyResourceFutures = preApplyResourceFutures;
}
+
+ /** Start next pipelines according to concurrency limit */
+ private synchronized void startNextPipelines() {
+ while (runningPipelineCount.get() < pipelineConcurrency) {
+ int nextIndex = nextPipelineIndex.getAndIncrement();
+ if (nextIndex >= pipelineList.size()) {
+ // No more pipelines to start
+ break;
+ }
+
+ SubPlan nextPipeline = pipelineList.get(nextIndex);
+ if (PipelineStatus.CREATED.equals(nextPipeline.getCurrPipelineStatus())) {
+ runningPipelineCount.incrementAndGet();
+ int completed = finishedPipelineNum.get();
+ int total = pipelineList.size();
+ int running = runningPipelineCount.get();
+ int queued = total - completed - running;
+
+ if (pipelineConcurrency == 1) {
+ log.info(
+ "[Serial Mode] Starting pipeline {}/{}: {} | Running: {} | Queued: {} | Completed: {}",
+ nextIndex + 1,
+ total,
+ nextPipeline.getPipelineFullName(),
+ running,
+ queued,
+ completed);
+ } else if (pipelineConcurrency < Integer.MAX_VALUE) {
+ log.info(
+ "[Limited Concurrency={}] Starting pipeline {}/{}: {} | Running: {} | Queued: {} | Completed: {}",
+ pipelineConcurrency,
+ nextIndex + 1,
+ total,
+ nextPipeline.getPipelineFullName(),
+ running,
+ queued,
+ completed);
+ } else {
+ log.info(
+ "[Concurrent Mode] Starting pipeline {}/{}: {} | Running: {} | Queued: {} | Completed: {}",
+ nextIndex + 1,
+ total,
+ nextPipeline.getPipelineFullName(),
+ running,
+ queued,
+ completed);
+ }
+
+ nextPipeline.startSubPlanStateProcess();
Review Comment:
If startSubPlanStateProcess() at line 453 throws an exception, the
runningPipelineCount has already been incremented at line 417, but the pipeline
may not actually be running. This will leave the count in an inconsistent state
and permanently reduce the available concurrency slots. The increment should
happen after successfully starting the pipeline, or the method should have
proper exception handling to decrement the count if startup fails.
```suggestion
try {
nextPipeline.startSubPlanStateProcess();
} catch (RuntimeException e) {
// Roll back running pipeline count if startup fails
runningPipelineCount.decrementAndGet();
log.error(
"Failed to start pipeline {}: {}",
nextPipeline.getPipelineFullName(),
e.getMessage(),
e);
throw e;
}
```
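A variant of this suggestion uses `try`/`finally` with a success flag, which also releases the slot when the start action throws an `Error` rather than a `RuntimeException`. A minimal sketch with a hypothetical `SlotReservation` class, where a `Runnable` stands in for `startSubPlanStateProcess()`:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical slot accounting: reserve a slot, run the start action, and give the
// slot back if the start action fails for any reason.
final class SlotReservation {
    private final AtomicInteger runningPipelineCount = new AtomicInteger();

    void startWithRollback(Runnable startAction) {
        runningPipelineCount.incrementAndGet(); // reserve before starting
        boolean started = false;
        try {
            startAction.run();
            started = true;
        } finally {
            if (!started) {
                // Start failed (exception or error): release the reserved slot.
                runningPipelineCount.decrementAndGet();
            }
        }
    }

    int running() {
        return runningPipelineCount.get();
    }
}
```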
##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java:
##########
@@ -374,4 +402,56 @@ public void setPreApplyResourceFutures(
Map<TaskGroupLocation, CompletableFuture<SlotProfile>>
preApplyResourceFutures) {
this.preApplyResourceFutures = preApplyResourceFutures;
}
+
+ /** Start next pipelines according to concurrency limit */
+ private synchronized void startNextPipelines() {
+ while (runningPipelineCount.get() < pipelineConcurrency) {
+ int nextIndex = nextPipelineIndex.getAndIncrement();
+ if (nextIndex >= pipelineList.size()) {
+ // No more pipelines to start
+ break;
+ }
+
+ SubPlan nextPipeline = pipelineList.get(nextIndex);
+ if (PipelineStatus.CREATED.equals(nextPipeline.getCurrPipelineStatus())) {
+ runningPipelineCount.incrementAndGet();
+ int completed = finishedPipelineNum.get();
+ int total = pipelineList.size();
+ int running = runningPipelineCount.get();
+ int queued = total - completed - running;
+
+ if (pipelineConcurrency == 1) {
+ log.info(
+ "[Serial Mode] Starting pipeline {}/{}: {} | Running: {} | Queued: {} | Completed: {}",
+ nextIndex + 1,
+ total,
+ nextPipeline.getPipelineFullName(),
+ running,
+ queued,
+ completed);
+ } else if (pipelineConcurrency < Integer.MAX_VALUE) {
+ log.info(
+ "[Limited Concurrency={}] Starting pipeline {}/{}: {} | Running: {} | Queued: {} | Completed: {}",
+ pipelineConcurrency,
+ nextIndex + 1,
+ total,
+ nextPipeline.getPipelineFullName(),
+ running,
+ queued,
+ completed);
+ } else {
+ log.info(
+ "[Concurrent Mode] Starting pipeline {}/{}: {} | Running: {} | Queued: {} | Completed: {}",
+ nextIndex + 1,
+ total,
+ nextPipeline.getPipelineFullName(),
+ running,
+ queued,
+ completed);
+ }
+
+ nextPipeline.startSubPlanStateProcess();
+ }
Review Comment:
If a pipeline is not in CREATED status when its turn comes (line 416), the
nextPipelineIndex is already incremented but runningPipelineCount is not
incremented. This means the slot for that pipeline is lost and fewer pipelines
will run than the configured concurrency limit. The method should either skip
non-CREATED pipelines and continue to the next one, or handle this case
explicitly to ensure the concurrency limit is properly maintained.
```suggestion
int nextIndex = nextPipelineIndex.get();
if (nextIndex >= pipelineList.size()) {
// No more pipelines to start
break;
}
SubPlan nextPipeline = pipelineList.get(nextIndex);
if (!PipelineStatus.CREATED.equals(nextPipeline.getCurrPipelineStatus())) {
// Skip pipelines that are not in CREATED status and try the next one
nextPipelineIndex.incrementAndGet();
continue;
}
// Consume this index for a CREATED pipeline
nextPipelineIndex.incrementAndGet();
runningPipelineCount.incrementAndGet();
int completed = finishedPipelineNum.get();
int total = pipelineList.size();
int running = runningPipelineCount.get();
int queued = total - completed - running;
if (pipelineConcurrency == 1) {
log.info(
"[Serial Mode] Starting pipeline {}/{}: {} | Running: {} | Queued: {} | Completed: {}",
nextIndex + 1,
total,
nextPipeline.getPipelineFullName(),
running,
queued,
completed);
} else if (pipelineConcurrency < Integer.MAX_VALUE) {
log.info(
"[Limited Concurrency={}] Starting pipeline {}/{}: {} | Running: {} | Queued: {} | Completed: {}",
pipelineConcurrency,
nextIndex + 1,
total,
nextPipeline.getPipelineFullName(),
running,
queued,
completed);
} else {
log.info(
"[Concurrent Mode] Starting pipeline {}/{}: {} | Running: {} | Queued: {} | Completed: {}",
nextIndex + 1,
total,
nextPipeline.getPipelineFullName(),
running,
queued,
completed);
}
nextPipeline.startSubPlanStateProcess();
```
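The selection rule in this suggestion (skip anything not CREATED, keep going until the free slots are filled) can be isolated in a side-effect-free helper that is unit-testable without a cluster. A minimal sketch, assuming a hypothetical `PipelineSelection` class and a stand-in `Status` enum rather than the real `PipelineStatus`:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, side-effect-free version of the selection loop.
final class PipelineSelection {
    // Stand-in for PipelineStatus; only the CREATED check matters here.
    enum Status { CREATED, RUNNING, FINISHED, CANCELED }

    static final class Result {
        final List<Integer> toStart; // indices of pipelines to start now
        final int nextIndex;         // where the next call should resume

        Result(List<Integer> toStart, int nextIndex) {
            this.toStart = toStart;
            this.nextIndex = nextIndex;
        }
    }

    static Result select(List<Status> statuses, int nextIndex, int freeSlots) {
        List<Integer> toStart = new ArrayList<>();
        int i = nextIndex;
        while (toStart.size() < freeSlots && i < statuses.size()) {
            if (statuses.get(i) == Status.CREATED) {
                toStart.add(i); // only CREATED pipelines consume a free slot
            }
            i++; // every examined index is consumed, whether started or skipped
        }
        return new Result(toStart, i);
    }
}
```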
##########
seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java:
##########
@@ -335,6 +335,40 @@ private void fillJobConfigAndCommonJars() {
jobConfig.setName(envOptions.get(EnvCommonOptions.JOB_NAME));
}
jobConfig.getEnvOptions().putAll(envOptions.getSourceMap());
+
+ // Parse pipeline_concurrency from env config
+ log.info("Checking for pipeline_concurrency configuration...");
+ log.info("Config has env section: {}", seaTunnelJobConfig.hasPath("env"));
+ if (seaTunnelJobConfig.hasPath("env")) {
+ log.info("Env config keys: {}", seaTunnelJobConfig.getConfig("env").entrySet());
Review Comment:
These diagnostic log statements (lines 340-341 and 343) run at INFO level on
every job submission and produce logging that isn't necessary for normal
operation. Either remove them or change them to DEBUG level to avoid cluttering
production logs.
```suggestion
log.debug("Checking for pipeline_concurrency configuration...");
log.debug("Config has env section: {}", seaTunnelJobConfig.hasPath("env"));
if (seaTunnelJobConfig.hasPath("env")) {
log.debug("Env config keys: {}", seaTunnelJobConfig.getConfig("env").entrySet());
```
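Even at DEBUG, SLF4J only defers message formatting: an argument such as `seaTunnelJobConfig.getConfig("env").entrySet()` is still evaluated when DEBUG is disabled. The parsing itself can also stay quiet on the happy path. A minimal sketch of parsing and validating `pipeline_concurrency`, assuming a plain `Map` as a stand-in for the `env` section, an unlimited default of `Integer.MAX_VALUE`, and a lower bound of 1 (the validation rule is an assumption, not taken from the PR):

```java
import java.util.Map;

// Hypothetical parser for env.pipeline_concurrency; a Map stands in for the
// job config's "env" section.
final class PipelineConcurrencyOption {
    static final String KEY = "pipeline_concurrency";
    // Assumed default: unlimited, i.e. every pipeline may start at once.
    static final int UNLIMITED = Integer.MAX_VALUE;

    private PipelineConcurrencyOption() {}

    static int parse(Map<String, ?> env) {
        Object raw = env.get(KEY);
        if (raw == null) {
            return UNLIMITED; // option not set
        }
        int value;
        try {
            value = Integer.parseInt(raw.toString().trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(KEY + " must be an integer, got: " + raw, e);
        }
        if (value < 1) {
            // Assumed rule: zero or negative concurrency would never start anything.
            throw new IllegalArgumentException(KEY + " must be >= 1, got: " + value);
        }
        return value;
    }
}
```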
--
This is an automated message from the Apache Git Service.