Blazer-007 commented on code in PR #4087:
URL: https://github.com/apache/gobblin/pull/4087#discussion_r1890251450


##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java:
##########
@@ -154,6 +198,35 @@ protected ProcessWorkUnitsWorkflow 
createProcessWorkUnitsWorkflow(Properties job
     return Workflow.newChildWorkflowStub(ProcessWorkUnitsWorkflow.class, 
childOpts);
   }
 
+  protected TimeBudget calcWUProcTimeBudget(Instant jobStartTime, 
WorkUnitsSizeSummary wuSizeSummary, Properties jobProps) {
+    // TODO: make configurable!  for now, aim for:
+    //  - total job runtime of 2 hours
+    //  - at least 15 minutes for the `CommitStepWorkflow`
+    //  - leave at least 1 hour for the `ProcessWorkUnitsWorkflow` (so deduct 
at most 45 minutes for WU generation thus far)
+    long totalTimeMins = 120;
+    long maxGenWUsMins = 45;
+    long commitStepMins = 15;
+    double permittedOveragePercentage = .2;
+    Duration genWUsDuration = Duration.between(jobStartTime, 
TemporalEventTimer.getCurrentTime());
+    long remainingMins = totalTimeMins - Math.min(genWUsDuration.toMinutes(), 
maxGenWUsMins) - commitStepMins;
+    return TimeBudget.withOveragePercentage(remainingMins, 
permittedOveragePercentage);
+  }
+
+  protected List<ScalingDirective> 
adjustRecommendedScaling(List<ScalingDirective> recommendedScalingDirectives) {
+    // TODO: make any adjustments - e.g. decide whether to shutdown the (often 
oversize) `GenerateWorkUnits` worker or alternatively to deduct one to count it
+    if (recommendedScalingDirectives.size() == 0) {
+      return recommendedScalingDirectives;
+    }
+    // TODO: be more robust and code more defensively, rather than presuming 
the impl of `RecommendScalingForWorkUnitsLinearHeuristicImpl`
+    ArrayList<ScalingDirective> adjustedScaling = new 
ArrayList<>(recommendedScalingDirectives);
+    ScalingDirective firstDirective = adjustedScaling.get(0);
+    // deduct one for the (already existing) `GenerateWorkUnits` worker
+    adjustedScaling.set(0, 
firstDirective.updateSetPoint(firstDirective.getSetPoint() - 1));

Review Comment:
   maybe worth adding one more line comment here something similar that this 
deduction is also based on assumption that firstDirective configs (memory size) 
are exactly equal to initial configs



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to