[
https://issues.apache.org/jira/browse/GOBBLIN-2199?focusedWorklogId=962221&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-962221
]
ASF GitHub Bot logged work on GOBBLIN-2199:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 18/Mar/25 06:34
Start Date: 18/Mar/25 06:34
Worklog Time Spent: 10m
Work Description: khandelwal-prateek commented on code in PR #4106:
URL: https://github.com/apache/gobblin/pull/4106#discussion_r2000306389
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/RecommendScalingForWorkUnitsLinearHeuristicImpl.java:
##########
@@ -23,65 +23,128 @@
import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
import org.apache.gobblin.temporal.ddm.work.TimeBudget;
import org.apache.gobblin.temporal.ddm.work.WorkUnitsSizeSummary;
-import org.apache.gobblin.temporal.ddm.worker.WorkFulfillmentWorker;
+import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
/**
* Simple config-driven linear recommendation for how many containers to use to complete the "remaining work" within a given {@link TimeBudget}, per:
*
- * a. from {@link WorkUnitsSizeSummary}, find how many (remaining) "top-level" {@link org.apache.gobblin.source.workunit.MultiWorkUnit}s of some mean size
- * b. from the configured {@link #AMORTIZED_NUM_BYTES_PER_MINUTE}, find the expected "processing rate" in bytes / minute
- * 1. estimate the time required for processing a mean-sized `MultiWorkUnit` (MWU)
- * c. from {@link JobState}, find per-container `MultiWorkUnit` parallelism capacity (aka. "worker-slots") to base the recommendation upon
- * 2. calculate the per-container throughput of MWUs per minute
- * 3. estimate the total per-container-minutes required to process all MWUs
- * d. from the {@link TimeBudget}, find the target number of minutes in which to complete processing of all MWUs
- * 4. recommend the number of containers so all MWU processing should finish within the target number of minutes
+ * a. from {@link WorkUnitsSizeSummary}, find how many (remaining) {@link org.apache.gobblin.source.workunit.WorkUnit}s
+ * b. from the configured GobblinTemporalConfigurationKeys.TEMPORAL_WORKER_THREAD_AMORTIZED_THROUGHPUT_PER_MINUTE, find the expected "processing rate" in bytes / minute
+ * 1. estimate the total container-minutes required to process all MWUs
+ * c. from the {@link TimeBudget}, find the target number of minutes in which to complete processing of all MWUs
+ * 2. estimate container count based on target minutes
+ * 2. estimate container count based on the maximum number of work units allowed per container
+ * 4. recommend the number of containers as max of above two container counts
*/
@Slf4j
public class RecommendScalingForWorkUnitsLinearHeuristicImpl extends AbstractRecommendScalingForWorkUnitsImpl {
- public static final String AMORTIZED_NUM_BYTES_PER_MINUTE = GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_PREFIX + "heuristic.params.numBytesPerMinute";
- public static final long DEFAULT_AMORTIZED_NUM_BYTES_PER_MINUTE = 80 * 1000L * 1000L * 60L; // 80MB/sec
-
+ /**
+ * Calculates the recommended number of containers for processing the remaining work units.
+ * <p>
+ * This method first checks whether dynamic scaling is enabled via the job state configuration.
+ * If dynamic scaling is disabled, it returns the initial container count as specified in the job state.
+ * When dynamic scaling is enabled, it computes the throughput based on the count of constituent work units (WUs)
+ * and the processing rate (bytes per minute per thread). The calculation involves:
+ * <ol>
+ * <li>Computing the total bytes to be processed based on the count and mean size of top-level work units.</li>
+ * <li>Calculating the processing rate per container using the amortized bytes per minute rate and the container's work unit capacity.</li>
+ * <li>Estimating the total container-minutes required to process all MWUs and determining the number of containers needed
+ * to meet the job time budget.</li>
+ * <li>Computing an alternative container count based on the maximum number of work units allowed per container.</li>
+ * <li>Returning the maximum of the two computed container counts as the recommended scaling.</li>
+ * </ol>
+ * </p>
+ *
+ * @param remainingWork the summary of work unit sizes and counts remaining for processing
+ * @param sourceClass the name of the class invoking this method
+ * @param jobTimeBudget the time budget allocated for the job execution
+ * @param jobState the current job state that holds configuration properties and runtime parameters
+ * @return the recommended number of containers to allocate for processing the work units
+ */
@Override
protected int calcDerivationSetPoint(WorkUnitsSizeSummary remainingWork, String sourceClass, TimeBudget jobTimeBudget, JobState jobState) {
- // for simplicity, for now, consider only top-level work units (aka. `MultiWorkUnit`s - MWUs)
+ if (!jobState.getPropAsBoolean(GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_ENABLED, false)) {
+ int initialContainerCount = Integer.valueOf(jobState.getProp(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY, "1"));
+ log.info("Dynamic scaling is disabled, returning initial container count: " + initialContainerCount);
+ return initialContainerCount;
+ }
+
+ long numWUs = remainingWork.getConstituentWorkUnitsCount();
long numMWUs = remainingWork.getTopLevelWorkUnitsCount();
double meanBytesPerMWU = remainingWork.getTopLevelWorkUnitsMeanSize();
- int numSimultaneousMWUsPerContainer = calcPerContainerWUCapacity(jobState); // (a worker-thread is a slot for top-level (MWUs) - not constituent sub-WUs)
- long bytesPerMinuteProcRate = calcAmortizedBytesPerMinute(jobState);
+ double totalBytes = numMWUs * meanBytesPerMWU;
Review Comment:
Yes, both are the same. Since the count is computed with `Math.ceil`, which
requires a double value anyway (otherwise integer division would round the
result down), I just used `totalBytes` directly instead of casting
`getTotalSize` to double.
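To illustrate the rounding concern in the comment above, here is a minimal standalone sketch. The class and method names are hypothetical and not part of the PR. It only shows that dividing two integral values truncates before any rounding can happen, while dividing a double and applying `Math.ceil` rounds up.

```java
// Hypothetical demo class, not taken from the PR.
class CeilDemo {
    // Integer division: the fractional part is discarded, so the count is rounded down.
    static int withIntegerDivision(long total, long perContainer) {
        return (int) (total / perContainer);
    }

    // Double division followed by Math.ceil: a partial container counts as a whole one.
    static int withCeil(double total, long perContainer) {
        return (int) Math.ceil(total / perContainer);
    }

    public static void main(String[] args) {
        System.out.println(withIntegerDivision(10, 4)); // prints 2
        System.out.println(withCeil(10, 4));            // prints 3
    }
}
```

Keeping the total as a double from the start avoids an explicit cast at the call site, which appears to be the motivation given in the comment.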
Issue Time Tracking
-------------------
Worklog Id: (was: 962221)
Time Spent: 0.5h (was: 20m)
> Support dynamic container scaling on Temporal workload
> ------------------------------------------------------
>
> Key: GOBBLIN-2199
> URL: https://issues.apache.org/jira/browse/GOBBLIN-2199
> Project: Apache Gobblin
> Issue Type: Task
> Reporter: Prateek Khandelwal
> Priority: Major
> Time Spent: 0.5h
> Remaining Estimate: 0h
>
> Currently, Gobblin runs a static number of containers (the initial containers
> requested at the start of the job). We need to support dynamic scaling by
> computing the recommended number of containers, so that large data copy
> workloads can be processed within a target completion time without running
> into OOM errors on containers.
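The recommendation described in the javadoc of the quoted diff can be sketched roughly as follows. This is an illustration under stated assumptions, not the PR's implementation: the class name, method name, and parameters are hypothetical, the configuration lookups are replaced by plain arguments, and the floor of one container is an added assumption.

```java
// Hypothetical sketch of a "max of two estimates" linear heuristic; not the PR's code.
class LinearScalingSketch {
    static int recommendContainers(double totalBytes, long numWorkUnits,
            long bytesPerMinutePerThread, int threadsPerContainer,
            long targetMinutes, int maxWorkUnitsPerContainer) {
        // Per-container processing rate: per-thread rate times the container's thread capacity.
        double bytesPerMinutePerContainer = (double) bytesPerMinutePerThread * threadsPerContainer;
        // Total container-minutes needed to process all remaining bytes.
        double containerMinutes = totalBytes / bytesPerMinutePerContainer;
        // Estimate 1: containers needed to finish within the time budget.
        int byTimeBudget = (int) Math.ceil(containerMinutes / targetMinutes);
        // Estimate 2: containers needed to respect the per-container work unit cap.
        int byWorkUnitCap = (int) Math.ceil((double) numWorkUnits / maxWorkUnitsPerContainer);
        // Take the larger estimate; the floor of 1 is an assumption of this sketch.
        return Math.max(1, Math.max(byTimeBudget, byWorkUnitCap));
    }

    public static void main(String[] args) {
        // 1 TB, 2000 work units, 500 MB/min per thread, 4 threads, 60 minutes, 500 WUs per container
        System.out.println(recommendContainers(1e12, 2000, 500_000_000L, 4, 60, 500)); // prints 9
    }
}
```

In this example the time budget dominates (9 containers versus 4 from the work unit cap). With 10000 work units the cap would dominate instead and the result would be 20.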