[
https://issues.apache.org/jira/browse/GOBBLIN-1921?focusedWorklogId=882977&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-882977
]
ASF GitHub Bot logged work on GOBBLIN-1921:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 02/Oct/23 19:16
Start Date: 02/Oct/23 19:16
Worklog Time Spent: 10m
Work Description: phet commented on code in PR #3790:
URL: https://github.com/apache/gobblin/pull/3790#discussion_r1343049643
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -264,9 +265,9 @@ public void orchestrate(Spec spec, Properties jobProps,
long triggerTimestampMil
String flowExecutionId =
flowMetadata.get(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD);
DagActionStore.DagAction flowAction =
new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId,
DagActionStore.FlowActionType.LAUNCH);
- flowTriggerHandler.get().handleTriggerEvent(jobProps, flowAction,
triggerTimestampMillis);
- _log.info("Multi-active scheduler finished handling trigger event:
[{}, triggerEventTimestamp: {}]", flowAction,
- triggerTimestampMillis);
+ flowTriggerHandler.get().handleTriggerEvent(jobProps, flowAction,
triggerTimestampMillis, isReminderEvent);
+ _log.info("Multi-active scheduler finished handling trigger event:
[{}, triggerEventTimestamp: {}]" +
+ (isReminderEvent ? " (reminderEvent)" : ""), flowAction,
triggerTimestampMillis);
Review Comment:
nit: I prefer something shown for both cases -
```
+ "(" + (isReminderEvent ? "IS" : "NOT") " reminder)"
```
or
```
+ "(is " + (isReminderEvent ? "reminder" : "original") + ")"
```
##########
gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java:
##########
@@ -141,14 +142,14 @@ public void testAcquireLeaseSingleParticipant() throws
Exception {
Assert.assertTrue(mysqlMultiActiveLeaseArbiter.recordLeaseSuccess(fourthObtainedStatus));
Assert.assertTrue(System.currentTimeMillis() -
fourthObtainedStatus.getEventTimestamp() < EPSILON);
MultiActiveLeaseArbiter.LeaseAttemptStatus fifthLaunchStatus =
- mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction,
eventTimeMillis);
+ mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction,
eventTimeMillis, false);
Assert.assertTrue(fifthLaunchStatus instanceof
MultiActiveLeaseArbiter.NoLongerLeasingStatus);
// Tests CASE 6 of no longer leasing a distinct event in DB
// Wait so this event is considered distinct and a new lease will be
acquired
Thread.sleep(EPSILON * 3/2);
MultiActiveLeaseArbiter.LeaseAttemptStatus sixthLaunchStatus =
- mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction,
eventTimeMillis);
+ mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction,
eventTimeMillis, false);
Review Comment:
here also: don't we want to test the case of reminders trying (again) to
acquire? (it's OK to defer, but at least leave a TODO)
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java:
##########
@@ -341,13 +341,13 @@ public void doNotRegisterMetricsAdhocFlows() throws
Exception {
flowProps.put("gobblin.flow.destinationIdentifier", "destination");
flowProps.put("flow.allowConcurrentExecution", false);
FlowSpec adhocSpec = new FlowSpec(URI.create("flow0/group0"), "1", "",
ConfigUtils.propertiesToConfig(flowProps) , flowProps, Optional.absent(),
Optional.absent());
- this.orchestrator.orchestrate(adhocSpec, flowProps, 0);
+ this.orchestrator.orchestrate(adhocSpec, flowProps, 0, false);
String metricName =
MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, "group0",
"flow0", ServiceMetricNames.COMPILED);
Assert.assertNull(metricContext.getParent().get().getGauges().get(metricName));
flowProps.setProperty("job.schedule", "0/2 * * * * ?");
FlowSpec scheduledSpec = new FlowSpec(URI.create("flow0/group0"), "1", "",
ConfigUtils.propertiesToConfig(flowProps) , flowProps, Optional.absent(),
Optional.absent());
- this.orchestrator.orchestrate(scheduledSpec, flowProps, 0);
+ this.orchestrator.orchestrate(scheduledSpec, flowProps, 0, false);
Review Comment:
shall we add an additional case where the reminder is true?
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -210,8 +242,22 @@ public LeaseAttemptStatus
tryAcquireLease(DagActionStore.DagAction flowAction, l
int dbLinger = getResult.get().getDbLinger();
Timestamp dbCurrentTimestamp = getResult.get().getDbCurrentTimestamp();
+ // For reminder event, we can stop early if the reminder eventTimeMillis
is older than the current event in the db
+ // because db laundering tells us that the currently worked on db event
is newer and will have its own reminders
+ if (isReminderEvent) {
+ if (eventTimeMillis < dbEventTimestamp.getTime()) {
+ log.info("tryAcquireLease for [{}, eventTimestamp: {}] - A new event
trigger is being worked on, so this "
+ + "older reminder will be dropped.");
+ return new NoLongerLeasingStatus();
+ } if (eventTimeMillis > dbEventTimestamp.getTime()) {
+ log.warn("tryAcquireLease for [{}, eventTimestamp: {}] - Severe
constraint violation encountered: a reminder "
Review Comment:
same with these `{}`s
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -186,12 +218,12 @@ private void initializeConstantsTable() throws
IOException {
}
@Override
- public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction
flowAction, long eventTimeMillis)
- throws IOException {
- log.info("Multi-active scheduler about to handle trigger event: [{},
triggerEventTimestamp: {}]", flowAction,
- eventTimeMillis);
+ public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction
flowAction, long eventTimeMillis,
+ boolean isReminderEvent) throws IOException {
+ log.info("Multi-active scheduler about to handle trigger event: [{},
triggerEventTimestamp: {}]" +
Review Comment:
same comment as in `Orchestrator` logging
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java:
##########
@@ -278,6 +280,8 @@ public static JobDataMap updatePropsInJobDataMap(JobDataMap
jobDataMap,
// excess flows to be triggered by the reminder functionality.
prevJobProps.setProperty(ConfigurationKeys.SCHEDULER_PRESERVED_CONSENSUS_EVENT_TIME_MILLIS_KEY,
String.valueOf(leasedToAnotherStatus.getEventTimeMillis()));
+ // Use this boolean to indicate whether this is a reminder event
+ prevJobProps.setProperty(ConfigurationKeys.FLOW_IS_REMINDER_EVENT_KEY,
String.valueOf(true));
Review Comment:
please document in comment why defaulting to `true` is preferable. In
isolation, I'd naively imagine `false` to be what we want, but if the dynamic
code path to get here should presume `true`, just highlight the reasoning
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -210,8 +242,22 @@ public LeaseAttemptStatus
tryAcquireLease(DagActionStore.DagAction flowAction, l
int dbLinger = getResult.get().getDbLinger();
Timestamp dbCurrentTimestamp = getResult.get().getDbCurrentTimestamp();
+ // For reminder event, we can stop early if the reminder eventTimeMillis
is older than the current event in the db
+ // because db laundering tells us that the currently worked on db event
is newer and will have its own reminders
+ if (isReminderEvent) {
+ if (eventTimeMillis < dbEventTimestamp.getTime()) {
+ log.info("tryAcquireLease for [{}, eventTimestamp: {}] - A new event
trigger is being worked on, so this "
Review Comment:
can we log the DB timestamp that we've based this decision on? anyway, what
do the two `{}` have bound as their args?
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -104,32 +116,50 @@ protected interface CheckedFunction<T, R> {
+ "VALUES(1, ?, ?) ON DUPLICATE KEY UPDATE epsilon=VALUES(epsilon),
linger=VALUES(linger)";
protected static final String WHERE_CLAUSE_TO_MATCH_KEY = "WHERE
flow_group=? AND flow_name=? AND flow_action=?";
protected static final String WHERE_CLAUSE_TO_MATCH_ROW =
WHERE_CLAUSE_TO_MATCH_KEY
- + " AND event_timestamp=? AND lease_acquisition_timestamp=?";
- protected static final String SELECT_AFTER_INSERT_STATEMENT = "SELECT
event_timestamp, lease_acquisition_timestamp, "
- + "linger FROM %s, %s " + WHERE_CLAUSE_TO_MATCH_KEY;
+ + " AND event_timestamp=CONVERT_TZ(?, '+00:00', @@session.time_zone)"
+ + " AND lease_acquisition_timestamp=CONVERT_TZ(?, '+00:00',
@@session.time_zone)";
Review Comment:
don't we prefer to store and retrieve always as UTC? if so, is the issue
that `DEFAULT CURRENT_TIMESTAMP(3)` is what first sets us on to using the
session TZ? alternatively could we convert it to be UTC when it's initialized
that way? if we could, there wouldn't be a need for conversion here, would
there be?
e.g. do we want everywhere want `UTC_TIMESTAMP(3)`, rather than
`CURRENT_TIMESTAMP(3)`?
##########
gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java:
##########
@@ -109,6 +109,7 @@ public class ConfigurationKeys {
// Event time of flow action to orchestrate using the multi-active lease
arbiter
public static final String ORCHESTRATOR_TRIGGER_EVENT_TIME_MILLIS_KEY =
"orchestratorTriggerEventTimeMillis";
public static final String ORCHESTRATOR_TRIGGER_EVENT_TIME_NEVER_SET_VAL =
"-1";
+ public static final String FLOW_IS_REMINDER_EVENT_KEY = "isReminderEvent";
Review Comment:
fine for now, since already precedent... but for the longer-term, I wonder
whether most of these constants don't better belong in
[`ServiceConfigKeys`](https://github.com/apache/gobblin/blob/028b85f587e3c1e6afa5d8662fe9ed3f0087568d/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java#L25)...
what do you think?
Issue Time Tracking
-------------------
Worklog Id: (was: 882977)
Time Spent: 20m (was: 10m)
> Properly handle reminder events
> -------------------------------
>
> Key: GOBBLIN-1921
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1921
> Project: Apache Gobblin
> Issue Type: Bug
> Components: gobblin-service
> Reporter: Urmi Mustafi
> Assignee: Abhishek Tiwari
> Priority: Major
> Time Spent: 20m
> Remaining Estimate: 0h
>
> Reminder flow trigger events were being improperly handled and interpreted as
> new events because they are triggered {{linger}} time after the original
> trigger where {{epsilon < linger}} and we use {{epsilon}} to determine event
> distinctness. With reminder events being considered distinct events, we were
> launching excess concurrent flows that were then being cancelled. Now we
> handle reminder events differently from normal event triggers to ensure
> they're properly evaluated. Because of db laundering, reminder events are
> easy to handle - if they're older than the currently worked upon event in the
> database they can be skipped and if they're equal to the current event in the
> database they are handled like normal. Reminder events should never be newer
> than the current event in the lease arbiter table because db laundering
> always results in increasing event times.Â
--
This message was sent by Atlassian Jira
(v8.20.10#820010)