[
https://issues.apache.org/jira/browse/GOBBLIN-2097?focusedWorklogId=924179&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-924179
]
ASF GitHub Bot logged work on GOBBLIN-2097:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 02/Jul/24 16:57
Start Date: 02/Jul/24 16:57
Worklog Time Spent: 10m
Work Description: phet commented on code in PR #3984:
URL: https://github.com/apache/gobblin/pull/3984#discussion_r1653314453
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java:
##########
@@ -244,8 +246,10 @@ public static Properties getJobPropertiesFromJobDetail(JobDetail jobDetail) {
}
/**
- * Updates the cronExpression, reminderTimestamp, originalEventTime values in the properties map of a JobDataMap
- * provided returns the updated JobDataMap to the user
+ * Adds the cronExpression, reminderTimestamp, originalEventTime values in the properties map of a new jobDataMap
+ * cloned from the one provided and returns the new JobDataMap to the user.
+ * Note: the jobDataMap and Properties field reference different objects than the original, but the keys and values
Review Comment:
on this note, merely say:
```
`jobDataMap` and its `GobblinServiceJobScheduler.PROPERTIES_KEY` field are
shallow, not deep-copied
```
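To illustrate what "shallow, not deep-copied" means here, a minimal sketch (not the Gobblin code): a plain `HashMap` stands in for Quartz's `JobDataMap`, and the `PROPERTIES_KEY` constant, the helper name, and the property names are all hypothetical. The copy gets its own map and its own `Properties`, while the individual keys and values remain shared references.

```java
import java.util.HashMap;
import java.util.Properties;

// Stand-in sketch: HashMap plays the role of JobDataMap (assumption, not Quartz).
class ShallowCopyDemo {
  static final String PROPERTIES_KEY = "properties"; // hypothetical key name

  /**
   * Returns a new map holding a new Properties object. The containers are fresh,
   * but the keys and values inside them are the same objects as in the original.
   * Assumes the original map has a Properties entry under PROPERTIES_KEY.
   */
  static HashMap<String, Object> copyWithFreshProperties(HashMap<String, Object> original) {
    HashMap<String, Object> copy = new HashMap<>(original);
    Properties fresh = new Properties();
    fresh.putAll((Properties) original.get(PROPERTIES_KEY));
    copy.put(PROPERTIES_KEY, fresh);
    return copy;
  }

  public static void main(String[] args) {
    Properties originalProps = new Properties();
    originalProps.setProperty("job.schedule", "0 0 * * * ?");
    HashMap<String, Object> original = new HashMap<>();
    original.put(PROPERTIES_KEY, originalProps);

    HashMap<String, Object> copy = copyWithFreshProperties(original);
    Properties copiedProps = (Properties) copy.get(PROPERTIES_KEY);
    copiedProps.setProperty("isReminder", "true");

    // The original Properties never sees the reminder flag.
    System.out.println(originalProps.containsKey("isReminder"));
    // The schedule value itself is still the same String object in both.
    System.out.println(copiedProps.getProperty("job.schedule") == originalProps.getProperty("job.schedule"));
  }
}
```

Sharing the values is harmless in this sketch because they are immutable strings; it would matter only if a value were itself a mutable object.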
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java:
##########
@@ -223,19 +224,20 @@ public static String createSuffixForJobTrigger(LeaseAttemptStatus.LeasedToAnothe
* the event to revisit. It will update the jobKey to the reminderKey provides and the Properties map to
* contain the cron scheduler for the reminder event and information about the event to revisit
* @param originalKey
- * @param reminderKey
* @param status
* @return
* @throws SchedulerException
*/
- protected JobDetailImpl createJobDetailForReminderEvent(JobKey originalKey, JobKey reminderKey,
- LeaseAttemptStatus.LeasedToAnotherStatus status)
+ protected JobDetailImpl createJobDetailForReminderEvent(JobKey originalKey, LeaseAttemptStatus.LeasedToAnotherStatus status)
throws SchedulerException {
- JobDetailImpl jobDetail = (JobDetailImpl) this.schedulerService.getScheduler().getJobDetail(originalKey);
- jobDetail.setKey(reminderKey);
- JobDataMap jobDataMap = jobDetail.getJobDataMap();
- jobDataMap = updatePropsInJobDataMap(jobDataMap, status, schedulerMaxBackoffMillis);
- jobDetail.setJobDataMap(jobDataMap);
+ /* Cloning the original jobDetail creates a new object reference but the jobDataMap and Properties fields are
+ shallow copies that need to be replaced by deep ones to avoid shared references between the original and reminder
+ jobs in the scheduler
+ */
Review Comment:
recommend spreading this out... otherwise it's a sizeable chunk of abstract
and difficult-to-envision text. e.g. first, here:
```
// 1. shallow `.clone()` this top-level `JobDetailImpl`
```
then on L239:
```
// 2. create a fresh `JobDataMap` specific to the reminder
```
(aside: as mentioned earlier, the name `updatePropsInJobDataMap` suggests it
will mutate the JDM. given it originally did, that would explain its name.
suggest changing to indicate it returns a copy and leaves the orig unchanged.)
finally on L240:
```
// 3. update `clonedJobDetail` to point to the new `JobDataMap`
```
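Put together, the three numbered steps might look like the sketch below. This is an illustration under assumptions rather than the PR's code: `Detail` is a hypothetical stand-in for `JobDetailImpl`, a flat `Map<String, String>` replaces the `JobDataMap` plus nested `Properties`, and `copyWithReminderProps` is just one possible name that signals "returns a copy".

```java
import java.util.HashMap;
import java.util.Map;

class ReminderDetailSketch {
  /** Hypothetical stand-in for JobDetailImpl: a key plus a data map. */
  static class Detail implements Cloneable {
    String key;
    Map<String, String> dataMap;

    Detail(String key, Map<String, String> dataMap) {
      this.key = key;
      this.dataMap = dataMap;
    }

    @Override
    public Detail clone() {
      try {
        return (Detail) super.clone(); // shallow: dataMap reference is shared
      } catch (CloneNotSupportedException e) {
        throw new AssertionError(e); // cannot happen, Detail is Cloneable
      }
    }
  }

  /** The name says it: a copy comes back and the argument is left unchanged. */
  static Map<String, String> copyWithReminderProps(Map<String, String> original, String reminderCron) {
    Map<String, String> copy = new HashMap<>(original);
    copy.put("cron", reminderCron);
    copy.put("isReminder", "true");
    return copy;
  }

  static Detail createDetailForReminder(Detail original, String reminderCron) {
    // 1. shallow `.clone()` of the top-level detail
    Detail clonedDetail = original.clone();
    // 2. create a fresh data map specific to the reminder
    Map<String, String> reminderMap = copyWithReminderProps(original.dataMap, reminderCron);
    // 3. update the clone to point to the new map
    clonedDetail.dataMap = reminderMap;
    return clonedDetail;
  }
}
```

With one short comment per step, each line of the method explains itself and the block comment at the top is no longer needed.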
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -249,7 +249,7 @@ private void initializeConstantsTable() throws IOException {
@Override
public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagActionLeaseObject dagActionLeaseObject,
boolean adoptConsensusFlowExecutionId) throws IOException {
- log.info("Multi-active scheduler about to handle trigger event: [{}, is: {}, triggerEventTimestamp: {}]",
+ log.info("Multi-active lease arbiter about to handle trigger event: [{}, is: {}, triggerEventTimestamp: {}]",
Review Comment:
nit: on L307, it's simply "Multi-active arbiter"
let's be consistent
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java:
##########
@@ -223,19 +224,20 @@ public static String createSuffixForJobTrigger(LeaseAttemptStatus.LeasedToAnothe
* the event to revisit. It will update the jobKey to the reminderKey provides and the Properties map to
* contain the cron scheduler for the reminder event and information about the event to revisit
* @param originalKey
- * @param reminderKey
* @param status
* @return
* @throws SchedulerException
*/
- protected JobDetailImpl createJobDetailForReminderEvent(JobKey originalKey, JobKey reminderKey,
- LeaseAttemptStatus.LeasedToAnotherStatus status)
+ protected JobDetailImpl createJobDetailForReminderEvent(JobKey originalKey, LeaseAttemptStatus.LeasedToAnotherStatus status)
throws SchedulerException {
- JobDetailImpl jobDetail = (JobDetailImpl) this.schedulerService.getScheduler().getJobDetail(originalKey);
- jobDetail.setKey(reminderKey);
- JobDataMap jobDataMap = jobDetail.getJobDataMap();
- jobDataMap = updatePropsInJobDataMap(jobDataMap, status, schedulerMaxBackoffMillis);
- jobDetail.setJobDataMap(jobDataMap);
+ /* Cloning the original jobDetail creates a new object reference but the jobDataMap and Properties fields are
+ shallow copies that need to be replaced by deep ones to avoid shared references between the original and reminder
+ jobs in the scheduler
+ */
+ JobDetailImpl jobDetail = (JobDetailImpl) this.schedulerService.getScheduler().getJobDetail(originalKey).clone();
+ JobDataMap originalJobDataMap = jobDetail.getJobDataMap();
Review Comment:
just to keep things clear, I would recommend renaming `jobDetail` to
`clonedJobDetail`
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -302,11 +302,13 @@ public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagActionLeaseObject da
}
}
- log.info("Multi-active arbiter replacing local trigger event timestamp [{}, is: {}, triggerEventTimestamp: {}] "
- + "with database eventTimestamp {} (in epoch-millis)", dagActionLeaseObject.getDagAction(),
- dagActionLeaseObject.isReminder ? "reminder" : "original", dagActionLeaseObject.getEventTimeMillis(),
- dbCurrentTimestamp.getTime());
-
+ // TODO: check whether reminder event before replacing flowExecutionId
+ if (adoptConsensusFlowExecutionId) {
+ log.info("Multi-active arbiter replacing local trigger event timestamp [{}, is: {}, triggerEventTimestamp: {}] "
+ + "with database eventTimestamp {} (in epoch-millis)", dagActionLeaseObject.getDagAction(),
+ dagActionLeaseObject.isReminder ? "reminder" : "original", dagActionLeaseObject.getEventTimeMillis(),
Review Comment:
this pattern of reaching into `dagActionLeaseObject` three times to fill in
```
"[{}, is: {}, triggerEventTimestamp: {}]"
```
recurs MANY times. looks like DALO needs a `toString()`.
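A rough sketch of such a `toString()`, assuming a simplified lease object: the class name `LeaseObjectSketch` is hypothetical, and `dagAction` is reduced to a `String` here, whereas the real `DagActionStore.DagActionLeaseObject` holds a `DagAction`.

```java
// Hypothetical, simplified stand-in for DagActionStore.DagActionLeaseObject.
class LeaseObjectSketch {
  private final String dagAction;      // assumption: the real field is a DagAction
  private final boolean isReminder;
  private final long eventTimeMillis;

  LeaseObjectSketch(String dagAction, boolean isReminder, long eventTimeMillis) {
    this.dagAction = dagAction;
    this.isReminder = isReminder;
    this.eventTimeMillis = eventTimeMillis;
  }

  /** Renders the same bracketed form the log statements currently build by hand. */
  @Override
  public String toString() {
    return "[" + dagAction
        + ", is: " + (isReminder ? "reminder" : "original")
        + ", triggerEventTimestamp: " + eventTimeMillis + "]";
  }
}
```

Each call site could then pass the object for a single `{}` placeholder instead of three, since SLF4J formats arguments via `toString()`.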
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandlerTest.java:
##########
@@ -67,17 +69,22 @@ public void testUpdatePropsInJobDataMap() {
originalProperties.setProperty(ConfigurationKeys.SCHEDULER_PRESERVED_CONSENSUS_EVENT_TIME_MILLIS_KEY, "1");
oldJobDataMap.put(GobblinServiceJobScheduler.PROPERTIES_KEY, originalProperties);
- JobDataMap newJobDataMap = FlowLaunchHandler.updatePropsInJobDataMap(oldJobDataMap, leasedToAnotherStatus,
- schedulerBackOffMillis);
+ JobDataMap newJobDataMap =
+ FlowLaunchHandler.updatePropsInJobDataMap(oldJobDataMap, leasedToAnotherStatus, schedulerBackOffMillis);
Properties newProperties = (Properties) newJobDataMap.get(GobblinServiceJobScheduler.PROPERTIES_KEY);
Assert.assertTrue(newProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY).endsWith(cronExpressionSuffix));
Assert.assertNotEquals("0",
newProperties.getProperty(ConfigurationKeys.SCHEDULER_EXPECTED_REMINDER_TIME_MILLIS_KEY));
Assert.assertEquals(String.valueOf(leasedToAnotherStatus.getEventTimeMillis()),
newProperties.getProperty(ConfigurationKeys.SCHEDULER_PRESERVED_CONSENSUS_EVENT_TIME_MILLIS_KEY));
Assert.assertTrue(Boolean.parseBoolean(newProperties.getProperty(ConfigurationKeys.FLOW_IS_REMINDER_EVENT_KEY)));
+
+ Assert.assertNotSame(oldJobDataMap, newJobDataMap);
+ Assert.assertNotSame(originalProperties, newProperties);
+
Assert.assertFalse(originalProperties.containsKey(ConfigurationKeys.FLOW_IS_REMINDER_EVENT_KEY));
Review Comment:
I like this assertion! of course, most fundamentally, it's not merely that
old and new should differ, but that old IS NOT modified.
w/o spending too long, I'd recommend putting a `Mockito.spy` on the oldJDM,
to ensure only `clone()` and `get()` are called
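The idea behind the spy can be sketched without Mockito, using a hand-written recording subclass in its place (a swapped-in technique, not the suggested `Mockito.spy`). Everything here is hypothetical: `RecordingMap` stands in for a spied `JobDataMap`, `copyWithReminderFlag` for the method under test, and only the four most common mutators are tracked.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class RecordingMapSketch {
  /** Records every call to a tracked mutating method (put, putAll, remove, clear). */
  static class RecordingMap extends HashMap<String, String> {
    final List<String> mutatingCalls = new ArrayList<>();

    @Override
    public String put(String key, String value) {
      mutatingCalls.add("put");
      return super.put(key, value);
    }

    @Override
    public void putAll(Map<? extends String, ? extends String> m) {
      mutatingCalls.add("putAll");
      super.putAll(m);
    }

    @Override
    public String remove(Object key) {
      mutatingCalls.add("remove");
      return super.remove(key);
    }

    @Override
    public void clear() {
      mutatingCalls.add("clear");
      super.clear();
    }
  }

  /** Hypothetical method under test: must read from, but never write to, its argument. */
  static Map<String, String> copyWithReminderFlag(Map<String, String> original) {
    Map<String, String> copy = new HashMap<>(original);
    copy.put("isReminder", "true");
    return copy;
  }

  public static void main(String[] args) {
    RecordingMap oldMap = new RecordingMap();
    oldMap.put("cron", "0 0 * * * ?");
    oldMap.mutatingCalls.clear(); // forget the setup call before exercising the method

    copyWithReminderFlag(oldMap);

    // An empty list means none of the tracked mutators were invoked on the old map.
    System.out.println(oldMap.mutatingCalls.isEmpty());
  }
}
```

A real spy would cover every method rather than a chosen few, which is the main reason to prefer it when the dependency is already available in the test module.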
Issue Time Tracking
-------------------
Worklog Id: (was: 924179)
Time Spent: 50m (was: 40m)
> Use unique JobDataMaps and Properties to use for reminder events
> ----------------------------------------------------------------
>
> Key: GOBBLIN-2097
> URL: https://issues.apache.org/jira/browse/GOBBLIN-2097
> Project: Apache Gobblin
> Issue Type: Bug
> Components: gobblin-service
> Reporter: Urmi Mustafi
> Assignee: Abhishek Tiwari
> Priority: Major
> Time Spent: 50m
> Remaining Estimate: 0h
>
> We notice that after deploying the service all subsequent scheduler triggers
> after the very first one are incorrectly marked as reminders even when they
> are for "original" events. This is a result of re-using the same JobDataMap
> and Properties object between the original and reminder Jobs for the
> Scheduler. The following changes create deep copies of both to use for the
> reminders so adding a reminder flag in one will not affect the other.