joexjx commented on code in PR #9319:
URL: https://github.com/apache/seatunnel/pull/9319#discussion_r2092598227


##########
seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java:
##########
@@ -92,12 +92,12 @@ public List<DatasetTableInfo> execute(List<DatasetTableInfo> upstreamDataStreams
                                         EnvCommonOptions.PARALLELISM.key(),
                                         EnvCommonOptions.PARALLELISM.defaultValue());
             }
+            envOption.put(EnvCommonOptions.PARALLELISM.key(), String.valueOf(parallelism));
             Dataset<Row> dataset =
                     sparkRuntimeEnvironment
                             .getSparkSession()
                             .read()
                             .format(SeaTunnelSource.class.getSimpleName())
-                            .option(EnvCommonOptions.PARALLELISM.key(), parallelism)
                             .option(

Review Comment:
   
![image](https://github.com/user-attachments/assets/2f460aa2-a3cb-4d17-9176-c240c9c6698a)
   
   I understand. However, this code does contain a simple error. `envOption` 
is a HashMap that already includes `EnvCommonOptions.PARALLELISM.key()`. The 
value set by `.option(EnvCommonOptions.PARALLELISM.key(), parallelism)` is 
overwritten by the later `.options(envOption)` call, which is why the resolved 
parallelism (determined earlier from either env or source) does not take 
effect.
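
   To make the overwrite concrete, here is a minimal sketch that does not use 
Spark at all. `FakeReader` is a hypothetical stand-in that only mimics the 
"later write wins" behaviour of `option(...)` followed by `options(...)`; the 
literal key `"parallelism"` and the values 1 and 4 are assumptions chosen for 
illustration, not taken from the PR.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

public class OptionOverwriteDemo {

    // Assumed key name, standing in for EnvCommonOptions.PARALLELISM.key().
    static final String KEY = "parallelism";

    // Hypothetical stand-in for a reader builder. It is NOT Spark's
    // DataFrameReader; it only stores options in a map so that a later
    // write to the same key replaces the earlier one.
    static final class FakeReader {
        private final Map<String, String> options = new LinkedHashMap<>();

        FakeReader option(String key, String value) {
            options.put(key, value);
            return this;
        }

        FakeReader options(Map<String, String> all) {
            options.putAll(all); // replaces any key that was set before
            return this;
        }

        String get(String key) {
            return options.get(key);
        }
    }

    // Reproduces the call order described in the comment above:
    // option(KEY, resolved) first, then options(envOption).
    static String effectiveParallelism() {
        Map<String, String> envOption = new HashMap<>();
        envOption.put(KEY, "1");   // value already present in envOption
        int parallelism = 4;       // value resolved earlier (env or source)

        return new FakeReader()
                .option(KEY, String.valueOf(parallelism))
                .options(envOption) // overwrites the resolved value
                .get(KEY);
    }

    public static void main(String[] args) {
        System.out.println(effectiveParallelism()); // prints 1
    }
}
```

   The resolved value 4 is lost because the map passed to `options(...)` 
carries the same key and is applied afterwards.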



##########
seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java:
##########
@@ -92,12 +92,12 @@ public List<DatasetTableInfo> execute(List<DatasetTableInfo> upstreamDataStreams
                                         EnvCommonOptions.PARALLELISM.key(),
                                         EnvCommonOptions.PARALLELISM.defaultValue());
             }
+            envOption.put(EnvCommonOptions.PARALLELISM.key(), String.valueOf(parallelism));
             Dataset<Row> dataset =
                     sparkRuntimeEnvironment
                             .getSparkSession()
                             .read()
                             .format(SeaTunnelSource.class.getSimpleName())
-                            .option(EnvCommonOptions.PARALLELISM.key(), parallelism)
                             .option(

Review Comment:
   I simply removed `.option(EnvCommonOptions.PARALLELISM.key(), ...)` and 
instead put the resolved value into `envOption` under 
`EnvCommonOptions.PARALLELISM.key()` before building the reader. This ensures 
that the parallelism value (which was already determined earlier based on 
whether it should be taken from env or source) takes effect correctly.
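
   A minimal sketch of the same idea outside Spark, assuming a hypothetical 
`FakeReader` that merely stores options in a map (it is not Spark's 
`DataFrameReader`) and an assumed key name `"parallelism"`: the resolved value 
is written into `envOption` first, so the single `options(envOption)` call 
carries it through.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

public class ParallelismFixDemo {

    // Assumed key name, standing in for EnvCommonOptions.PARALLELISM.key().
    static final String KEY = "parallelism";

    // Hypothetical stand-in for a reader builder; later writes to a key win.
    static final class FakeReader {
        private final Map<String, String> options = new LinkedHashMap<>();

        FakeReader options(Map<String, String> all) {
            options.putAll(all);
            return this;
        }

        String get(String key) {
            return options.get(key);
        }
    }

    // Mirrors the change in the diff: update envOption in place with the
    // resolved parallelism, then pass only envOption to the reader.
    static String effectiveParallelism(Map<String, String> envOption, int parallelism) {
        envOption.put(KEY, String.valueOf(parallelism));
        return new FakeReader()
                .options(envOption)
                .get(KEY);
    }

    public static void main(String[] args) {
        Map<String, String> envOption = new HashMap<>();
        envOption.put(KEY, "1"); // stale value already present in envOption
        System.out.println(effectiveParallelism(envOption, 4)); // prints 4
    }
}
```

   Because there is only one write path for the key, the call order of the 
builder methods no longer matters for this option.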



##########
seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java:
##########
@@ -92,12 +92,12 @@ public List<DatasetTableInfo> execute(List<DatasetTableInfo> upstreamDataStreams
                                         EnvCommonOptions.PARALLELISM.key(),
                                         EnvCommonOptions.PARALLELISM.defaultValue());
             }
+            envOption.put(EnvCommonOptions.PARALLELISM.key(), String.valueOf(parallelism));
             Dataset<Row> dataset =
                     sparkRuntimeEnvironment
                             .getSparkSession()
                             .read()
                             .format(SeaTunnelSource.class.getSimpleName())
-                            .option(EnvCommonOptions.PARALLELISM.key(), parallelism)
                             .option(

Review Comment:
   
![image](https://github.com/user-attachments/assets/2f460aa2-a3cb-4d17-9176-c240c9c6698a)
   
![image](https://github.com/user-attachments/assets/88016382-ba60-484a-8804-2d1ef5846951)
   
   I understand. However, this code does contain a simple error. `envOption` 
is a HashMap that already includes `EnvCommonOptions.PARALLELISM.key()`. The 
value set by `.option(EnvCommonOptions.PARALLELISM.key(), parallelism)` is 
overwritten by the later `.options(envOption)` call, which is why the resolved 
parallelism (determined earlier from either env or source) does not take 
effect.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
