[
https://issues.apache.org/jira/browse/GOBBLIN-2199?focusedWorklogId=962216&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-962216
]
ASF GitHub Bot logged work on GOBBLIN-2199:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 18/Mar/25 06:12
Start Date: 18/Mar/25 06:12
Worklog Time Spent: 10m
Work Description: Blazer-007 commented on code in PR #4106:
URL: https://github.com/apache/gobblin/pull/4106#discussion_r2000257208
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/RecommendScalingForWorkUnitsLinearHeuristicImpl.java:
##########
@@ -23,65 +23,128 @@
import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
import org.apache.gobblin.temporal.ddm.work.TimeBudget;
import org.apache.gobblin.temporal.ddm.work.WorkUnitsSizeSummary;
-import org.apache.gobblin.temporal.ddm.worker.WorkFulfillmentWorker;
+import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
/**
* Simple config-driven linear recommendation for how many containers to use
to complete the "remaining work" within a given {@link TimeBudget}, per:
*
- * a. from {@link WorkUnitsSizeSummary}, find how many (remaining)
"top-level" {@link org.apache.gobblin.source.workunit.MultiWorkUnit}s of some
mean size
- * b. from the configured {@link #AMORTIZED_NUM_BYTES_PER_MINUTE}, find the
expected "processing rate" in bytes / minute
- * 1. estimate the time required for processing a mean-sized `MultiWorkUnit`
(MWU)
- * c. from {@link JobState}, find per-container `MultiWorkUnit` parallelism
capacity (aka. "worker-slots") to base the recommendation upon
- * 2. calculate the per-container throughput of MWUs per minute
- * 3. estimate the total per-container-minutes required to process all MWUs
- * d. from the {@link TimeBudget}, find the target number of minutes in
which to complete processing of all MWUs
- * 4. recommend the number of containers so all MWU processing should finish
within the target number of minutes
+ * a. from {@link WorkUnitsSizeSummary}, find how many (remaining) {@link
org.apache.gobblin.source.workunit.WorkUnit}s
+ * b. from the configured
GobblinTemporalConfigurationKeys.TEMPORAL_WORKER_THREAD_AMORTIZED_THROUGHPUT_PER_MINUTE,
find the expected "processing rate" in bytes / minute
+ * 1. estimate the total container-minutes required to process all MWUs
+ * c. from the {@link TimeBudget}, find the target number of minutes in
which to complete processing of all MWUs
+ * 2. estimate container count based on target minutes
+ * 2. estimate container count based on the maximum number of work units
allowed per container
+ * 4. recommend the number of containers as max of above two container counts
*/
@Slf4j
public class RecommendScalingForWorkUnitsLinearHeuristicImpl extends
AbstractRecommendScalingForWorkUnitsImpl {
- public static final String AMORTIZED_NUM_BYTES_PER_MINUTE =
GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_PREFIX +
"heuristic.params.numBytesPerMinute";
- public static final long DEFAULT_AMORTIZED_NUM_BYTES_PER_MINUTE = 80 * 1000L
* 1000L * 60L; // 80MB/sec
-
+ /**
+ * Calculates the recommended number of containers for processing the
remaining work units.
+ * <p>
+ * This method first checks whether dynamic scaling is enabled via the job
state configuration.
+ * If dynamic scaling is disabled, it returns the initial container count as
specified in the job state.
+ * When dynamic scaling is enabled, it computes the throughput based on the
count of constituent work units (WUs)
+ * and the processing rate (bytes per minute per thread). The calculation
involves:
+ * <ol>
+ * <li>Computing the total bytes to be processed based on the count and
mean size of top-level work units.</li>
+ * <li>Calculating the processing rate per container using the amortized
bytes per minute rate and the container's work unit capacity.</li>
+ * <li>Estimating the total container-minutes required to process all MWUs
and determining the number of containers needed
+ * to meet the job time budget.</li>
+ * <li>Computing an alternative container count based on the maximum
number of work units allowed per container.</li>
+ * <li>Returning the maximum of the two computed container counts as the
recommended scaling.</li>
+ * </ol>
+ * </p>
+ *
+ * @param remainingWork the summary of work unit sizes and counts remaining
for processing
+ * @param sourceClass the name of the class invoking this method
+ * @param jobTimeBudget the time budget allocated for the job execution
+ * @param jobState the current job state that holds configuration properties
and runtime parameters
+ * @return the recommended number of containers to allocate for processing
the work units
+ */
@Override
protected int calcDerivationSetPoint(WorkUnitsSizeSummary remainingWork,
String sourceClass, TimeBudget jobTimeBudget, JobState jobState) {
- // for simplicity, for now, consider only top-level work units (aka.
`MultiWorkUnit`s - MWUs)
+ if
(!jobState.getPropAsBoolean(GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_ENABLED,
false)) {
+ int initialContainerCount =
Integer.valueOf(jobState.getProp(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY,
"1"));
+ log.info("Dynamic scaling is disabled, returning initial container
count: " + initialContainerCount);
+ return initialContainerCount;
+ }
+
+ long numWUs = remainingWork.getConstituentWorkUnitsCount();
long numMWUs = remainingWork.getTopLevelWorkUnitsCount();
double meanBytesPerMWU = remainingWork.getTopLevelWorkUnitsMeanSize();
- int numSimultaneousMWUsPerContainer =
calcPerContainerWUCapacity(jobState); // (a worker-thread is a slot for
top-level (MWUs) - not constituent sub-WUs)
- long bytesPerMinuteProcRate = calcAmortizedBytesPerMinute(jobState);
+ double totalBytes = numMWUs * meanBytesPerMWU;
Review Comment:
We can use `remainingWork.getTotalSize()` here instead of computing `numMWUs * meanBytesPerMWU`.
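For reference, a minimal sketch of the calculation the new Javadoc describes, assuming the total byte count is passed in directly rather than derived from count times mean size. The class name, method name, and every parameter here are hypothetical stand-ins, not the PR's actual code, and the floor of one container is an added assumption.

```java
// Hypothetical, self-contained sketch of the linear heuristic; not the PR's implementation.
public class LinearScalingSketch {

  /**
   * Recommends a container count as the max of two estimates:
   * one driven by the time budget, one by a per-container work unit cap.
   */
  public static int recommendContainers(double totalBytes, long numWUs,
      long bytesPerMinutePerThread, int threadsPerContainer,
      long targetMinutes, int maxWUsPerContainer) {
    // Per-container processing rate: per-thread rate times worker threads per container.
    double bytesPerMinutePerContainer = (double) bytesPerMinutePerThread * threadsPerContainer;

    // Total container-minutes needed to process all remaining bytes.
    double containerMinutes = totalBytes / bytesPerMinutePerContainer;

    // Containers needed to finish within the target number of minutes.
    int byTimeBudget = (int) Math.ceil(containerMinutes / targetMinutes);

    // Containers needed so no container exceeds the work unit cap.
    int byWorkUnitCap = (int) Math.ceil((double) numWUs / maxWUsPerContainer);

    // Assumption: never recommend fewer than one container.
    return Math.max(1, Math.max(byTimeBudget, byWorkUnitCap));
  }

  public static void main(String[] args) {
    // 6 GB total, 1000 WUs, 10 MB/min per thread, 5 threads, 30 min budget, 100 WUs per container.
    System.out.println(recommendContainers(6e9, 1000, 10_000_000L, 5, 30, 100)); // prints 10
  }
}
```

In this example the time budget alone would call for 4 containers, but the work unit cap calls for 10, so the cap wins.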
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java:
##########
@@ -110,12 +110,13 @@ protected List<WorkUnit>
loadFlattenedWorkUnits(WorkUnitClaimCheck wu, FileSyste
* NOTE: adapted from {@link
org.apache.gobblin.runtime.mapreduce.MRJobLauncher.TaskRunner#run(org.apache.hadoop.mapreduce.Mapper.Context)}
* @return count of how many tasks executed (0 if execution ultimately
failed, but we *believe* TaskState should already have been recorded beforehand)
*/
- protected int execute(List<WorkUnit> workUnits, WorkUnitClaimCheck wu,
JobState jobState, FileSystem fs, IssueRepository issueRepository) throws
IOException, InterruptedException {
+ protected int execute(List<WorkUnit> workUnits, WorkUnitClaimCheck wu,
JobState jobState, FileSystem fs, IssueRepository issueRepository,
+ Properties jobProperties) throws IOException,
InterruptedException {
String containerId = "container-id-for-wu-" + wu.getCorrelator();
StateStore<TaskState> taskStateStore = Help.openTaskStateStore(wu, fs);
TaskStateTracker taskStateTracker =
createEssentializedTaskStateTracker(wu);
- TaskExecutor taskExecutor = new TaskExecutor(new Properties());
+ TaskExecutor taskExecutor = new TaskExecutor(jobProperties);
Review Comment:
Are we passing any property through `jobProperties` that is needed to create
`taskExecutor`?
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/RecommendScalingForWorkUnitsLinearHeuristicImpl.java:
##########
@@ -23,65 +23,128 @@
import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
import org.apache.gobblin.temporal.ddm.work.TimeBudget;
import org.apache.gobblin.temporal.ddm.work.WorkUnitsSizeSummary;
-import org.apache.gobblin.temporal.ddm.worker.WorkFulfillmentWorker;
+import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
/**
* Simple config-driven linear recommendation for how many containers to use
to complete the "remaining work" within a given {@link TimeBudget}, per:
*
- * a. from {@link WorkUnitsSizeSummary}, find how many (remaining)
"top-level" {@link org.apache.gobblin.source.workunit.MultiWorkUnit}s of some
mean size
- * b. from the configured {@link #AMORTIZED_NUM_BYTES_PER_MINUTE}, find the
expected "processing rate" in bytes / minute
- * 1. estimate the time required for processing a mean-sized `MultiWorkUnit`
(MWU)
- * c. from {@link JobState}, find per-container `MultiWorkUnit` parallelism
capacity (aka. "worker-slots") to base the recommendation upon
- * 2. calculate the per-container throughput of MWUs per minute
- * 3. estimate the total per-container-minutes required to process all MWUs
- * d. from the {@link TimeBudget}, find the target number of minutes in
which to complete processing of all MWUs
- * 4. recommend the number of containers so all MWU processing should finish
within the target number of minutes
+ * a. from {@link WorkUnitsSizeSummary}, find how many (remaining) {@link
org.apache.gobblin.source.workunit.WorkUnit}s
+ * b. from the configured
GobblinTemporalConfigurationKeys.TEMPORAL_WORKER_THREAD_AMORTIZED_THROUGHPUT_PER_MINUTE,
find the expected "processing rate" in bytes / minute
+ * 1. estimate the total container-minutes required to process all MWUs
+ * c. from the {@link TimeBudget}, find the target number of minutes in
which to complete processing of all MWUs
+ * 2. estimate container count based on target minutes
+ * 2. estimate container count based on the maximum number of work units
allowed per container
+ * 4. recommend the number of containers as max of above two container counts
*/
@Slf4j
public class RecommendScalingForWorkUnitsLinearHeuristicImpl extends
AbstractRecommendScalingForWorkUnitsImpl {
- public static final String AMORTIZED_NUM_BYTES_PER_MINUTE =
GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_PREFIX +
"heuristic.params.numBytesPerMinute";
- public static final long DEFAULT_AMORTIZED_NUM_BYTES_PER_MINUTE = 80 * 1000L
* 1000L * 60L; // 80MB/sec
-
+ /**
+ * Calculates the recommended number of containers for processing the
remaining work units.
+ * <p>
+ * This method first checks whether dynamic scaling is enabled via the job
state configuration.
+ * If dynamic scaling is disabled, it returns the initial container count as
specified in the job state.
+ * When dynamic scaling is enabled, it computes the throughput based on the
count of constituent work units (WUs)
+ * and the processing rate (bytes per minute per thread). The calculation
involves:
+ * <ol>
+ * <li>Computing the total bytes to be processed based on the count and
mean size of top-level work units.</li>
+ * <li>Calculating the processing rate per container using the amortized
bytes per minute rate and the container's work unit capacity.</li>
+ * <li>Estimating the total container-minutes required to process all MWUs
and determining the number of containers needed
+ * to meet the job time budget.</li>
+ * <li>Computing an alternative container count based on the maximum
number of work units allowed per container.</li>
+ * <li>Returning the maximum of the two computed container counts as the
recommended scaling.</li>
+ * </ol>
+ * </p>
+ *
+ * @param remainingWork the summary of work unit sizes and counts remaining
for processing
+ * @param sourceClass the name of the class invoking this method
+ * @param jobTimeBudget the time budget allocated for the job execution
+ * @param jobState the current job state that holds configuration properties
and runtime parameters
+ * @return the recommended number of containers to allocate for processing
the work units
+ */
@Override
protected int calcDerivationSetPoint(WorkUnitsSizeSummary remainingWork,
String sourceClass, TimeBudget jobTimeBudget, JobState jobState) {
- // for simplicity, for now, consider only top-level work units (aka.
`MultiWorkUnit`s - MWUs)
+ if
(!jobState.getPropAsBoolean(GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_ENABLED,
false)) {
+ int initialContainerCount =
Integer.valueOf(jobState.getProp(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY,
"1"));
+ log.info("Dynamic scaling is disabled, returning initial container
count: " + initialContainerCount);
+ return initialContainerCount;
+ }
+
+ long numWUs = remainingWork.getConstituentWorkUnitsCount();
long numMWUs = remainingWork.getTopLevelWorkUnitsCount();
double meanBytesPerMWU = remainingWork.getTopLevelWorkUnitsMeanSize();
- int numSimultaneousMWUsPerContainer =
calcPerContainerWUCapacity(jobState); // (a worker-thread is a slot for
top-level (MWUs) - not constituent sub-WUs)
- long bytesPerMinuteProcRate = calcAmortizedBytesPerMinute(jobState);
+ double totalBytes = numMWUs * meanBytesPerMWU;
+ long bytesPerMinuteProcRatePerThread =
calcAmortizedBytesPerMinute(jobState);
log.info("Calculating auto-scaling (for {} remaining work units within {})
using: bytesPerMinuteProcRate = {}; meanBytesPerMWU = {}",
- numMWUs, jobTimeBudget, bytesPerMinuteProcRate, meanBytesPerMWU);
+ numMWUs, jobTimeBudget, bytesPerMinuteProcRatePerThread,
meanBytesPerMWU);
Review Comment:
The log line should say `bytesPerMinuteProcRatePerThread` instead of `bytesPerMinuteProcRate`.
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java:
##########
@@ -166,17 +166,19 @@ protected ProcessWorkUnitsWorkflow
createProcessWorkUnitsWorkflow(Properties job
protected TimeBudget calcWUProcTimeBudget(Instant jobStartTime,
WorkUnitsSizeSummary wuSizeSummary, Properties jobProps) {
// TODO: make fully configurable! for now, cap Work Discovery at 45 mins
and set aside 10 mins for the `CommitStepWorkflow`
long maxGenWUsMins = 45;
- long commitStepMins = 10;
+ long commitStepMins = 15;
long totalTargetTimeMins =
TimeUnit.MINUTES.toMinutes(PropertiesUtils.getPropAsLong(jobProps,
ConfigurationKeys.JOB_TARGET_COMPLETION_DURATION_IN_MINUTES_KEY,
ConfigurationKeys.DEFAULT_JOB_TARGET_COMPLETION_DURATION_IN_MINUTES));
double permittedOveragePercentage = .2;
Duration genWUsDuration = Duration.between(jobStartTime,
TemporalEventTimer.WithinWorkflowFactory.getCurrentInstant());
- long remainingMins = totalTargetTimeMins -
Math.min(genWUsDuration.toMinutes(), maxGenWUsMins) - commitStepMins;
+
Review Comment:
Should this line be removed or commented out as well?
Issue Time Tracking
-------------------
Worklog Id: (was: 962216)
Remaining Estimate: 0h
Time Spent: 10m
> Support dynamic container scaling on Temporal workload
> ------------------------------------------------------
>
> Key: GOBBLIN-2199
> URL: https://issues.apache.org/jira/browse/GOBBLIN-2199
> Project: Apache Gobblin
> Issue Type: Task
> Reporter: Prateek Khandelwal
> Priority: Major
> Time Spent: 10m
> Remaining Estimate: 0h
>
> Currently, Gobblin runs a static number of containers (the initial containers
> at the start of the job). We need to support dynamic scaling by computing the
> recommended number of containers such that large data copy workloads can be
> processed within some completion time and without running into OOM errors on
> containers.