phet commented on code in PR #3790:
URL: https://github.com/apache/gobblin/pull/3790#discussion_r1343049643


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -264,9 +265,9 @@ public void orchestrate(Spec spec, Properties jobProps, 
long triggerTimestampMil
         String flowExecutionId = 
flowMetadata.get(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD);
         DagActionStore.DagAction flowAction =
             new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, 
DagActionStore.FlowActionType.LAUNCH);
-        flowTriggerHandler.get().handleTriggerEvent(jobProps, flowAction, 
triggerTimestampMillis);
-        _log.info("Multi-active scheduler finished handling trigger event: 
[{}, triggerEventTimestamp: {}]", flowAction,
-            triggerTimestampMillis);
+        flowTriggerHandler.get().handleTriggerEvent(jobProps, flowAction, 
triggerTimestampMillis, isReminderEvent);
+        _log.info("Multi-active scheduler finished handling trigger event: 
[{}, triggerEventTimestamp: {}]" +
+                (isReminderEvent ? " (reminderEvent)" : ""), flowAction, 
triggerTimestampMillis);

Review Comment:
   nit: I prefer something shown for both cases -
   ```
   + "(" + (isReminderEvent ? "IS" : "NOT") + " reminder)"
   ```
   or
   ```
   + "(is " + (isReminderEvent ? "reminder" : "original") + ")"
   ```



##########
gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java:
##########
@@ -141,14 +142,14 @@ public void testAcquireLeaseSingleParticipant() throws 
Exception {
     
Assert.assertTrue(mysqlMultiActiveLeaseArbiter.recordLeaseSuccess(fourthObtainedStatus));
     Assert.assertTrue(System.currentTimeMillis() - 
fourthObtainedStatus.getEventTimestamp() < EPSILON);
     MultiActiveLeaseArbiter.LeaseAttemptStatus fifthLaunchStatus =
-        mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction, 
eventTimeMillis);
+        mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction, 
eventTimeMillis, false);
     Assert.assertTrue(fifthLaunchStatus instanceof 
MultiActiveLeaseArbiter.NoLongerLeasingStatus);
 
     // Tests CASE 6 of no longer leasing a distinct event in DB
     // Wait so this event is considered distinct and a new lease will be 
acquired
     Thread.sleep(EPSILON * 3/2);
     MultiActiveLeaseArbiter.LeaseAttemptStatus sixthLaunchStatus =
-        mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction, 
eventTimeMillis);
+        mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction, 
eventTimeMillis, false);

Review Comment:
   here also: don't we want to test the case of reminders trying (again) to 
acquire?  (it's OK to defer, but at least leave a TODO)



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java:
##########
@@ -341,13 +341,13 @@ public void doNotRegisterMetricsAdhocFlows() throws 
Exception {
     flowProps.put("gobblin.flow.destinationIdentifier", "destination");
     flowProps.put("flow.allowConcurrentExecution", false);
     FlowSpec adhocSpec = new FlowSpec(URI.create("flow0/group0"), "1", "", 
ConfigUtils.propertiesToConfig(flowProps) , flowProps, Optional.absent(), 
Optional.absent());
-    this.orchestrator.orchestrate(adhocSpec, flowProps, 0);
+    this.orchestrator.orchestrate(adhocSpec, flowProps, 0, false);
     String metricName = 
MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, "group0", 
"flow0", ServiceMetricNames.COMPILED);
     
Assert.assertNull(metricContext.getParent().get().getGauges().get(metricName));
 
     flowProps.setProperty("job.schedule", "0/2 * * * * ?");
     FlowSpec scheduledSpec = new FlowSpec(URI.create("flow0/group0"), "1", "", 
ConfigUtils.propertiesToConfig(flowProps) , flowProps, Optional.absent(), 
Optional.absent());
-    this.orchestrator.orchestrate(scheduledSpec, flowProps, 0);
+    this.orchestrator.orchestrate(scheduledSpec, flowProps, 0, false);

Review Comment:
   shall we add an additional case where the reminder is true?



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -210,8 +242,22 @@ public LeaseAttemptStatus 
tryAcquireLease(DagActionStore.DagAction flowAction, l
       int dbLinger = getResult.get().getDbLinger();
       Timestamp dbCurrentTimestamp = getResult.get().getDbCurrentTimestamp();
 
+      // For reminder event, we can stop early if the reminder eventTimeMillis 
is older than the current event in the db
+      // because db laundering tells us that the currently worked on db event 
is newer and will have its own reminders
+      if (isReminderEvent) {
+        if (eventTimeMillis < dbEventTimestamp.getTime()) {
+          log.info("tryAcquireLease for [{}, eventTimestamp: {}] - A new event 
trigger is being worked on, so this "
+              + "older reminder will be dropped.");
+          return new NoLongerLeasingStatus();
+        } if (eventTimeMillis > dbEventTimestamp.getTime()) {
+          log.warn("tryAcquireLease for [{}, eventTimestamp: {}] - Severe 
constraint violation encountered: a reminder "

Review Comment:
   same with these `{}`s



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -186,12 +218,12 @@ private void initializeConstantsTable() throws 
IOException {
   }
 
   @Override
-  public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction 
flowAction, long eventTimeMillis)
-      throws IOException {
-    log.info("Multi-active scheduler about to handle trigger event: [{}, 
triggerEventTimestamp: {}]", flowAction,
-        eventTimeMillis);
+  public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction 
flowAction, long eventTimeMillis,
+      boolean isReminderEvent) throws IOException {
+    log.info("Multi-active scheduler about to handle trigger event: [{}, 
triggerEventTimestamp: {}]" +

Review Comment:
   same comment as in `Orchestrator` logging



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java:
##########
@@ -278,6 +280,8 @@ public static JobDataMap updatePropsInJobDataMap(JobDataMap 
jobDataMap,
     // excess flows to be triggered by the reminder functionality.
     
prevJobProps.setProperty(ConfigurationKeys.SCHEDULER_PRESERVED_CONSENSUS_EVENT_TIME_MILLIS_KEY,
         String.valueOf(leasedToAnotherStatus.getEventTimeMillis()));
+    // Use this boolean to indicate whether this is a reminder event
+    prevJobProps.setProperty(ConfigurationKeys.FLOW_IS_REMINDER_EVENT_KEY, 
String.valueOf(true));

Review Comment:
   please document in a comment why defaulting to `true` is preferable.  In 
isolation, I'd naively imagine `false` to be what we want, but if the dynamic 
code path to get here should presume `true`, just highlight the reasoning



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -210,8 +242,22 @@ public LeaseAttemptStatus 
tryAcquireLease(DagActionStore.DagAction flowAction, l
       int dbLinger = getResult.get().getDbLinger();
       Timestamp dbCurrentTimestamp = getResult.get().getDbCurrentTimestamp();
 
+      // For reminder event, we can stop early if the reminder eventTimeMillis 
is older than the current event in the db
+      // because db laundering tells us that the currently worked on db event 
is newer and will have its own reminders
+      if (isReminderEvent) {
+        if (eventTimeMillis < dbEventTimestamp.getTime()) {
+          log.info("tryAcquireLease for [{}, eventTimestamp: {}] - A new event 
trigger is being worked on, so this "

Review Comment:
   can we log the DB timestamp that we've based this decision on?  anyway, what 
do the two `{}` have bound as their args?



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -104,32 +116,50 @@ protected interface CheckedFunction<T, R> {
       + "VALUES(1, ?, ?) ON DUPLICATE KEY UPDATE epsilon=VALUES(epsilon), 
linger=VALUES(linger)";
   protected static final String WHERE_CLAUSE_TO_MATCH_KEY = "WHERE 
flow_group=? AND flow_name=? AND flow_action=?";
   protected static final String WHERE_CLAUSE_TO_MATCH_ROW = 
WHERE_CLAUSE_TO_MATCH_KEY
-      + " AND event_timestamp=? AND lease_acquisition_timestamp=?";
-  protected static final String SELECT_AFTER_INSERT_STATEMENT = "SELECT 
event_timestamp, lease_acquisition_timestamp, "
-    + "linger FROM %s, %s " + WHERE_CLAUSE_TO_MATCH_KEY;
+      + " AND event_timestamp=CONVERT_TZ(?, '+00:00', @@session.time_zone)"
+      + " AND lease_acquisition_timestamp=CONVERT_TZ(?, '+00:00', 
@@session.time_zone)";

Review Comment:
   don't we prefer to store and retrieve always as UTC?  if so, is the issue 
that `DEFAULT CURRENT_TIMESTAMP(3)` is what first sets us on to using the 
session TZ?  alternatively could we convert it to be UTC when it's initialized 
that way?  if we could, there wouldn't be a need for conversion here, would 
there be?
   
   e.g. do we everywhere want `UTC_TIMESTAMP(3)`, rather than 
`CURRENT_TIMESTAMP(3)`?



##########
gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java:
##########
@@ -109,6 +109,7 @@ public class ConfigurationKeys {
   // Event time of flow action to orchestrate using the multi-active lease 
arbiter
   public static final String ORCHESTRATOR_TRIGGER_EVENT_TIME_MILLIS_KEY = 
"orchestratorTriggerEventTimeMillis";
   public static final String ORCHESTRATOR_TRIGGER_EVENT_TIME_NEVER_SET_VAL = 
"-1";
+  public static final String FLOW_IS_REMINDER_EVENT_KEY = "isReminderEvent";

Review Comment:
   fine for now, since there is already precedent... but for the longer term, I wonder 
whether most of these constants don't better belong in 
[`ServiceConfigKeys`](https://github.com/apache/gobblin/blob/028b85f587e3c1e6afa5d8662fe9ed3f0087568d/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java#L25)...
 what do you think?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to