phet commented on code in PR #4087:
URL: https://github.com/apache/gobblin/pull/4087#discussion_r1891105689
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java:
##########
@@ -154,6 +198,35 @@ protected ProcessWorkUnitsWorkflow
createProcessWorkUnitsWorkflow(Properties job
return Workflow.newChildWorkflowStub(ProcessWorkUnitsWorkflow.class,
childOpts);
}
+ protected TimeBudget calcWUProcTimeBudget(Instant jobStartTime,
WorkUnitsSizeSummary wuSizeSummary, Properties jobProps) {
+ // TODO: make configurable! for now, aim for:
+ // - total job runtime of 2 hours
+ // - at least 15 minutes for the `CommitStepWorkflow`
+ // - leave at least 1 hour for the `ProcessWorkUnitsWorkflow` (so deduct
at most 45 minutes for WU generation thus far)
+ long totalTimeMins = 120;
+ long maxGenWUsMins = 45;
+ long commitStepMins = 15;
+ double permittedOveragePercentage = .2;
+ Duration genWUsDuration = Duration.between(jobStartTime,
TemporalEventTimer.getCurrentTime());
+ long remainingMins = totalTimeMins - Math.min(genWUsDuration.toMinutes(),
maxGenWUsMins) - commitStepMins;
+ return TimeBudget.withOveragePercentage(remainingMins,
permittedOveragePercentage);
+ }
+
+ protected List<ScalingDirective>
adjustRecommendedScaling(List<ScalingDirective> recommendedScalingDirectives) {
+ // TODO: make any adjustments - e.g. decide whether to shutdown the (often
oversize) `GenerateWorkUnits` worker or alternatively to deduct one to count it
+ if (recommendedScalingDirectives.size() == 0) {
+ return recommendedScalingDirectives;
+ }
+ // TODO: be more robust and code more defensively, rather than presuming
the impl of `RecommendScalingForWorkUnitsLinearHeuristicImpl`
+ ArrayList<ScalingDirective> adjustedScaling = new
ArrayList<>(recommendedScalingDirectives);
+ ScalingDirective firstDirective = adjustedScaling.get(0);
+ // deduct one for the (already existing) `GenerateWorkUnits` worker
+ adjustedScaling.set(0,
firstDirective.updateSetPoint(firstDirective.getSetPoint() - 1));
Review Comment:
updated to:
```
// deduct one for (already existing) `GenerateWorkUnits` worker (we presume
its "baseline" `WorkerProfile` similar enough to substitute for this new one)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]