Copilot commented on code in PR #10364:
URL: https://github.com/apache/seatunnel/pull/10364#discussion_r2707307124


##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java:
##########
@@ -374,4 +402,56 @@ public void setPreApplyResourceFutures(
             Map<TaskGroupLocation, CompletableFuture<SlotProfile>> preApplyResourceFutures) {
         this.preApplyResourceFutures = preApplyResourceFutures;
     }
+
+    /** Start next pipelines according to concurrency limit */
+    private synchronized void startNextPipelines() {
+        while (runningPipelineCount.get() < pipelineConcurrency) {
+            int nextIndex = nextPipelineIndex.getAndIncrement();
+            if (nextIndex >= pipelineList.size()) {
+                // No more pipelines to start
+                break;
+            }
+
+            SubPlan nextPipeline = pipelineList.get(nextIndex);
+            if (PipelineStatus.CREATED.equals(nextPipeline.getCurrPipelineStatus())) {
+                runningPipelineCount.incrementAndGet();
+                int completed = finishedPipelineNum.get();
+                int total = pipelineList.size();
+                int running = runningPipelineCount.get();
+                int queued = total - completed - running;
+
+                if (pipelineConcurrency == 1) {
+                    log.info(
+                            "[Serial Mode] Starting pipeline {}/{}: {} | Running: {} | Queued: {} | Completed: {}",
+                            nextIndex + 1,
+                            total,
+                            nextPipeline.getPipelineFullName(),
+                            running,
+                            queued,
+                            completed);
+                } else if (pipelineConcurrency < Integer.MAX_VALUE) {
+                    log.info(
+                            "[Limited Concurrency={}] Starting pipeline {}/{}: {} | Running: {} | Queued: {} | Completed: {}",
+                            pipelineConcurrency,
+                            nextIndex + 1,
+                            total,
+                            nextPipeline.getPipelineFullName(),
+                            running,
+                            queued,
+                            completed);
+                } else {
+                    log.info(
+                            "[Concurrent Mode] Starting pipeline {}/{}: {} | Running: {} | Queued: {} | Completed: {}",
+                            nextIndex + 1,
+                            total,
+                            nextPipeline.getPipelineFullName(),
+                            running,
+                            queued,
+                            completed);

Review Comment:
   In unlimited concurrency mode (pipelineConcurrency = Integer.MAX_VALUE), the
   while loop condition at line 408 stays true until every pipeline has been
   started. Because the method is synchronized, a single call to
   startNextPipelines() during the SCHEDULED state transition (line 362) starts
   ALL pipelines. This matches the original behavior, but the "[Concurrent Mode]"
   log messages at lines 443-450 are misleading: the pipelines start in one pass
   rather than progressively as slots become available, so the "Running",
   "Queued", and "Completed" counts only reflect the position in the start loop
   (Completed is typically still 0) and say little about the real job state.
   ```suggestion
                       // In unlimited concurrency mode, all pipelines are started in a single pass.
                       // Avoid logging running/queued/completed counts here to prevent misleading output.
                       log.info(
                               "[Unlimited Concurrency Mode] Starting pipeline {}/{}: {}",
                               nextIndex + 1,
                               total,
                               nextPipeline.getPipelineFullName());
   ```



##########
seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/JobConfig.java:
##########
@@ -51,13 +52,15 @@ public int getClassId() {
     public void writeData(ObjectDataOutput out) throws IOException {
         out.writeString(name);
         out.writeObject(jobContext);
+        out.writeInt(pipelineConcurrency);
         out.writeObject(envOptions);
     }
 
     @Override
     public void readData(ObjectDataInput in) throws IOException {
         this.name = in.readString();
         this.jobContext = in.readObject();
+        this.pipelineConcurrency = in.readInt();

Review Comment:
   The serialization format change in JobConfig introduces a backward
   compatibility issue. Writing the new field (pipelineConcurrency) between
   existing fields changes the serialization order, so older clients/servers
   cannot deserialize JobConfig objects written by newer versions, and vice
   versa. This breaks rolling upgrades and mixed-version clusters. Consider:
   1. Adding version handling to the serialization logic.
   2. Writing the field at the end, after envOptions.
   3. Using a versioned serialization approach with conditional field
      reading/writing.
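
   As a rough illustration of option 2 combined with a conditional read, the
   sketch below appends the new field after the existing data and falls back to
   a default when the trailing bytes are absent. It is not JobConfig's actual
   code: `TrailingFieldSketch`, its methods, and the use of `java.nio.ByteBuffer`
   in place of Hazelcast's `ObjectDataOutput`/`ObjectDataInput` are assumptions
   made for the example, and whether the real input stream can report remaining
   bytes would need to be checked.

   ```java
   import java.nio.ByteBuffer;
   import java.nio.charset.StandardCharsets;

   /** Hypothetical stand-in for JobConfig serialization; not the SeaTunnel implementation. */
   final class TrailingFieldSketch {
       static final int DEFAULT_CONCURRENCY = Integer.MAX_VALUE;

       /** Old wire format: length-prefixed name only. */
       static byte[] writeOld(String name) {
           byte[] nameBytes = name.getBytes(StandardCharsets.UTF_8);
           return ByteBuffer.allocate(Integer.BYTES + nameBytes.length)
                   .putInt(nameBytes.length)
                   .put(nameBytes)
                   .array();
       }

       /** New wire format: the new field is appended after all existing fields. */
       static byte[] writeNew(String name, int pipelineConcurrency) {
           byte[] nameBytes = name.getBytes(StandardCharsets.UTF_8);
           return ByteBuffer.allocate(2 * Integer.BYTES + nameBytes.length)
                   .putInt(nameBytes.length)
                   .put(nameBytes)
                   .putInt(pipelineConcurrency)
                   .array();
       }

       /** Reads either format; uses the default when the trailing field is missing. */
       static int readConcurrency(byte[] payload) {
           ByteBuffer in = ByteBuffer.wrap(payload);
           int nameLength = in.getInt();
           in.position(in.position() + nameLength); // skip the existing field
           return in.remaining() >= Integer.BYTES ? in.getInt() : DEFAULT_CONCURRENCY;
       }

       public static void main(String[] args) {
           System.out.println(readConcurrency(writeOld("job")));
           System.out.println(readConcurrency(writeNew("job", 2)));
       }
   }
   ```

   With this layout a new reader accepts old payloads. An old reader only
   tolerates new payloads if it ignores trailing bytes, which is why explicit
   version handling (option 1) is the more robust choice.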



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java:
##########
@@ -163,6 +175,26 @@ public void addPipelineEndCallback(SubPlan subPlan) {
                                                 jobFullName));
                                 updateJobState(JobStatus.FAILING);
                             }
+                        } else if (PipelineStatus.FINISHED.equals(
+                                pipelineState.getPipelineStatus())) {
+                            // Pipeline finished successfully, decrease running count and try to
+                            // start next pipeline
+                            int currentRunning = runningPipelineCount.decrementAndGet();
+                            int completed =
+                                    finishedPipelineNum
+                                            .incrementAndGet(); // Will be incremented below
+                            int total = pipelineList.size();
+                            int queued = total - completed - currentRunning;
+
+                            log.info(
+                                    "Pipeline completed: {} | Progress: {}/{} | Running: {} | Queued: {}",
+                                    subPlan.getPipelineFullName(),
+                                    completed,
+                                    total,
+                                    currentRunning,
+                                    queued);
+
+                            startNextPipelines();

Review Comment:
   The callback registered by addPipelineEndCallback runs asynchronously via
   thenAcceptAsync and, when a pipeline finishes, calls startNextPipelines() at
   line 197. startNextPipelines() is synchronized and checks the pipeline
   status, but the status change, the counter updates in the callback, and the
   callback execution itself all happen asynchronously and outside that lock.
   If several pipelines complete at nearly the same time, multiple callbacks
   can run simultaneously. The synchronized keyword provides some protection,
   but the atomic operations on runningPipelineCount are not covered by it, so
   there may be edge cases where the count becomes inconsistent with the actual
   running state.
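
   One way to narrow the window described above is to update the running count
   and claim the next pipelines under the same lock. The sketch below is only an
   illustration under that assumption: `PipelineGateSketch` and its methods are
   hypothetical names, pipelines are represented by their index, and actually
   starting them is left to the caller.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   /** Hypothetical scheduler core; all mutable state is guarded by the instance lock. */
   final class PipelineGateSketch {
       private final int limit;
       private final int total;
       private int running;   // guarded by "this"
       private int nextIndex; // guarded by "this"

       PipelineGateSketch(int limit, int total) {
           this.limit = limit;
           this.total = total;
       }

       /** Called once when the job is scheduled; returns the pipeline indexes to start. */
       synchronized List<Integer> onScheduled() {
           return claimNext();
       }

       /** Called from a completion callback; frees a slot and claims replacements atomically. */
       synchronized List<Integer> onPipelineFinished() {
           running--;
           return claimNext();
       }

       synchronized int running() {
           return running;
       }

       // Caller must hold the lock.
       private List<Integer> claimNext() {
           List<Integer> toStart = new ArrayList<>();
           while (running < limit && nextIndex < total) {
               running++;
               toStart.add(nextIndex++);
           }
           return toStart;
       }

       public static void main(String[] args) {
           PipelineGateSketch gate = new PipelineGateSketch(2, 5);
           System.out.println(gate.onScheduled());
           System.out.println(gate.onPipelineFinished());
       }
   }
   ```

   Because the decrement and the claim happen in one critical section, two
   callbacks that fire together cannot both observe the same free slot, and any
   counts logged inside that section are consistent with each other.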



##########
seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/PipelineConcurrencyIT.java:
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.e2e;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+
+/**
+ * E2E test for pipeline concurrency control feature. Tests serial, limited, and unlimited
+ * concurrency modes.
+ */
+@Slf4j
+public class PipelineConcurrencyIT extends SeaTunnelEngineContainer {
+
+    @Test
+    public void testSerialPipelineExecution() throws IOException, InterruptedException {
+        Container.ExecResult execResult = executeSeaTunnelJob("/pipeline_concurrency_serial.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+
+        String logs = execResult.getStdout();
+
+        // Verify serial mode logging
+        Assertions.assertTrue(
+                logs.contains("Parsed pipeline_concurrency from config: 1"),
+                "Should parse pipeline_concurrency = 1");
+        Assertions.assertTrue(
+                logs.contains("Pipeline concurrency: 1"), "Should show pipeline concurrency as 1");
+        Assertions.assertTrue(logs.contains("[Serial Mode]"), "Should execute in serial mode");
+
+        log.info("✅ Serial pipeline execution test passed");
+    }
+
+    @Test
+    public void testLimitedPipelineConcurrency() throws IOException, InterruptedException {
+        Container.ExecResult execResult = executeSeaTunnelJob("/pipeline_concurrency_limited.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+
+        String logs = execResult.getStdout();
+
+        // Verify limited concurrency logging
+        Assertions.assertTrue(
+                logs.contains("Parsed pipeline_concurrency from config: 2"),
+                "Should parse pipeline_concurrency = 2");
+        Assertions.assertTrue(
+                logs.contains("Pipeline concurrency: 2"), "Should show pipeline concurrency as 2");
+        Assertions.assertTrue(
+                logs.contains("[Limited Concurrency=2]"),
+                "Should execute in limited concurrency mode");
+
+        log.info("✅ Limited pipeline concurrency test passed");
+    }

Review Comment:
   Similar to the serial test, this test only verifies log messages but doesn't 
verify that the concurrency limit is actually enforced. The test should verify 
that at most 2 pipelines execute concurrently by checking execution timestamps 
or other behavioral evidence.
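
   A timestamp-based check could look roughly like the sketch below, which
   computes the peak number of overlapping execution intervals. How the test
   would obtain per-pipeline start and end times is not covered here;
   `PeakConcurrencySketch`, `peakConcurrency`, and the `{startMillis, endMillis}`
   input shape are assumptions made for the example.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   /** Hypothetical helper: derives the peak overlap from pipeline execution intervals. */
   final class PeakConcurrencySketch {

       /** Each entry of intervals is {startMillis, endMillis} for one pipeline. */
       static int peakConcurrency(long[][] intervals) {
           List<long[]> events = new ArrayList<>();
           for (long[] interval : intervals) {
               events.add(new long[] {interval[0], +1}); // pipeline starts
               events.add(new long[] {interval[1], -1}); // pipeline ends
           }
           // Sort by time; at equal timestamps an end (-1) is processed before a start (+1),
           // so back-to-back pipelines are not counted as overlapping.
           events.sort(
                   (a, b) -> a[0] != b[0] ? Long.compare(a[0], b[0]) : Long.compare(a[1], b[1]));
           int running = 0;
           int peak = 0;
           for (long[] event : events) {
               running += (int) event[1];
               peak = Math.max(peak, running);
           }
           return peak;
       }

       public static void main(String[] args) {
           long[][] observed = {{0, 10}, {0, 10}, {10, 20}, {12, 20}};
           System.out.println(peakConcurrency(observed));
       }
   }
   ```

   The limited-concurrency test could then assert that the returned peak is at
   most 2, and the serial test that it is exactly 1.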



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java:
##########
@@ -163,6 +175,26 @@ public void addPipelineEndCallback(SubPlan subPlan) {
                                                 jobFullName));
                                 updateJobState(JobStatus.FAILING);
                             }
+                        } else if (PipelineStatus.FINISHED.equals(
+                                pipelineState.getPipelineStatus())) {
+                            // Pipeline finished successfully, decrease running count and try to
+                            // start next pipeline
+                            int currentRunning = runningPipelineCount.decrementAndGet();
+                            int completed =
+                                    finishedPipelineNum
+                                            .incrementAndGet(); // Will be incremented below

Review Comment:
   The finishedPipelineNum counter is being incremented twice for finished 
pipelines. It's incremented once at line 185 (inside the FINISHED status block) 
and again at line 200 (for all pipeline completions). This causes the finished 
pipeline count to be double-counted for successful pipelines, which will break 
the job completion check at line 200 and potentially cause jobs to hang or 
complete prematurely.
   ```suggestion
                               // Compute the expected completed count without mutating the counter;
                               // the actual increment happens in the common completion block below.
                               int completed = finishedPipelineNum.get() + 1;
   ```
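
   To make the effect concrete, the sketch below simulates the counter for a job
   whose pipelines all finish successfully. It assumes the common completion
   block compares the result of `incrementAndGet()` with the pipeline count using
   equality; that check is not visible in this hunk, and `DoubleCountSketch` is a
   hypothetical name.

   ```java
   import java.util.concurrent.atomic.AtomicInteger;

   /** Hypothetical simulation of the completion counter; not the PhysicalPlan code. */
   final class DoubleCountSketch {

       /**
        * Returns how many pipelines had finished when the job-done check first matched,
        * or -1 if it never matched (the job would hang).
        */
       static int completionsUntilJobDone(int total, boolean doubleIncrement) {
           AtomicInteger finishedPipelineNum = new AtomicInteger();
           for (int done = 1; done <= total; done++) {
               if (doubleIncrement) {
                   finishedPipelineNum.incrementAndGet(); // extra increment in the FINISHED branch
               }
               // Assumed shape of the common completion block.
               if (finishedPipelineNum.incrementAndGet() == total) {
                   return done;
               }
           }
           return -1;
       }

       public static void main(String[] args) {
           System.out.println(completionsUntilJobDone(3, true));  // check sees 2, 4, 6
           System.out.println(completionsUntilJobDone(4, true));  // check sees 2, 4
           System.out.println(completionsUntilJobDone(3, false)); // check sees 1, 2, 3
       }
   }
   ```

   Under that assumption an odd pipeline count never matches (hang), while an
   even count matches after only half the pipelines have finished (premature
   completion).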



##########
seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/PipelineConcurrencyIT.java:
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.e2e;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+
+/**
+ * E2E test for pipeline concurrency control feature. Tests serial, limited, and unlimited
+ * concurrency modes.
+ */
+@Slf4j
+public class PipelineConcurrencyIT extends SeaTunnelEngineContainer {
+
+    @Test
+    public void testSerialPipelineExecution() throws IOException, InterruptedException {
+        Container.ExecResult execResult = executeSeaTunnelJob("/pipeline_concurrency_serial.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+
+        String logs = execResult.getStdout();
+
+        // Verify serial mode logging
+        Assertions.assertTrue(
+                logs.contains("Parsed pipeline_concurrency from config: 1"),
+                "Should parse pipeline_concurrency = 1");
+        Assertions.assertTrue(
+                logs.contains("Pipeline concurrency: 1"), "Should show pipeline concurrency as 1");
+        Assertions.assertTrue(logs.contains("[Serial Mode]"), "Should execute in serial mode");
+
+        log.info("✅ Serial pipeline execution test passed");
+    }

Review Comment:
   The test only verifies that specific log messages appear in the output, but 
doesn't actually verify the concurrency control behavior. For example, 
testSerialPipelineExecution should verify that pipelines actually execute one 
at a time by checking timestamps or ensuring overlapping execution times don't 
occur. The current test would pass even if the concurrency control is broken, 
as long as the logging is correct.
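
   If no timestamps are available, a weaker but still behavioral check is to
   replay the order of the start and completion messages and track how many
   pipelines were in flight at once. The sketch below assumes the
   "Starting pipeline" and "Pipeline completed:" messages from this PR appear in
   the captured output in the order they were emitted; `SerialLogCheckSketch` and
   `maxInFlight` are hypothetical names.

   ```java
   /** Hypothetical helper: replays start/completion log lines in order. */
   final class SerialLogCheckSketch {

       /** Returns the highest number of pipelines in flight at once, judged by log line order. */
       static int maxInFlight(String logs) {
           int inFlight = 0;
           int max = 0;
           for (String line : logs.split("\\R")) {
               if (line.contains("Starting pipeline")) {
                   inFlight++;
                   max = Math.max(max, inFlight);
               } else if (line.contains("Pipeline completed:")) {
                   inFlight--;
               }
           }
           return max;
       }

       public static void main(String[] args) {
           String logs =
                   "[Serial Mode] Starting pipeline 1/2: p1\n"
                           + "Pipeline completed: p1 | Progress: 1/2\n"
                           + "[Serial Mode] Starting pipeline 2/2: p2\n"
                           + "Pipeline completed: p2 | Progress: 2/2\n";
           System.out.println(maxInFlight(logs));
       }
   }
   ```

   The serial test could assert that the result is 1. Log order is only a proxy
   for real execution overlap, so this complements rather than replaces a
   timing-based assertion.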



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java:
##########
@@ -374,4 +402,56 @@ public void setPreApplyResourceFutures(
             Map<TaskGroupLocation, CompletableFuture<SlotProfile>> preApplyResourceFutures) {
         this.preApplyResourceFutures = preApplyResourceFutures;
     }
+
+    /** Start next pipelines according to concurrency limit */
+    private synchronized void startNextPipelines() {
+        while (runningPipelineCount.get() < pipelineConcurrency) {
+            int nextIndex = nextPipelineIndex.getAndIncrement();
+            if (nextIndex >= pipelineList.size()) {
+                // No more pipelines to start
+                break;
+            }
+
+            SubPlan nextPipeline = pipelineList.get(nextIndex);
+            if (PipelineStatus.CREATED.equals(nextPipeline.getCurrPipelineStatus())) {
+                runningPipelineCount.incrementAndGet();
+                int completed = finishedPipelineNum.get();
+                int total = pipelineList.size();
+                int running = runningPipelineCount.get();
+                int queued = total - completed - running;
+
+                if (pipelineConcurrency == 1) {
+                    log.info(
+                            "[Serial Mode] Starting pipeline {}/{}: {} | Running: {} | Queued: {} | Completed: {}",
+                            nextIndex + 1,
+                            total,
+                            nextPipeline.getPipelineFullName(),
+                            running,
+                            queued,
+                            completed);
+                } else if (pipelineConcurrency < Integer.MAX_VALUE) {
+                    log.info(
+                            "[Limited Concurrency={}] Starting pipeline {}/{}: {} | Running: {} | Queued: {} | Completed: {}",
+                            pipelineConcurrency,
+                            nextIndex + 1,
+                            total,
+                            nextPipeline.getPipelineFullName(),
+                            running,
+                            queued,
+                            completed);
+                } else {
+                    log.info(
+                            "[Concurrent Mode] Starting pipeline {}/{}: {} | Running: {} | Queued: {} | Completed: {}",
+                            nextIndex + 1,
+                            total,
+                            nextPipeline.getPipelineFullName(),
+                            running,
+                            queued,
+                            completed);
+                }
+
+                nextPipeline.startSubPlanStateProcess();

Review Comment:
   If startSubPlanStateProcess() at line 453 throws an exception, the 
runningPipelineCount has already been incremented at line 417, but the pipeline 
may not actually be running. This will leave the count in an inconsistent state 
and permanently reduce the available concurrency slots. The increment should 
happen after successfully starting the pipeline, or the method should have 
proper exception handling to decrement the count if startup fails.
   ```suggestion
                   try {
                       nextPipeline.startSubPlanStateProcess();
                   } catch (RuntimeException e) {
                       // Roll back running pipeline count if startup fails
                       runningPipelineCount.decrementAndGet();
                       log.error(
                               "Failed to start pipeline {}: {}",
                               nextPipeline.getPipelineFullName(),
                               e.getMessage(),
                               e);
                       throw e;
                   }
   ```



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java:
##########
@@ -374,4 +402,56 @@ public void setPreApplyResourceFutures(
             Map<TaskGroupLocation, CompletableFuture<SlotProfile>> preApplyResourceFutures) {
         this.preApplyResourceFutures = preApplyResourceFutures;
     }
+
+    /** Start next pipelines according to concurrency limit */
+    private synchronized void startNextPipelines() {
+        while (runningPipelineCount.get() < pipelineConcurrency) {
+            int nextIndex = nextPipelineIndex.getAndIncrement();
+            if (nextIndex >= pipelineList.size()) {
+                // No more pipelines to start
+                break;
+            }
+
+            SubPlan nextPipeline = pipelineList.get(nextIndex);
+            if (PipelineStatus.CREATED.equals(nextPipeline.getCurrPipelineStatus())) {
+                runningPipelineCount.incrementAndGet();
+                int completed = finishedPipelineNum.get();
+                int total = pipelineList.size();
+                int running = runningPipelineCount.get();
+                int queued = total - completed - running;
+
+                if (pipelineConcurrency == 1) {
+                    log.info(
+                            "[Serial Mode] Starting pipeline {}/{}: {} | Running: {} | Queued: {} | Completed: {}",
+                            nextIndex + 1,
+                            total,
+                            nextPipeline.getPipelineFullName(),
+                            running,
+                            queued,
+                            completed);
+                } else if (pipelineConcurrency < Integer.MAX_VALUE) {
+                    log.info(
+                            "[Limited Concurrency={}] Starting pipeline {}/{}: {} | Running: {} | Queued: {} | Completed: {}",
+                            pipelineConcurrency,
+                            nextIndex + 1,
+                            total,
+                            nextPipeline.getPipelineFullName(),
+                            running,
+                            queued,
+                            completed);
+                } else {
+                    log.info(
+                            "[Concurrent Mode] Starting pipeline {}/{}: {} | Running: {} | Queued: {} | Completed: {}",
+                            nextIndex + 1,
+                            total,
+                            nextPipeline.getPipelineFullName(),
+                            running,
+                            queued,
+                            completed);
+                }
+
+                nextPipeline.startSubPlanStateProcess();
+            }

Review Comment:
   If a pipeline is not in CREATED status when its turn comes (line 416), the 
nextPipelineIndex is already incremented but runningPipelineCount is not 
incremented. This means the slot for that pipeline is lost and fewer pipelines 
will run than the configured concurrency limit. The method should either skip 
non-CREATED pipelines and continue to the next one, or handle this case 
explicitly to ensure the concurrency limit is properly maintained.
   ```suggestion
               int nextIndex = nextPipelineIndex.get();
               if (nextIndex >= pipelineList.size()) {
                   // No more pipelines to start
                   break;
               }
   
               SubPlan nextPipeline = pipelineList.get(nextIndex);
               if (!PipelineStatus.CREATED.equals(nextPipeline.getCurrPipelineStatus())) {
                   // Skip pipelines that are not in CREATED status and try the next one
                   nextPipelineIndex.incrementAndGet();
                   continue;
               }
   
               // Consume this index for a CREATED pipeline
               nextPipelineIndex.incrementAndGet();
               runningPipelineCount.incrementAndGet();
               int completed = finishedPipelineNum.get();
               int total = pipelineList.size();
               int running = runningPipelineCount.get();
               int queued = total - completed - running;
   
               if (pipelineConcurrency == 1) {
                   log.info(
                           "[Serial Mode] Starting pipeline {}/{}: {} | Running: {} | Queued: {} | Completed: {}",
                           nextIndex + 1,
                           total,
                           nextPipeline.getPipelineFullName(),
                           running,
                           queued,
                           completed);
               } else if (pipelineConcurrency < Integer.MAX_VALUE) {
                   log.info(
                           "[Limited Concurrency={}] Starting pipeline {}/{}: {} | Running: {} | Queued: {} | Completed: {}",
                           pipelineConcurrency,
                           nextIndex + 1,
                           total,
                           nextPipeline.getPipelineFullName(),
                           running,
                           queued,
                           completed);
               } else {
                   log.info(
                           "[Concurrent Mode] Starting pipeline {}/{}: {} | Running: {} | Queued: {} | Completed: {}",
                           nextIndex + 1,
                           total,
                           nextPipeline.getPipelineFullName(),
                           running,
                           queued,
                           completed);
               }
   
               nextPipeline.startSubPlanStateProcess();
   ```



##########
seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java:
##########
@@ -335,6 +335,40 @@ private void fillJobConfigAndCommonJars() {
             jobConfig.setName(envOptions.get(EnvCommonOptions.JOB_NAME));
         }
         jobConfig.getEnvOptions().putAll(envOptions.getSourceMap());
+
+        // Parse pipeline_concurrency from env config
+        log.info("Checking for pipeline_concurrency configuration...");
+        log.info("Config has env section: {}", seaTunnelJobConfig.hasPath("env"));
+        if (seaTunnelJobConfig.hasPath("env")) {
+            log.info("Env config keys: {}", seaTunnelJobConfig.getConfig("env").entrySet());

Review Comment:
   These diagnostic log statements (lines 340-341 and 343) are emitted at info
   level and produce output that isn't needed during normal operation. Either
   remove them or lower them from info to debug level to avoid cluttering
   production logs.
   ```suggestion
           log.debug("Checking for pipeline_concurrency configuration...");
           log.debug("Config has env section: {}", seaTunnelJobConfig.hasPath("env"));
           if (seaTunnelJobConfig.hasPath("env")) {
               log.debug("Env config keys: {}", seaTunnelJobConfig.getConfig("env").entrySet());
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
