umustafi commented on code in PR #3927:
URL: https://github.com/apache/gobblin/pull/3927#discussion_r1571389194
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java:
##########
@@ -177,7 +177,7 @@ private DagTask createDagTask(DagActionStore.DagAction
dagAction, LeaseAttemptSt
*/
protected void scheduleReminderForEvent(LeaseAttemptStatus leaseStatus)
throws SchedulerException {
-    dagActionReminderScheduler.get().scheduleReminder(leaseStatus.getDagAction(),
+    dagActionReminderScheduler.get().scheduleReminder(leaseStatus.getLaunderedDagAction(),
Review Comment:
This renaming is helpful: it makes the laundering more explicit.
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java:
##########
@@ -99,51 +104,47 @@ public FlowLaunchHandler(Config config,
* This method is used in the multi-active scheduler case for one or more
hosts to respond to a launch dag action
* event triggered by the scheduler by attempting a lease for the launch
event and processing the result depending on
* the status of the attempt.
- * @param jobProps
- * @param dagAction
- * @param eventTimeMillis
- * @param isReminderEvent
- * @param skipFlowExecutionIdReplacement
- * @throws IOException
*/
public void handleFlowLaunchTriggerEvent(Properties jobProps,
DagActionStore.DagAction dagAction,
long eventTimeMillis, boolean isReminderEvent, boolean
skipFlowExecutionIdReplacement) throws IOException {
- LeaseAttemptStatus
- leaseAttemptStatus = this.multiActiveLeaseArbiter
- .tryAcquireLease(dagAction, eventTimeMillis, isReminderEvent,
skipFlowExecutionIdReplacement);
- if (leaseAttemptStatus instanceof
LeaseAttemptStatus.LeaseObtainedStatus) {
- LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus =
- (LeaseAttemptStatus.LeaseObtainedStatus) leaseAttemptStatus;
- if (persistDagAction(leaseObtainedStatus)) {
- log.info("Successfully persisted lease: [{}, eventTimestamp: {}] ",
leaseObtainedStatus.getDagAction(),
- leaseObtainedStatus.getEventTimeMillis());
- return;
- }
- // If persisting the dag action failed, then we set another trigger
for this event to occur immediately to
- // re-attempt handling the event
- scheduleReminderForEvent(jobProps,
- new
LeaseAttemptStatus.LeasedToAnotherStatus(leaseObtainedStatus.getDagAction(),
0L), eventTimeMillis);
- } else if (leaseAttemptStatus instanceof
LeaseAttemptStatus.LeasedToAnotherStatus) {
- scheduleReminderForEvent(jobProps,
(LeaseAttemptStatus.LeasedToAnotherStatus) leaseAttemptStatus,
- eventTimeMillis);
- }
- // Otherwise leaseAttemptStatus instanceof
MultiActiveLeaseArbiter.NoLongerLeasingStatus & no need to do anything
+ // TOOD - (needs correcting?) semantically, isn't -
`skipFlowExecutionIdReplacement == !adoptConsensusFlowExecutionId`?!?!?
Review Comment:
Let's keep the naming consistent so we don't accidentally pass the wrong
value: we can use `adoptConsensusFlowExecutionId` across the board.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]