khandelwal-prateek commented on code in PR #4106:
URL: https://github.com/apache/gobblin/pull/4106#discussion_r2000301539
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java:
##########
@@ -110,12 +110,13 @@ protected List<WorkUnit> loadFlattenedWorkUnits(WorkUnitClaimCheck wu, FileSyste
    * NOTE: adapted from {@link org.apache.gobblin.runtime.mapreduce.MRJobLauncher.TaskRunner#run(org.apache.hadoop.mapreduce.Mapper.Context)}
    * @return count of how many tasks executed (0 if execution ultimately failed, but we *believe* TaskState should already have been recorded beforehand)
    */
-  protected int execute(List<WorkUnit> workUnits, WorkUnitClaimCheck wu, JobState jobState, FileSystem fs, IssueRepository issueRepository) throws IOException, InterruptedException {
+  protected int execute(List<WorkUnit> workUnits, WorkUnitClaimCheck wu, JobState jobState, FileSystem fs, IssueRepository issueRepository,
+      Properties jobProperties) throws IOException, InterruptedException {
     String containerId = "container-id-for-wu-" + wu.getCorrelator();
     StateStore<TaskState> taskStateStore = Help.openTaskStateStore(wu, fs);
     TaskStateTracker taskStateTracker = createEssentializedTaskStateTracker(wu);
-    TaskExecutor taskExecutor = new TaskExecutor(new Properties());
+    TaskExecutor taskExecutor = new TaskExecutor(jobProperties);
Review Comment:
`TaskExecutor` determines its parallelism from `taskexecutor.threadpool.size`; without this change it always fell back to the default value instead of the parallelism being controlled by config.
```
public TaskExecutor(Properties properties) {
  this(Integer.parseInt(properties.getProperty(ConfigurationKeys.TASK_EXECUTOR_THREADPOOL_SIZE_KEY,
          Integer.toString(ConfigurationKeys.DEFAULT_TASK_EXECUTOR_THREADPOOL_SIZE))),
      ...);
}
```
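
For illustration, a minimal sketch of the effect (the `"32"` value and the surrounding snippet are hypothetical; the config key and the `TaskExecutor(Properties)` constructor are the ones quoted above):

```
import java.util.Properties;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.runtime.TaskExecutor;

// Hypothetical job config setting taskexecutor.threadpool.size to 32.
Properties jobProperties = new Properties();
jobProperties.setProperty(ConfigurationKeys.TASK_EXECUTOR_THREADPOOL_SIZE_KEY, "32");

// Before this change, new TaskExecutor(new Properties()) always sized the pool
// from DEFAULT_TASK_EXECUTOR_THREADPOOL_SIZE. With jobProperties forwarded,
// the pool size comes from the configured value instead.
TaskExecutor taskExecutor = new TaskExecutor(jobProperties);
```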