phet commented on code in PR #4087:
URL: https://github.com/apache/gobblin/pull/4087#discussion_r1891105689


##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java:
##########
@@ -154,6 +198,35 @@ protected ProcessWorkUnitsWorkflow 
createProcessWorkUnitsWorkflow(Properties job
     return Workflow.newChildWorkflowStub(ProcessWorkUnitsWorkflow.class, 
childOpts);
   }
 
+  protected TimeBudget calcWUProcTimeBudget(Instant jobStartTime, 
WorkUnitsSizeSummary wuSizeSummary, Properties jobProps) {
+    // TODO: make configurable!  for now, aim for:
+    //  - total job runtime of 2 hours
+    //  - at least 15 minutes for the `CommitStepWorkflow`
+    //  - leave at least 1 hour for the `ProcessWorkUnitsWorkflow` (so deduct 
at most 45 minutes for WU generation thus far)
+    long totalTimeMins = 120;
+    long maxGenWUsMins = 45;
+    long commitStepMins = 15;
+    double permittedOveragePercentage = .2;
+    Duration genWUsDuration = Duration.between(jobStartTime, 
TemporalEventTimer.getCurrentTime());
+    long remainingMins = totalTimeMins - Math.min(genWUsDuration.toMinutes(), 
maxGenWUsMins) - commitStepMins;
+    return TimeBudget.withOveragePercentage(remainingMins, 
permittedOveragePercentage);
+  }
+
+  protected List<ScalingDirective> 
adjustRecommendedScaling(List<ScalingDirective> recommendedScalingDirectives) {
+    // TODO: make any adjustments - e.g. decide whether to shutdown the (often 
oversize) `GenerateWorkUnits` worker or alternatively to deduct one to count it
+    if (recommendedScalingDirectives.size() == 0) {
+      return recommendedScalingDirectives;
+    }
+    // TODO: be more robust and code more defensively, rather than presuming 
the impl of `RecommendScalingForWorkUnitsLinearHeuristicImpl`
+    ArrayList<ScalingDirective> adjustedScaling = new 
ArrayList<>(recommendedScalingDirectives);
+    ScalingDirective firstDirective = adjustedScaling.get(0);
+    // deduct one for the (already existing) `GenerateWorkUnits` worker
+    adjustedScaling.set(0, 
firstDirective.updateSetPoint(firstDirective.getSetPoint() - 1));

Review Comment:
   updated to:
   ```
   // deduct one for (already existing) `GenerateWorkUnits` worker (we presume 
its "baseline" `WorkerProfile` similar enough to substitute for this new one)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to