lihjChina opened a new pull request, #10364:
URL: https://github.com/apache/seatunnel/pull/10364

   Description
   This PR introduces a configurable pipeline concurrency control mechanism 
that allows users to control how many pipelines execute simultaneously within a 
job.
   
   What is the purpose of the change
   Currently, SeaTunnel executes all pipelines concurrently when a job starts. 
This can cause issues when:
   
   - Multiple pipelines compete for limited database connections
   - Sequential execution is required for data consistency
   - Resource constraints require limiting concurrent operations

   This change adds a pipeline_concurrency configuration option to control the 
maximum number of pipelines that can run simultaneously.
   
   Motivation
   Problem:

   - Users cannot control pipeline execution order
   - All pipelines start simultaneously, causing resource contention
   - No way to enforce serial execution when needed

   Use Case:

   - A job with 4 pipelines reading from the same database
   - Database has connection limits
   - Need to execute pipelines serially to avoid connection exhaustion

   Implementation
   Changes Made
   1. JobConfig Enhancement
   File: seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/JobConfig.java

   - Added pipelineConcurrency field (default: Integer.MAX_VALUE for unlimited)
   - Updated serialization methods (writeData, readData)
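
A minimal sketch of the field and its serialization round trip, for illustration only. `JobConfigSketch` and `roundTrip` are hypothetical names, and plain `java.io` data streams are assumed here because the description does not show the serialization interface the real `JobConfig` uses.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in for JobConfig; not the actual SeaTunnel class.
final class JobConfigSketch {
    // Integer.MAX_VALUE means "no limit", matching the default described above.
    private int pipelineConcurrency = Integer.MAX_VALUE;

    int getPipelineConcurrency() {
        return pipelineConcurrency;
    }

    void setPipelineConcurrency(int pipelineConcurrency) {
        this.pipelineConcurrency = pipelineConcurrency;
    }

    // The new field must be written and read in the same position,
    // otherwise the two sides of the wire format drift apart.
    void writeData(DataOutputStream out) throws IOException {
        out.writeInt(pipelineConcurrency);
    }

    void readData(DataInputStream in) throws IOException {
        pipelineConcurrency = in.readInt();
    }

    // Serializes the config to bytes and reads it back into a fresh instance.
    static JobConfigSketch roundTrip(JobConfigSketch source) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(buffer)) {
                source.writeData(out);
            }
            JobConfigSketch copy = new JobConfigSketch();
            try (DataInputStream in =
                    new DataInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
                copy.readData(in);
            }
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A round trip of this kind is a cheap way to check that the new field survives being sent from the client to the engine.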
   2. Configuration Parsing
   File: seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java

   - Added logic to parse pipeline_concurrency from env configuration
   - Validates and sets the value in JobConfig
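
The parsing step could look roughly like the sketch below. `PipelineConcurrencyParser` is a hypothetical helper that reads from a plain `Map`, not the parser's actual config type, and the rule that values below 1 are rejected is an assumption, since the description only says the value is validated.

```java
import java.util.Map;

// Hypothetical helper; the real logic lives in MultipleTableJobConfigParser.
final class PipelineConcurrencyParser {
    static final String KEY = "pipeline_concurrency";
    static final int UNLIMITED = Integer.MAX_VALUE;

    // Returns the configured limit, or UNLIMITED when the option is omitted.
    static int parse(Map<String, Object> env) {
        Object raw = env.get(KEY);
        if (raw == null) {
            return UNLIMITED;
        }
        int value;
        try {
            value = Integer.parseInt(raw.toString().trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(KEY + " must be an integer, got: " + raw, e);
        }
        // Assumed validation rule: zero or negative limits would never start a pipeline.
        if (value < 1) {
            throw new IllegalArgumentException(KEY + " must be >= 1, got: " + value);
        }
        return value;
    }
}
```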
   3. Execution Control
   File: seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java

   - Added concurrency control fields:
     - pipelineConcurrency: configured limit
     - runningPipelineCount: current running count
     - nextPipelineIndex: next pipeline to start
   - Implemented startNextPipelines() method for controlled startup
   - Modified stateProcess() to use controlled startup
   - Enhanced addPipelineEndCallback() to start next pipeline when one completes
   - Added detailed logging for monitoring execution
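
The bookkeeping behind these fields can be sketched as follows. This is an illustration under assumptions, not the code in `PhysicalPlan`: `PipelineGate` and `onPipelineEnd` are hypothetical names, and the gate only returns the indexes of pipelines to launch instead of starting them itself.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the three counters described above.
final class PipelineGate {
    private final int pipelineTotal;
    private final int pipelineConcurrency;
    private int runningPipelineCount = 0;
    private int nextPipelineIndex = 0;

    PipelineGate(int pipelineTotal, int pipelineConcurrency) {
        if (pipelineConcurrency < 1) {
            throw new IllegalArgumentException("pipelineConcurrency must be >= 1");
        }
        this.pipelineTotal = pipelineTotal;
        this.pipelineConcurrency = pipelineConcurrency;
    }

    // Claims pipelines in index order until the limit is reached or none
    // remain, and returns the indexes the caller should launch now.
    synchronized List<Integer> startNextPipelines() {
        List<Integer> toStart = new ArrayList<>();
        while (runningPipelineCount < pipelineConcurrency
                && nextPipelineIndex < pipelineTotal) {
            runningPipelineCount++;
            toStart.add(nextPipelineIndex++);
        }
        return toStart;
    }

    // Called from a pipeline's end callback: frees one slot, then tries
    // to fill it with the next waiting pipeline.
    synchronized List<Integer> onPipelineEnd() {
        if (runningPipelineCount == 0) {
            throw new IllegalStateException("no pipeline is running");
        }
        runningPipelineCount--;
        return startNextPipelines();
    }
}
```

With 4 pipelines and a limit of 1, the first call returns index 0 and each completion releases exactly one more pipeline, which gives serial execution. With the default of Integer.MAX_VALUE, the first call returns all four indexes, matching the previous behavior.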
   Configuration Format
   env {
     job.mode = "BATCH"
     job.name = "example_job"
     pipeline_concurrency = 1  # 1 for serial, N for limited concurrency, omit for unlimited
   }
   Testing
   Test Environment
   SeaTunnel version: [your version]
   Test scenario: 4 pipelines reading from MySQL
   Test Cases
   Serial Execution (pipeline_concurrency = 1)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
