[
https://issues.apache.org/jira/browse/GOBBLIN-1910?focusedWorklogId=882995&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-882995
]
ASF GitHub Bot logged work on GOBBLIN-1910:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 02/Oct/23 21:08
Start Date: 02/Oct/23 21:08
Worklog Time Spent: 10m
Work Description: phet commented on code in PR #3776:
URL: https://github.com/apache/gobblin/pull/3776#discussion_r1343147474
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java:
##########
@@ -311,4 +315,40 @@ protected static long getUTCTimeFromDelayPeriod(long
delayPeriodMillis) {
Date date = Date.from(localDateTime.atZone(ZoneId.of("UTC")).toInstant());
return GobblinServiceJobScheduler.utcDateAsUTCEpochMillis(date);
}
+
+ /**
+ * Attempts to acquire lease for a given {@link
org.apache.gobblin.runtime.api.DagActionStore.DagAction}
+ * through lease arbitration and if it fails, it will create and schedule a
reminder trigger to check back again.
+ * @param jobProps
+ * @param flowAction
+ * @param eventTimeMillis
+ * @return optionally leaseObtainedStatus if acquired; otherwise schedule
reminder to check back again.
+ * @throws IOException
+ */
+ public MultiActiveLeaseArbiter.LeaseAttemptStatus
getLeaseOnDagAction(Properties jobProps, DagActionStore.DagAction flowAction,
long eventTimeMillis) throws IOException {
+
+ if (multiActiveLeaseArbiter.isPresent()) {
+ MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus =
multiActiveLeaseArbiter.get().tryAcquireLease(flowAction, eventTimeMillis);
+ if (leaseAttemptStatus instanceof
MultiActiveLeaseArbiter.LeaseObtainedStatus) {
+ this.leaseObtainedCount.inc();
+ log.info("Successfully acquired lease for dag action: {}", flowAction);
+ } else if (leaseAttemptStatus instanceof
MultiActiveLeaseArbiter.LeasedToAnotherStatus) {
+ this.leasedToAnotherStatusCount.inc();
+ scheduleReminderForEvent(jobProps,
+ (MultiActiveLeaseArbiter.LeasedToAnotherStatus)
leaseAttemptStatus, eventTimeMillis);
+ } else if (leaseAttemptStatus instanceof
MultiActiveLeaseArbiter.NoLongerLeasingStatus) {
+ this.noLongerLeasingStatusCount.inc();
+ log.info("Received type of leaseAttemptStatus: [{}, eventTimestamp:
{}] ", leaseAttemptStatus.getClass().getName(),
+ eventTimeMillis);
+ }
+ return leaseAttemptStatus;
+ } else {
+ throw new RuntimeException(String.format("Multi-active scheduler is not
enabled so trigger event should not be "
+ + "handled with this method."));
+ }
+ }
+
+ public MultiActiveLeaseArbiter getMultiActiveLeaseArbiter() {
+ return this.multiActiveLeaseArbiter.get();
+ }
Review Comment:
no need for this, if you indeed modify the existing `handleTriggerEvent` on
line 109 for reuse
Issue Time Tracking
-------------------
Worklog Id: (was: 882995)
Time Spent: 12h 50m (was: 12h 40m)
> Refactor code to move current in-memory references to new design for REST
> calls: Launch, Resume and Kill
> --------------------------------------------------------------------------------------------------------
>
> Key: GOBBLIN-1910
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1910
> Project: Apache Gobblin
> Issue Type: New Feature
> Reporter: Meeth Gala
> Priority: Major
> Time Spent: 12h 50m
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)