Will-Lo commented on code in PR #3767:
URL: https://github.com/apache/gobblin/pull/3767#discussion_r1323478219
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -728,38 +728,42 @@ public static class GobblinServiceJob extends
BaseGobblinJob implements Interrup
@Override
public void executeImpl(JobExecutionContext context) throws
JobExecutionException {
- JobDetail jobDetail = context.getJobDetail();
- _log.info("Starting FlowSpec " + jobDetail.getKey());
-
- JobDataMap dataMap = jobDetail.getJobDataMap();
- JobScheduler jobScheduler = (JobScheduler)
dataMap.get(JOB_SCHEDULER_KEY);
- Properties jobProps = (Properties) dataMap.get(PROPERTIES_KEY);
- JobListener jobListener = (JobListener) dataMap.get(JOB_LISTENER_KEY);
-
- // Obtain trigger timestamp from trigger to pass to jobProps
- Trigger trigger = context.getTrigger();
- // THIS current event has already fired if this method is called, so it
now exists in <previousFireTime>
- long triggerTimeMillis = asUTCEpochMillis(trigger.getPreviousFireTime());
- // If the trigger is a reminder type event then utilize the trigger time
saved in job properties rather than the
- // actual firing time
- if (jobDetail.getKey().getName().contains("reminder")) {
- String preservedConsensusEventTime = jobProps.getProperty(
-
ConfigurationKeys.SCHEDULER_PRESERVED_CONSENSUS_EVENT_TIME_MILLIS_KEY, "0");
- String expectedReminderTime = jobProps.getProperty(
- ConfigurationKeys.SCHEDULER_EXPECTED_REMINDER_TIME_MILLIS_KEY,
"0");
- _log.info(jobSchedulerTracePrefixBuilder(jobProps) + "triggerTime: {}
expectedReminderTime: {} - Reminder job "
- + "triggered by scheduler at {}", preservedConsensusEventTime,
expectedReminderTime, triggerTimeMillis);
- // TODO: add a metric if expected reminder time far exceeds system time
-
jobProps.setProperty(ConfigurationKeys.ORCHESTRATOR_TRIGGER_EVENT_TIME_MILLIS_KEY,
preservedConsensusEventTime);
- } else {
-
jobProps.setProperty(ConfigurationKeys.ORCHESTRATOR_TRIGGER_EVENT_TIME_MILLIS_KEY,
- String.valueOf(triggerTimeMillis));
- _log.info(jobSchedulerTracePrefixBuilder(jobProps) + "triggerTime: {}
nextTriggerTime: {} - Job triggered by "
- + "scheduler", triggerTimeMillis,
asUTCEpochMillis(trigger.getNextFireTime()));
- }
try {
+ // TODO: move this out of the try clause after location NPE source
+ JobDetail jobDetail = context.getJobDetail();
+ _log.info("Starting FlowSpec " + jobDetail.getKey());
+
+ JobDataMap dataMap = jobDetail.getJobDataMap();
+ JobScheduler jobScheduler = (JobScheduler)
dataMap.get(JOB_SCHEDULER_KEY);
+ Properties jobProps = (Properties) dataMap.get(PROPERTIES_KEY);
+ JobListener jobListener = (JobListener) dataMap.get(JOB_LISTENER_KEY);
+
+ // Obtain trigger timestamp from trigger to pass to jobProps
+ Trigger trigger = context.getTrigger();
+ // THIS current event has already fired if this method is called, so
it now exists in <previousFireTime>
+ long triggerTimeMillis =
asUTCEpochMillis(trigger.getPreviousFireTime());
+ // If the trigger is a reminder type event then utilize the trigger
time saved in job properties rather than the
+ // actual firing time
+ if (jobDetail.getKey().getName().contains("reminder")) {
+ String preservedConsensusEventTime = jobProps.getProperty(
+
ConfigurationKeys.SCHEDULER_PRESERVED_CONSENSUS_EVENT_TIME_MILLIS_KEY, "0");
+ String expectedReminderTime = jobProps.getProperty(
+ ConfigurationKeys.SCHEDULER_EXPECTED_REMINDER_TIME_MILLIS_KEY,
"0");
+ _log.info(jobSchedulerTracePrefixBuilder(jobProps) + "triggerTime:
{} expectedReminderTime: {} - Reminder job "
+ + "triggered by scheduler at {}", preservedConsensusEventTime,
expectedReminderTime, triggerTimeMillis);
+ // TODO: add a metric if expected reminder time far exceeds system
time
+
jobProps.setProperty(ConfigurationKeys.ORCHESTRATOR_TRIGGER_EVENT_TIME_MILLIS_KEY,
preservedConsensusEventTime);
+ } else {
+
jobProps.setProperty(ConfigurationKeys.ORCHESTRATOR_TRIGGER_EVENT_TIME_MILLIS_KEY,
+ String.valueOf(triggerTimeMillis));
+ _log.info(jobSchedulerTracePrefixBuilder(jobProps) + "triggerTime:
{} nextTriggerTime: {} - Job triggered by "
+ + "scheduler", triggerTimeMillis,
asUTCEpochMillis(trigger.getNextFireTime()));
+ }
jobScheduler.runJob(jobProps, jobListener);
} catch (Throwable t) {
+ if (t instanceof NullPointerException) {
+ log.warn("NullPointerException encountered while trying to execute
flow: " + t.getMessage());
Review Comment:
You should probably use t.printStackTrace() rather than getMessage(), or
log both together, since getMessage() alone may not give you much information.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]