[ 
https://issues.apache.org/jira/browse/GOBBLIN-1921?focusedWorklogId=882977&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-882977
 ]

ASF GitHub Bot logged work on GOBBLIN-1921:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 02/Oct/23 19:16
            Start Date: 02/Oct/23 19:16
    Worklog Time Spent: 10m 
      Work Description: phet commented on code in PR #3790:
URL: https://github.com/apache/gobblin/pull/3790#discussion_r1343049643


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -264,9 +265,9 @@ public void orchestrate(Spec spec, Properties jobProps, 
long triggerTimestampMil
         String flowExecutionId = 
flowMetadata.get(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD);
         DagActionStore.DagAction flowAction =
             new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, 
DagActionStore.FlowActionType.LAUNCH);
-        flowTriggerHandler.get().handleTriggerEvent(jobProps, flowAction, 
triggerTimestampMillis);
-        _log.info("Multi-active scheduler finished handling trigger event: 
[{}, triggerEventTimestamp: {}]", flowAction,
-            triggerTimestampMillis);
+        flowTriggerHandler.get().handleTriggerEvent(jobProps, flowAction, 
triggerTimestampMillis, isReminderEvent);
+        _log.info("Multi-active scheduler finished handling trigger event: 
[{}, triggerEventTimestamp: {}]" +
+                (isReminderEvent ? " (reminderEvent)" : ""), flowAction, 
triggerTimestampMillis);

Review Comment:
   nit: I prefer something shown for both cases -
   ```
   + "(" + (isReminderEvent ? "IS" : "NOT") + " reminder)"
   ```
   or
   ```
   + "(is " + (isReminderEvent ? "reminder" : "original") + ")"
   ```



##########
gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java:
##########
@@ -141,14 +142,14 @@ public void testAcquireLeaseSingleParticipant() throws 
Exception {
     
Assert.assertTrue(mysqlMultiActiveLeaseArbiter.recordLeaseSuccess(fourthObtainedStatus));
     Assert.assertTrue(System.currentTimeMillis() - 
fourthObtainedStatus.getEventTimestamp() < EPSILON);
     MultiActiveLeaseArbiter.LeaseAttemptStatus fifthLaunchStatus =
-        mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction, 
eventTimeMillis);
+        mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction, 
eventTimeMillis, false);
     Assert.assertTrue(fifthLaunchStatus instanceof 
MultiActiveLeaseArbiter.NoLongerLeasingStatus);
 
     // Tests CASE 6 of no longer leasing a distinct event in DB
     // Wait so this event is considered distinct and a new lease will be 
acquired
     Thread.sleep(EPSILON * 3/2);
     MultiActiveLeaseArbiter.LeaseAttemptStatus sixthLaunchStatus =
-        mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction, 
eventTimeMillis);
+        mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction, 
eventTimeMillis, false);

Review Comment:
   here also: don't we want to test the case of reminders trying (again) to 
acquire?  (it's OK to defer, but at least leave a TODO)



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java:
##########
@@ -341,13 +341,13 @@ public void doNotRegisterMetricsAdhocFlows() throws 
Exception {
     flowProps.put("gobblin.flow.destinationIdentifier", "destination");
     flowProps.put("flow.allowConcurrentExecution", false);
     FlowSpec adhocSpec = new FlowSpec(URI.create("flow0/group0"), "1", "", 
ConfigUtils.propertiesToConfig(flowProps) , flowProps, Optional.absent(), 
Optional.absent());
-    this.orchestrator.orchestrate(adhocSpec, flowProps, 0);
+    this.orchestrator.orchestrate(adhocSpec, flowProps, 0, false);
     String metricName = 
MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, "group0", 
"flow0", ServiceMetricNames.COMPILED);
     
Assert.assertNull(metricContext.getParent().get().getGauges().get(metricName));
 
     flowProps.setProperty("job.schedule", "0/2 * * * * ?");
     FlowSpec scheduledSpec = new FlowSpec(URI.create("flow0/group0"), "1", "", 
ConfigUtils.propertiesToConfig(flowProps) , flowProps, Optional.absent(), 
Optional.absent());
-    this.orchestrator.orchestrate(scheduledSpec, flowProps, 0);
+    this.orchestrator.orchestrate(scheduledSpec, flowProps, 0, false);

Review Comment:
   shall we add an additional case where the reminder is true?



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -210,8 +242,22 @@ public LeaseAttemptStatus 
tryAcquireLease(DagActionStore.DagAction flowAction, l
       int dbLinger = getResult.get().getDbLinger();
       Timestamp dbCurrentTimestamp = getResult.get().getDbCurrentTimestamp();
 
+      // For reminder event, we can stop early if the reminder eventTimeMillis 
is older than the current event in the db
+      // because db laundering tells us that the currently worked on db event 
is newer and will have its own reminders
+      if (isReminderEvent) {
+        if (eventTimeMillis < dbEventTimestamp.getTime()) {
+          log.info("tryAcquireLease for [{}, eventTimestamp: {}] - A new event 
trigger is being worked on, so this "
+              + "older reminder will be dropped.");
+          return new NoLongerLeasingStatus();
+        } if (eventTimeMillis > dbEventTimestamp.getTime()) {
+          log.warn("tryAcquireLease for [{}, eventTimestamp: {}] - Severe 
constraint violation encountered: a reminder "

Review Comment:
   same with these `{}`s



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -186,12 +218,12 @@ private void initializeConstantsTable() throws 
IOException {
   }
 
   @Override
-  public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction 
flowAction, long eventTimeMillis)
-      throws IOException {
-    log.info("Multi-active scheduler about to handle trigger event: [{}, 
triggerEventTimestamp: {}]", flowAction,
-        eventTimeMillis);
+  public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction 
flowAction, long eventTimeMillis,
+      boolean isReminderEvent) throws IOException {
+    log.info("Multi-active scheduler about to handle trigger event: [{}, 
triggerEventTimestamp: {}]" +

Review Comment:
   same comment as in `Orchestrator` logging



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java:
##########
@@ -278,6 +280,8 @@ public static JobDataMap updatePropsInJobDataMap(JobDataMap 
jobDataMap,
     // excess flows to be triggered by the reminder functionality.
     
prevJobProps.setProperty(ConfigurationKeys.SCHEDULER_PRESERVED_CONSENSUS_EVENT_TIME_MILLIS_KEY,
         String.valueOf(leasedToAnotherStatus.getEventTimeMillis()));
+    // Use this boolean to indicate whether this is a reminder event
+    prevJobProps.setProperty(ConfigurationKeys.FLOW_IS_REMINDER_EVENT_KEY, 
String.valueOf(true));

Review Comment:
   please document in comment why defaulting to `true` is preferable.  In 
isolation, I'd naively imagine `false` to be what we want, but if the dynamic 
code path to get here should presume `true`, just highlight the reasoning



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -210,8 +242,22 @@ public LeaseAttemptStatus 
tryAcquireLease(DagActionStore.DagAction flowAction, l
       int dbLinger = getResult.get().getDbLinger();
       Timestamp dbCurrentTimestamp = getResult.get().getDbCurrentTimestamp();
 
+      // For reminder event, we can stop early if the reminder eventTimeMillis 
is older than the current event in the db
+      // because db laundering tells us that the currently worked on db event 
is newer and will have its own reminders
+      if (isReminderEvent) {
+        if (eventTimeMillis < dbEventTimestamp.getTime()) {
+          log.info("tryAcquireLease for [{}, eventTimestamp: {}] - A new event 
trigger is being worked on, so this "

Review Comment:
   can we log the DB timestamp that we've based this decision on?  anyway, what 
do the two `{}` have bound as their args?



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -104,32 +116,50 @@ protected interface CheckedFunction<T, R> {
       + "VALUES(1, ?, ?) ON DUPLICATE KEY UPDATE epsilon=VALUES(epsilon), 
linger=VALUES(linger)";
   protected static final String WHERE_CLAUSE_TO_MATCH_KEY = "WHERE 
flow_group=? AND flow_name=? AND flow_action=?";
   protected static final String WHERE_CLAUSE_TO_MATCH_ROW = 
WHERE_CLAUSE_TO_MATCH_KEY
-      + " AND event_timestamp=? AND lease_acquisition_timestamp=?";
-  protected static final String SELECT_AFTER_INSERT_STATEMENT = "SELECT 
event_timestamp, lease_acquisition_timestamp, "
-    + "linger FROM %s, %s " + WHERE_CLAUSE_TO_MATCH_KEY;
+      + " AND event_timestamp=CONVERT_TZ(?, '+00:00', @@session.time_zone)"
+      + " AND lease_acquisition_timestamp=CONVERT_TZ(?, '+00:00', 
@@session.time_zone)";

Review Comment:
   don't we prefer to store and retrieve always as UTC?  if so, is the issue 
that `DEFAULT CURRENT_TIMESTAMP(3)` is what first sets us on to using the 
session TZ?  alternatively could we convert it to be UTC when it's initialized 
that way?  if we could, there wouldn't be a need for conversion here, would 
there be?
   
   e.g. do we want `UTC_TIMESTAMP(3)` everywhere, rather than 
`CURRENT_TIMESTAMP(3)`?



##########
gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java:
##########
@@ -109,6 +109,7 @@ public class ConfigurationKeys {
   // Event time of flow action to orchestrate using the multi-active lease 
arbiter
   public static final String ORCHESTRATOR_TRIGGER_EVENT_TIME_MILLIS_KEY = 
"orchestratorTriggerEventTimeMillis";
   public static final String ORCHESTRATOR_TRIGGER_EVENT_TIME_NEVER_SET_VAL = 
"-1";
+  public static final String FLOW_IS_REMINDER_EVENT_KEY = "isReminderEvent";

Review Comment:
   fine for now, since already precedent... but for the longer-term, I wonder 
whether most of these constants don't better belong in 
[`ServiceConfigKeys`](https://github.com/apache/gobblin/blob/028b85f587e3c1e6afa5d8662fe9ed3f0087568d/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java#L25)...
 what do you think?





Issue Time Tracking
-------------------

    Worklog Id:     (was: 882977)
    Time Spent: 20m  (was: 10m)

> Properly handle reminder events
> -------------------------------
>
>                 Key: GOBBLIN-1921
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1921
>             Project: Apache Gobblin
>          Issue Type: Bug
>          Components: gobblin-service
>            Reporter: Urmi Mustafi
>            Assignee: Abhishek Tiwari
>            Priority: Major
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> Reminder flow trigger events were being improperly handled and interpreted as 
> new events because they are triggered {{linger}} time after the original 
> trigger where {{epsilon < linger}} and we use {{epsilon}} to determine event 
> distinctness. With reminder events being considered distinct events, we were 
> launching excess concurrent flows that were then being cancelled. Now we 
> handle reminder events differently from normal event triggers to ensure 
> they're properly evaluated. Because of db laundering, reminder events are 
> easy to handle - if they're older than the currently worked upon event in the 
> database they can be skipped and if they're equal to the current event in the 
> database they are handled like normal. Reminder events should never be newer 
> than the current event in the lease arbiter table because db laundering 
> always results in increasing event times. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to