[
https://issues.apache.org/jira/browse/GOBBLIN-2199?focusedWorklogId=962219&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-962219
]
ASF GitHub Bot logged work on GOBBLIN-2199:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 18/Mar/25 06:29
Start Date: 18/Mar/25 06:29
Worklog Time Spent: 10m
Work Description: khandelwal-prateek commented on code in PR #4106:
URL: https://github.com/apache/gobblin/pull/4106#discussion_r2000301539
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java:
##########
@@ -110,12 +110,13 @@ protected List<WorkUnit> loadFlattenedWorkUnits(WorkUnitClaimCheck wu, FileSyste
    * NOTE: adapted from {@link org.apache.gobblin.runtime.mapreduce.MRJobLauncher.TaskRunner#run(org.apache.hadoop.mapreduce.Mapper.Context)}
    * @return count of how many tasks executed (0 if execution ultimately failed, but we *believe* TaskState should already have been recorded beforehand)
    */
-  protected int execute(List<WorkUnit> workUnits, WorkUnitClaimCheck wu, JobState jobState, FileSystem fs, IssueRepository issueRepository) throws IOException, InterruptedException {
+  protected int execute(List<WorkUnit> workUnits, WorkUnitClaimCheck wu, JobState jobState, FileSystem fs, IssueRepository issueRepository,
+      Properties jobProperties) throws IOException, InterruptedException {
     String containerId = "container-id-for-wu-" + wu.getCorrelator();
     StateStore<TaskState> taskStateStore = Help.openTaskStateStore(wu, fs);
     TaskStateTracker taskStateTracker = createEssentializedTaskStateTracker(wu);
-    TaskExecutor taskExecutor = new TaskExecutor(new Properties());
+    TaskExecutor taskExecutor = new TaskExecutor(jobProperties);
Review Comment:
The task executor determines its parallelism from
`taskexecutor.threadpool.size`. Without this change it is constructed with an
empty `Properties`, so it always uses the default value and the parallelism
cannot be controlled by config.
```
public TaskExecutor(Properties properties) {
  this(Integer.parseInt(properties.getProperty(ConfigurationKeys.TASK_EXECUTOR_THREADPOOL_SIZE_KEY,
      Integer.toString(ConfigurationKeys.DEFAULT_TASK_EXECUTOR_THREADPOOL_SIZE))),
      ...
}
```
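To illustrate the difference, here is a minimal standalone sketch of the same default-backed lookup. It is not the Gobblin code: the class name, the method name, and the default of 2 are assumptions made for illustration (the real default is whatever `ConfigurationKeys.DEFAULT_TASK_EXECUTOR_THREADPOOL_SIZE` defines). Only the key `taskexecutor.threadpool.size` comes from the comment above.

```java
import java.util.Properties;

/** Hypothetical sketch of a default-backed thread pool size lookup. */
final class ThreadPoolSizeSketch {
  // Config key quoted in the review comment.
  static final String THREADPOOL_SIZE_KEY = "taskexecutor.threadpool.size";
  // Assumed default, for illustration only; the real value lives in ConfigurationKeys.
  static final int ASSUMED_DEFAULT_THREADPOOL_SIZE = 2;

  /** Returns the configured pool size, or the assumed default when the key is absent. */
  static int resolveThreadPoolSize(Properties properties) {
    return Integer.parseInt(properties.getProperty(
        THREADPOOL_SIZE_KEY, Integer.toString(ASSUMED_DEFAULT_THREADPOOL_SIZE)));
  }

  public static void main(String[] args) {
    // Before the change: an empty Properties can only fall back to the default.
    int before = resolveThreadPoolSize(new Properties());

    // After the change: the job properties carry the configured value through.
    Properties jobProperties = new Properties();
    jobProperties.setProperty(THREADPOOL_SIZE_KEY, "8");
    int after = resolveThreadPoolSize(jobProperties);

    System.out.println("empty properties -> " + before + ", job properties -> " + after);
  }
}
```

With `new Properties()` the lookup can never see a configured value, which is why passing `jobProperties` through to the constructor is needed for the setting to take effect.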
Issue Time Tracking
-------------------
Worklog Id: (was: 962219)
Time Spent: 20m (was: 10m)
> Support dynamic container scaling on Temporal workload
> ------------------------------------------------------
>
> Key: GOBBLIN-2199
> URL: https://issues.apache.org/jira/browse/GOBBLIN-2199
> Project: Apache Gobblin
> Issue Type: Task
> Reporter: Prateek Khandelwal
> Priority: Major
> Time Spent: 20m
> Remaining Estimate: 0h
>
> Currently, Gobblin runs a static number of containers (the initial containers
> allocated at the start of the job). We need to support dynamic scaling by
> computing the recommended number of containers so that large data copy
> workloads can be processed within a given completion time without running
> into OOM errors on containers.