joexjx commented on code in PR #9319:
URL: https://github.com/apache/seatunnel/pull/9319#discussion_r2092598227
##########
seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java:
##########
@@ -92,12 +92,12 @@ public List<DatasetTableInfo> execute(List<DatasetTableInfo> upstreamDataStreams
                             EnvCommonOptions.PARALLELISM.key(),
                             EnvCommonOptions.PARALLELISM.defaultValue());
             }
+            envOption.put(EnvCommonOptions.PARALLELISM.key(), String.valueOf(parallelism));
             Dataset<Row> dataset =
                     sparkRuntimeEnvironment
                             .getSparkSession()
                             .read()
                             .format(SeaTunnelSource.class.getSimpleName())
-                            .option(EnvCommonOptions.PARALLELISM.key(), parallelism)
                             .option(
Review Comment:

I understand. However, this code does contain a simple, obvious error.
envOption is a HashMap that already contains an entry for
EnvCommonOptions.PARALLELISM.key(). The value set by
.option(EnvCommonOptions.PARALLELISM.key(), parallelism) is then overwritten
by the later .options(envOption) call, which is why the properly resolved
parallelism (determined earlier from either env or source) does not take
effect.
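
To illustrate the overwrite described above, here is a minimal, dependency-free sketch. `FakeReader` is a hypothetical stand-in for the reader builder, not Spark's actual class, and it assumes that `option` and `options` both write into one shared map where the last write to a key wins. The literal key `"parallelism"` and the values `"1"` and `"4"` are placeholders chosen for the example.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

public class OptionOverrideSketch {

    /** Hypothetical builder: a later write to a key replaces the earlier one. */
    static final class FakeReader {
        private final Map<String, String> options = new LinkedHashMap<>();

        FakeReader option(String key, String value) {
            options.put(key, value);
            return this;
        }

        FakeReader options(Map<String, String> more) {
            options.putAll(more);
            return this;
        }

        String get(String key) {
            return options.get(key);
        }
    }

    /** Returns the value the reader ends up with under the pre-fix call order. */
    public static String effectiveParallelism(String envValue, String resolvedValue) {
        Map<String, String> envOption = new HashMap<>();
        envOption.put("parallelism", envValue); // the env map already carries the key

        return new FakeReader()
                .option("parallelism", resolvedValue) // value resolved from env or source
                .options(envOption)                   // replaces the value set just above
                .get("parallelism");
    }

    public static void main(String[] args) {
        // Shows which of the two values survives the chained calls.
        System.out.println(effectiveParallelism("1", "4"));
    }
}
```

Under that last-write-wins assumption, the resolved value passed to `option` is discarded whenever the map passed to `options` contains the same key.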
##########
seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java:
##########
@@ -92,12 +92,12 @@ public List<DatasetTableInfo> execute(List<DatasetTableInfo> upstreamDataStreams
                             EnvCommonOptions.PARALLELISM.key(),
                             EnvCommonOptions.PARALLELISM.defaultValue());
             }
+            envOption.put(EnvCommonOptions.PARALLELISM.key(), String.valueOf(parallelism));
             Dataset<Row> dataset =
                     sparkRuntimeEnvironment
                             .getSparkSession()
                             .read()
                             .format(SeaTunnelSource.class.getSimpleName())
-                            .option(EnvCommonOptions.PARALLELISM.key(), parallelism)
                             .option(
Review Comment:
I simply removed the .option(EnvCommonOptions.PARALLELISM.key(), ...) call and
instead put the resolved value into envOption under
EnvCommonOptions.PARALLELISM.key() before building the reader. This ensures
that the parallelism value (already determined earlier based on whether it
should be taken from env or source) takes effect correctly.
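
The approach can be sketched roughly as follows, as an illustration rather than the PR's exact code. The helper name `withResolvedParallelism` and the literal key `"parallelism"` are hypothetical, and unlike the change in this PR, which updates envOption in place, the sketch copies the map so the example has no side effects.

```java
import java.util.HashMap;
import java.util.Map;

public class ResolvedParallelismSketch {

    /**
     * Returns a copy of envOption in which the parallelism entry holds the
     * already-resolved value, so a single options(map) call carries it.
     */
    public static Map<String, String> withResolvedParallelism(
            Map<String, String> envOption, int parallelism) {
        Map<String, String> merged = new HashMap<>(envOption);
        // Overwrite whatever the env map held with the resolved value.
        merged.put("parallelism", String.valueOf(parallelism));
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> envOption = new HashMap<>();
        envOption.put("parallelism", "1");   // placeholder env-level value
        envOption.put("job.mode", "BATCH");  // unrelated placeholder entry, left untouched

        Map<String, String> merged = withResolvedParallelism(envOption, 4);
        System.out.println(merged.get("parallelism"));
    }
}
```

Because the map is the only place the key is set, the order of later builder calls no longer affects which value is used.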
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]