umustafi commented on code in PR #3885:
URL: https://github.com/apache/gobblin/pull/3885#discussion_r1513390943
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/GeneralLeaseArbitrationHandler.java:
##########
@@ -0,0 +1,120 @@
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+
+import org.quartz.SchedulerException;
+
+import com.google.common.base.Optional;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.ContextAwareCounter;
+import org.apache.gobblin.metrics.ContextAwareMeter;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+@Slf4j
+public abstract class GeneralLeaseArbitrationHandler {
+ protected Optional<MultiActiveLeaseArbiter> multiActiveLeaseArbiter;
+ protected Optional<DagActionStore> dagActionStore;
+ protected MetricContext metricContext;
+ private ContextAwareCounter leaseObtainedCount;
+
+ private ContextAwareCounter leasedToAnotherStatusCount;
+
+ private ContextAwareCounter noLongerLeasingStatusCount;
+ private ContextAwareMeter leasesObtainedDueToReminderCount;
+
+ public GeneralLeaseArbitrationHandler(Config config,
Optional<MultiActiveLeaseArbiter> leaseDeterminationStore,
+ Optional<DagActionStore> dagActionStore, String metricsPrefix) {
+ this.multiActiveLeaseArbiter = leaseDeterminationStore;
+ this.dagActionStore = dagActionStore;
+ this.metricContext = Instrumented.getMetricContext(new
org.apache.gobblin.configuration.State(ConfigUtils.configToProperties(config)),
+ this.getClass());
+ initializeMetrics(metricsPrefix);
+ }
+
+ private void initializeMetrics(String metricsPrefix) {
+ // If a valid metrics prefix is provided then add a delimiter after it
+ if (metricsPrefix != "") {
+ metricsPrefix += ".";
+ }
+ this.leaseObtainedCount =
this.metricContext.contextAwareCounter(metricsPrefix +
ServiceMetricNames.FLOW_TRIGGER_HANDLER_LEASE_OBTAINED_COUNT);
+ this.leasedToAnotherStatusCount =
this.metricContext.contextAwareCounter(metricsPrefix +
ServiceMetricNames.FLOW_TRIGGER_HANDLER_LEASED_TO_ANOTHER_COUNT);
+ this.noLongerLeasingStatusCount =
this.metricContext.contextAwareCounter(metricsPrefix +
ServiceMetricNames.FLOW_TRIGGER_HANDLER_NO_LONGER_LEASING_COUNT);
+ this.leasesObtainedDueToReminderCount =
this.metricContext.contextAwareMeter(metricsPrefix +
ServiceMetricNames.FLOW_TRIGGER_HANDLER_LEASES_OBTAINED_DUE_TO_REMINDER_COUNT);
+ }
+
+ /**
+ * This method is used by the multi-active scheduler and multi-active
execution classes (DagTaskStream) to attempt a
+ * lease for a particular job event and return the status of the attempt.
+ * @param flowAction
+ * @param eventTimeMillis
+ * @param isReminderEvent
+ * @param skipFlowExecutionIdReplacement
+ * @return
+ */
+ public MultiActiveLeaseArbiter.LeaseAttemptStatus
tryAcquireLease(DagActionStore.DagAction flowAction, long eventTimeMillis,
+ boolean isReminderEvent, boolean skipFlowExecutionIdReplacement)
+ throws IOException, SchedulerException {
+ if (multiActiveLeaseArbiter.isPresent()) {
+ MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus =
multiActiveLeaseArbiter.get().tryAcquireLease(
+ flowAction, eventTimeMillis, isReminderEvent,
skipFlowExecutionIdReplacement);
+ // The flow action contained in the`LeaseAttemptStatus` from the lease
arbiter contains an updated flow execution
+ // id. From this point onwards, always use the newer version of the
flow action to easily track the action through
+ // orchestration and execution.
+ if (leaseAttemptStatus instanceof
MultiActiveLeaseArbiter.LeaseObtainedStatus) {
+ if (isReminderEvent) {
+ this.leasesObtainedDueToReminderCount.mark();
+ }
+ this.leaseObtainedCount.inc();
+ return leaseAttemptStatus;
+ } else if (leaseAttemptStatus instanceof
MultiActiveLeaseArbiter.LeasedToAnotherStatus) {
+ this.leasedToAnotherStatusCount.inc();
+
scheduleReminderForEvent((MultiActiveLeaseArbiter.LeasedToAnotherStatus)
leaseAttemptStatus, eventTimeMillis);
+ return leaseAttemptStatus;
+ } else if (leaseAttemptStatus instanceof
MultiActiveLeaseArbiter.NoLongerLeasingStatus) {
+ this.noLongerLeasingStatusCount.inc();
+ log.debug("Received type of leaseAttemptStatus: [{}, eventTimestamp:
{}] ", leaseAttemptStatus.getClass().getName(),
+ eventTimeMillis);
+ return leaseAttemptStatus;
+ }
+ throw new RuntimeException(String.format("Received type of
leaseAttemptStatus: %s not handled by this method",
+ leaseAttemptStatus.getClass().getName()));
+ } else {
+ throw new RuntimeException(String.format("Multi-active mode is not
enabled so dag action event should not be "
+ + "handled with this method."));
+ }
+ }
+
+ /**
+ * To be called by a lease owner of a dagAction to mark the action as done.
+ * @param leaseStatus
+ */
+ public boolean
recordSuccessfulCompletion(MultiActiveLeaseArbiter.LeaseObtainedStatus
leaseStatus) {
+ try {
+ return multiActiveLeaseArbiter.get().recordLeaseSuccess(leaseStatus);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ // TODO: decide if scheduleReminderForEvent is a useful method that will
be used by FlowTriggerHandler or only DagProc...
+ /**
+ * This method is used by the callers of the lease arbitration attempts
over a dag action event to schedule a
+ * self-reminder to check on the other participant's progress to finish
acting on a dag action after the time the
+ * lease should expire.
+ * @param leaseStatus
+ * @param triggerEventTimeMillis
+ */
+ private void
scheduleReminderForEvent(MultiActiveLeaseArbiter.LeasedToAnotherStatus
leaseStatus, long triggerEventTimeMillis)
Review Comment:
make abstract method, remove not implemented. protected
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]