lihjChina opened a new pull request, #10364:
URL: https://github.com/apache/seatunnel/pull/10364
Description
This PR introduces a configurable pipeline concurrency control mechanism that allows users to control how many pipelines execute simultaneously within a job.
What is the purpose of the change
Currently, SeaTunnel executes all pipelines concurrently when a job starts.
This can cause issues when:
- Multiple pipelines compete for limited database connections
- Sequential execution is required for data consistency
- Resource constraints require limiting concurrent operations
This change adds a pipeline_concurrency configuration option to control the maximum number of pipelines that can run simultaneously.
Motivation
Problem:
- Users cannot control pipeline execution order
- All pipelines start simultaneously, causing resource contention
- No way to enforce serial execution when needed
Use Case:
- A job with 4 pipelines reading from the same database
- The database has connection limits
- Pipelines need to execute serially to avoid connection exhaustion
Implementation
Changes Made
1. JobConfig Enhancement
File: seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/JobConfig.java
- Added pipelineConcurrency field (default: Integer.MAX_VALUE for unlimited)
- Updated serialization methods (writeData, readData)
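A minimal sketch of how the new field can round-trip through serialization. It uses `java.io.DataOutput`/`DataInput` as stand-ins for whatever stream types the real `JobConfig.writeData`/`readData` use, and it assumes the field is appended after the existing fields. The class name `JobConfigSketch` is hypothetical and models only `pipelineConcurrency`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

public class JobConfigSketch {
    // Integer.MAX_VALUE means "no limit", the default described in the PR.
    private int pipelineConcurrency = Integer.MAX_VALUE;

    public int getPipelineConcurrency() {
        return pipelineConcurrency;
    }

    public void setPipelineConcurrency(int pipelineConcurrency) {
        this.pipelineConcurrency = pipelineConcurrency;
    }

    // Assumption: in the real class this write comes after the existing fields.
    public void writeData(DataOutput out) throws IOException {
        out.writeInt(pipelineConcurrency);
    }

    // Must read fields in exactly the order writeData wrote them.
    public void readData(DataInput in) throws IOException {
        pipelineConcurrency = in.readInt();
    }

    public static void main(String[] args) throws IOException {
        JobConfigSketch original = new JobConfigSketch();
        original.setPipelineConcurrency(1);

        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        original.writeData(out);
        out.flush();

        JobConfigSketch copy = new JobConfigSketch();
        copy.readData(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        System.out.println(copy.getPipelineConcurrency()); // prints 1
    }
}
```

The important constraint is symmetry. `readData` must consume the field at the same position where `writeData` emitted it. Otherwise every field after it is deserialized from the wrong bytes.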
2. Configuration Parsing
File: seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
- Added logic to parse pipeline_concurrency from env configuration
- Validates and sets the value in JobConfig
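The parse-and-validate step could look roughly like the sketch below. It assumes the env block is already available as a `Map<String, Object>`, which is a simplification of the real config objects. It also assumes that an absent key means unlimited (`Integer.MAX_VALUE`), that numeric strings are accepted, and that values below 1 are rejected. These validation rules are assumptions and are not taken from the PR. `PipelineConcurrencyParser` is a hypothetical name.

```java
import java.util.HashMap;
import java.util.Map;

public class PipelineConcurrencyParser {
    // Option name taken from the PR's configuration example.
    public static final String KEY = "pipeline_concurrency";

    // Returns the configured limit, or Integer.MAX_VALUE (unlimited) when the key is absent.
    // Assumption: values below 1 are invalid and rejected with IllegalArgumentException.
    public static int parse(Map<String, Object> env) {
        Object raw = env.get(KEY);
        if (raw == null) {
            return Integer.MAX_VALUE;
        }
        int value;
        if (raw instanceof Integer) {
            value = (Integer) raw;
        } else {
            try {
                // Also accept numeric strings such as "2".
                value = Integer.parseInt(raw.toString().trim());
            } catch (NumberFormatException e) {
                throw new IllegalArgumentException(KEY + " must be an integer, got: " + raw, e);
            }
        }
        if (value < 1) {
            throw new IllegalArgumentException(KEY + " must be >= 1, got: " + value);
        }
        return value;
    }

    public static void main(String[] args) {
        Map<String, Object> env = new HashMap<>();
        System.out.println(parse(env)); // prints 2147483647 (omitted: unlimited)
        env.put(KEY, 1);
        System.out.println(parse(env)); // prints 1 (serial)
    }
}
```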
3. Execution Control
File: seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java
- Added concurrency control fields:
  - pipelineConcurrency: configured limit
  - runningPipelineCount: number of pipelines currently running
  - nextPipelineIndex: index of the next pipeline to start
- Implemented startNextPipelines() method for controlled startup
- Modified stateProcess() to use controlled startup
- Enhanced addPipelineEndCallback() to start the next pipeline when one completes
- Added detailed logging for monitoring execution
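The controlled-startup idea behind these three fields can be illustrated with a small self-contained scheduler. This is not the PR's `PhysicalPlan` code. `PipelineScheduler`, the `launcher` hook, and `onPipelineEnd()` are hypothetical names. The end callback is modeled as a direct method call, and `synchronized` stands in for whatever synchronization the real plan uses.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntConsumer;

public class PipelineScheduler {
    private final int pipelineConcurrency; // configured limit (Integer.MAX_VALUE = unlimited)
    private final int totalPipelines;
    private final IntConsumer launcher;    // hypothetical hook that starts pipeline i
    private int runningPipelineCount = 0;  // pipelines currently running
    private int nextPipelineIndex = 0;     // next pipeline to start

    public PipelineScheduler(int pipelineConcurrency, int totalPipelines, IntConsumer launcher) {
        if (pipelineConcurrency < 1) {
            throw new IllegalArgumentException("pipelineConcurrency must be >= 1");
        }
        this.pipelineConcurrency = pipelineConcurrency;
        this.totalPipelines = totalPipelines;
        this.launcher = launcher;
    }

    // Starts pending pipelines, in index order, until the limit is reached.
    public synchronized void startNextPipelines() {
        while (runningPipelineCount < pipelineConcurrency && nextPipelineIndex < totalPipelines) {
            int index = nextPipelineIndex++;
            runningPipelineCount++; // reserve the slot before launching
            launcher.accept(index);
        }
    }

    // Plays the role of the pipeline-end callback: release a slot, then refill it.
    public synchronized void onPipelineEnd() {
        if (runningPipelineCount > 0) {
            runningPipelineCount--;
        }
        startNextPipelines();
    }

    public synchronized int getRunningPipelineCount() {
        return runningPipelineCount;
    }

    public static void main(String[] args) {
        List<Integer> started = new ArrayList<>();
        PipelineScheduler scheduler = new PipelineScheduler(1, 4, i -> started.add(i));
        scheduler.startNextPipelines();
        System.out.println(started); // prints [0]
        scheduler.onPipelineEnd();   // pipeline 0 finished, so pipeline 1 starts
        System.out.println(started); // prints [0, 1]
    }
}
```

The slot is reserved before the launcher is called. This keeps the count correct even if a pipeline finishes immediately and re-enters `onPipelineEnd()` on the same thread. Java's `synchronized` is reentrant, so that nested call does not deadlock.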
Configuration Format
env {
  job.mode = "BATCH"
  job.name = "example_job"
  pipeline_concurrency = 1  # 1 for serial, N for limited concurrency, omit for unlimited
}
Testing
Test Environment
SeaTunnel version: [your version]
Test scenario: 4 pipelines reading from MySQL
Test Cases
Serial Execution (pipeline_concurrency = 1)