umustafi commented on code in PR #4015:
URL: https://github.com/apache/gobblin/pull/4015#discussion_r1704419098


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java:
##########
@@ -120,6 +120,24 @@ && persistLaunchDagAction((LeaseAttemptStatus.LeaseObtainedStatus) leaseAttempt)
     }
   }
 
+  /**
+   * This method is used in the multi-active scheduler case for one or more hosts to respond to a kill dag action
+   * event triggered by the Orchestrator by attempting a lease for the kill event and processing the result depending on
+   * the status of the attempt.
+   */
+  public void handleFlowKillTriggerEvent(Properties jobProps, DagActionStore.LeaseParams leaseParams) throws IOException {
+    long previousEventTimeMillis = leaseParams.getEventTimeMillis();
+    LeaseAttemptStatus leaseAttempt = this.multiActiveLeaseArbiter.tryAcquireLease(leaseParams, false);
+    if (leaseAttempt instanceof LeaseAttemptStatus.LeaseObtainedStatus
+        && persistLaunchDagAction((LeaseAttemptStatus.LeaseObtainedStatus) leaseAttempt)) {
+      log.info("Successfully persisted lease: [{}, eventTimestamp: {}] ", leaseAttempt.getConsensusDagAction(),
+          previousEventTimeMillis);
+    } else { // when NOT successfully `persistDagAction`, set a reminder to re-attempt handling (unless leasing finished)
+      calcLeasedToAnotherStatusForReminder(leaseAttempt).ifPresent(leasedToAnother ->
+          scheduleReminderForEvent(jobProps, leasedToAnother, previousEventTimeMillis));
+    }

Review Comment:
   Can you extract the functionality this method shares with
`handleFlowLaunchTriggerEvent` above into a common helper?
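
   A rough sketch of the shape this could take, not a drop-in patch. Every
type and name below (`FlowTriggerSketch`, `LeaseArbiter`, `handleTriggerEvent`,
`reminderTarget`, the `String` lease params) is a hypothetical stand-in rather
than the real Gobblin class or signature, and the reminder rule (remind unless
leasing has finished) is assumed from the inline comment in the diff:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-ins only: none of these are the real Gobblin types.
class FlowTriggerSketch {
  interface LeaseAttempt {}
  static final class Obtained implements LeaseAttempt {}
  static final class LeasedToAnother implements LeaseAttempt {}
  static final class NoLongerLeasing implements LeaseAttempt {}

  interface LeaseArbiter {
    LeaseAttempt tryAcquireLease(String leaseParams);
  }

  final LeaseArbiter arbiter;
  final List<String> persisted = new ArrayList<>();
  final List<String> reminders = new ArrayList<>();

  FlowTriggerSketch(LeaseArbiter arbiter) {
    this.arbiter = arbiter;
  }

  // Both public entry points delegate to one shared helper.
  void handleFlowLaunchTriggerEvent(String leaseParams, long eventTimeMillis) {
    handleTriggerEvent(leaseParams, eventTimeMillis);
  }

  void handleFlowKillTriggerEvent(String leaseParams, long eventTimeMillis) {
    handleTriggerEvent(leaseParams, eventTimeMillis);
  }

  // Shared flow: attempt the lease, persist on success, otherwise maybe remind.
  private void handleTriggerEvent(String leaseParams, long eventTimeMillis) {
    LeaseAttempt attempt = arbiter.tryAcquireLease(leaseParams);
    if (attempt instanceof Obtained && persistDagAction(leaseParams)) {
      return;
    }
    reminderTarget(attempt)
        .ifPresent(a -> reminders.add(leaseParams + "@" + eventTimeMillis));
  }

  // Stand-in for persisting the dag action; always succeeds in this sketch.
  boolean persistDagAction(String leaseParams) {
    persisted.add(leaseParams);
    return true;
  }

  // Assumption: no reminder is needed once leasing for the event has finished.
  Optional<LeaseAttempt> reminderTarget(LeaseAttempt attempt) {
    return attempt instanceof NoLongerLeasing ? Optional.empty() : Optional.of(attempt);
  }
}
```

   Anything that genuinely differs between launch and kill could stay in the
two public methods, before or after the call to the shared helper.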



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

