phet commented on code in PR #3899:
URL: https://github.com/apache/gobblin/pull/3899#discussion_r1540638957


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java:
##########
@@ -57,20 +64,95 @@ public DagActionReminderScheduler(StdSchedulerFactory 
schedulerFactory, Optional
    */
   public void scheduleReminder(DagActionStore.DagAction dagAction, long 
reminderDurationMillis)
       throws SchedulerException {
-    if (!dagManagement.isPresent()) {
-      throw new RuntimeException("DagManagement not initialized in 
multi-active execution mode when required.");
-    }
-    JobDetail jobDetail = 
ReminderSettingDagProcLeaseArbiter.createReminderJobDetail(dagManagement.get(), 
dagAction);
-    Trigger trigger = 
ReminderSettingDagProcLeaseArbiter.createReminderJobTrigger(dagAction, 
reminderDurationMillis);
+    JobDetail jobDetail = createReminderJobDetail(dagAction);
+    Trigger trigger = createReminderJobTrigger(dagAction, 
reminderDurationMillis);
     quartzScheduler.scheduleJob(jobDetail, trigger);
   }
 
   public void unscheduleReminderJob(DagActionStore.DagAction dagAction) throws 
SchedulerException {
-    if (!dagManagement.isPresent()) {
-      throw new RuntimeException("DagManagement not initialized in 
multi-active execution mode when required.");
-    }
-    JobDetail jobDetail = 
ReminderSettingDagProcLeaseArbiter.createReminderJobDetail(dagManagement.get(), 
dagAction);
+    JobDetail jobDetail = createReminderJobDetail(dagAction);
     quartzScheduler.deleteJob(jobDetail.getKey());
   }
 
+  /**
+   * Static class used to store information regarding a pending dagAction that 
needs to be revisited at a later time
+   * by {@link DagManagement} interface to re-attempt a lease on if it has not 
been completed by the previous owner.
+   * These jobs are scheduled and used by the {@link 
DagActionReminderScheduler}.
+   */
+  @Slf4j
+  public static class ReminderJob implements Job {
+    public static final String FLOW_ACTION_TYPE_KEY = "flow.actionType";
+
+    @Override
+    public void execute(JobExecutionContext context) {
+      // Get properties from the trigger to create a dagAction
+      JobDataMap jobDataMap = context.getTrigger().getJobDataMap();
+      String flowName = jobDataMap.getString(ConfigurationKeys.FLOW_NAME_KEY);
+      String flowGroup = 
jobDataMap.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+      String jobName = jobDataMap.getString(ConfigurationKeys.JOB_NAME_KEY);
+      String flowId = 
jobDataMap.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+      DagActionStore.DagActionType dagActionType = 
DagActionStore.DagActionType.valueOf(
+          jobDataMap.getString(FLOW_ACTION_TYPE_KEY));
+
+      log.info("DagProc reminder triggered for (flowGroup: " + flowGroup + ", 
flowName: " + flowName
+          + ", flowExecutionId: " + flowId + ", jobName: " + jobName +")");
+
+      DagActionStore.DagAction dagAction = new 
DagActionStore.DagAction(flowGroup, flowName, flowId, jobName,
+          dagActionType);
+
+      try {
+        DagManagement dagManagement = 
GobblinServiceManager.getClass(DagManagement.class);
+        dagManagement.addDagAction(dagAction);
+      } catch (IOException e) {
+        log.error("Failed to add DagAction to DagManagement. Action: {}", 
dagAction);
+      }
+    }
+  }
+
+  /**
+   * Creates a key for the reminder job by concatenating all dagAction fields
+   */
+  public static String createDagActionReminderKey(DagActionStore.DagAction 
dagAction) {
+    return createDagActionReminderKey(dagAction.getFlowName(), 
dagAction.getFlowGroup(), dagAction.getFlowExecutionId(),
+        dagAction.getJobName(), dagAction.getDagActionType());
+  }
+
+  /**
+   * Creates a key for the reminder job by concatenating flowName, flowGroup, 
flowExecutionId, jobName, dagActionType
+   * in that order
+   */
+  public static String createDagActionReminderKey(String flowName, String 
flowGroup, String flowId, String jobName,
+      DagActionStore.DagActionType dagActionType) {
+    return String.format("%s.%s.%s.%s.%s", flowGroup, flowName, flowId, 
jobName, dagActionType);
+  }
+
+  /**
+   * Creates a jobDetail containing flow and job identifying information in 
the jobDataMap, uniquely identified
+   *  by a key comprised of the dagAction's fields.
+   */
+  public static JobDetail createReminderJobDetail(DagActionStore.DagAction 
dagAction) {
+    JobDataMap dataMap = new JobDataMap();
+    dataMap.put(ConfigurationKeys.FLOW_NAME_KEY, dagAction.getFlowName());
+    dataMap.put(ConfigurationKeys.FLOW_GROUP_KEY, dagAction.getFlowGroup());
+    dataMap.put(ConfigurationKeys.JOB_NAME_KEY, dagAction.getJobName());
+    dataMap.put(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, 
dagAction.getFlowExecutionId());
+    dataMap.put(ReminderJob.FLOW_ACTION_TYPE_KEY, 
dagAction.getDagActionType());
+
+    return JobBuilder.newJob(ReminderJob.class)
+        .withIdentity(createDagActionReminderKey(dagAction), 
dagAction.getFlowName())
+        .usingJobData(dataMap)
+        .build();
+  }
+
+  /**
+   * Creates a Trigger object with the same key as the ReminderJob (since only 
one trigger is expected to be associated
+   * with a job at any given time) that should fire after 
`reminderDurationMillis` millis.
+   */
+  public static Trigger createReminderJobTrigger(DagActionStore.DagAction 
dagAction, long reminderDurationMillis) {
+    Trigger trigger = TriggerBuilder.newTrigger()
+        .withIdentity(createDagActionReminderKey(dagAction), 
dagAction.getFlowName())
+        .startAt(new Date(System.currentTimeMillis() + reminderDurationMillis))

Review Comment:
   Hard-coded "clocks" make testing challenging. AFAICT, this method 
should just take the desired target time in millis (rather than the offset).
   
   That leaves the caller, `scheduleReminder`, needing to obtain the 
current time. A `Supplier<Long> getCurrentTimeMillis` member would help there. 
Pair that with a constructor overload taking such a param (for testing); the 
constructor without that extra param (for production use) would simply 
substitute `System::currentTimeMillis`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to