khandelwal-prateek commented on code in PR #4106:
URL: https://github.com/apache/gobblin/pull/4106#discussion_r2000306389
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/RecommendScalingForWorkUnitsLinearHeuristicImpl.java:
##########
@@ -23,65 +23,128 @@
import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
import org.apache.gobblin.temporal.ddm.work.TimeBudget;
import org.apache.gobblin.temporal.ddm.work.WorkUnitsSizeSummary;
-import org.apache.gobblin.temporal.ddm.worker.WorkFulfillmentWorker;
+import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
/**
* Simple config-driven linear recommendation for how many containers to use
to complete the "remaining work" within a given {@link TimeBudget}, per:
*
- * a. from {@link WorkUnitsSizeSummary}, find how many (remaining)
"top-level" {@link org.apache.gobblin.source.workunit.MultiWorkUnit}s of some
mean size
- * b. from the configured {@link #AMORTIZED_NUM_BYTES_PER_MINUTE}, find the
expected "processing rate" in bytes / minute
- * 1. estimate the time required for processing a mean-sized `MultiWorkUnit`
(MWU)
- * c. from {@link JobState}, find per-container `MultiWorkUnit` parallelism
capacity (aka. "worker-slots") to base the recommendation upon
- * 2. calculate the per-container throughput of MWUs per minute
- * 3. estimate the total per-container-minutes required to process all MWUs
- * d. from the {@link TimeBudget}, find the target number of minutes in
which to complete processing of all MWUs
- * 4. recommend the number of containers so all MWU processing should finish
within the target number of minutes
+ * a. from {@link WorkUnitsSizeSummary}, find how many (remaining) {@link
org.apache.gobblin.source.workunit.WorkUnit}s
+ * b. from the configured
GobblinTemporalConfigurationKeys.TEMPORAL_WORKER_THREAD_AMORTIZED_THROUGHPUT_PER_MINUTE,
find the expected "processing rate" in bytes / minute
+ * 1. estimate the total container-minutes required to process all MWUs
+ * c. from the {@link TimeBudget}, find the target number of minutes in
which to complete processing of all MWUs
+ * 2. estimate container count based on target minutes
+ * 3. estimate container count based on the maximum number of work units
allowed per container
+ * 4. recommend the number of containers as max of above two container counts
*/
@Slf4j
public class RecommendScalingForWorkUnitsLinearHeuristicImpl extends
AbstractRecommendScalingForWorkUnitsImpl {
- public static final String AMORTIZED_NUM_BYTES_PER_MINUTE =
GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_PREFIX +
"heuristic.params.numBytesPerMinute";
- public static final long DEFAULT_AMORTIZED_NUM_BYTES_PER_MINUTE = 80 * 1000L
* 1000L * 60L; // 80MB/sec
-
+ /**
+ * Calculates the recommended number of containers for processing the
remaining work units.
+ * <p>
+ * This method first checks whether dynamic scaling is enabled via the job
state configuration.
+ * If dynamic scaling is disabled, it returns the initial container count as
specified in the job state.
+ * When dynamic scaling is enabled, it computes the throughput based on the
count of constituent work units (WUs)
+ * and the processing rate (bytes per minute per thread). The calculation
involves:
+ * <ol>
+ * <li>Computing the total bytes to be processed based on the count and
mean size of top-level work units.</li>
+ * <li>Calculating the processing rate per container using the amortized
bytes per minute rate and the container's work unit capacity.</li>
+ * <li>Estimating the total container-minutes required to process all MWUs
and determining the number of containers needed
+ * to meet the job time budget.</li>
+ * <li>Computing an alternative container count based on the maximum
number of work units allowed per container.</li>
+ * <li>Returning the maximum of the two computed container counts as the
recommended scaling.</li>
+ * </ol>
+ * </p>
+ *
+ * @param remainingWork the summary of work unit sizes and counts remaining
for processing
+ * @param sourceClass the name of the class invoking this method
+ * @param jobTimeBudget the time budget allocated for the job execution
+ * @param jobState the current job state that holds configuration properties
and runtime parameters
+ * @return the recommended number of containers to allocate for processing
the work units
+ */
@Override
protected int calcDerivationSetPoint(WorkUnitsSizeSummary remainingWork,
String sourceClass, TimeBudget jobTimeBudget, JobState jobState) {
- // for simplicity, for now, consider only top-level work units (aka.
`MultiWorkUnit`s - MWUs)
+ if
(!jobState.getPropAsBoolean(GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_ENABLED,
false)) {
+ int initialContainerCount =
Integer.valueOf(jobState.getProp(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY,
"1"));
+ log.info("Dynamic scaling is disabled, returning initial container
count: " + initialContainerCount);
+ return initialContainerCount;
+ }
+
+ long numWUs = remainingWork.getConstituentWorkUnitsCount();
long numMWUs = remainingWork.getTopLevelWorkUnitsCount();
double meanBytesPerMWU = remainingWork.getTopLevelWorkUnitsMeanSize();
- int numSimultaneousMWUsPerContainer =
calcPerContainerWUCapacity(jobState); // (a worker-thread is a slot for
top-level (MWUs) - not constituent sub-WUs)
- long bytesPerMinuteProcRate = calcAmortizedBytesPerMinute(jobState);
+ double totalBytes = numMWUs * meanBytesPerMWU;
Review Comment:
Yes, both are the same. Since we need to determine the count using
`Math.ceil`, which requires a double value anyway (otherwise the result would
be rounded down), I just used `totalBytes` directly instead of casting
`getTotalSize` to double.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]