arjun4084346 commented on code in PR #3899:
URL: https://github.com/apache/gobblin/pull/3899#discussion_r1533915866


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java:
##########
@@ -57,20 +69,104 @@ public DagActionReminderScheduler(StdSchedulerFactory 
schedulerFactory, Optional
    */
   public void scheduleReminder(DagActionStore.DagAction dagAction, long 
reminderDurationMillis)
       throws SchedulerException {
-    if (!dagManagement.isPresent()) {
-      throw new RuntimeException("DagManagement not initialized in 
multi-active execution mode when required.");
+    if (this.dagManagement == null) {
+      initialize();
     }
-    JobDetail jobDetail = 
ReminderSettingDagProcLeaseArbiter.createReminderJobDetail(dagManagement.get(), 
dagAction);
-    Trigger trigger = 
ReminderSettingDagProcLeaseArbiter.createReminderJobTrigger(dagAction, 
reminderDurationMillis);
+    JobDetail jobDetail = createReminderJobDetail(dagManagement, dagAction);
+    Trigger trigger = createReminderJobTrigger(dagAction, 
reminderDurationMillis);
     quartzScheduler.scheduleJob(jobDetail, trigger);
   }
 
   public void unscheduleReminderJob(DagActionStore.DagAction dagAction) throws 
SchedulerException {
-    if (!dagManagement.isPresent()) {
-      throw new RuntimeException("DagManagement not initialized in 
multi-active execution mode when required.");
+    if (this.dagManagement == null) {
+      initialize();
     }
-    JobDetail jobDetail = 
ReminderSettingDagProcLeaseArbiter.createReminderJobDetail(dagManagement.get(), 
dagAction);
+    JobDetail jobDetail = createReminderJobDetail(dagManagement, dagAction);
     quartzScheduler.deleteJob(jobDetail.getKey());
   }
 
+  /**
+   * Static class used to store information regarding a pending dagAction that 
needs to be revisited at a later time
+   * by {@link DagManagement} interface to re-attempt a lease on if it has not 
been completed by the previous owner.
+   * These jobs are scheduled and used by the {@link 
DagActionReminderScheduler}.
+   */
+  @Slf4j
+  public static class ReminderJob implements Job {
+    public static final String FLOW_ACTION_TYPE_KEY = "flow.actionType";
+    public static final String DAG_MANAGEMENT_KEY = "dag.management";
+
+    @Override
+    public void execute(JobExecutionContext context) {
+      // Get properties from the trigger to create a dagAction
+      JobDataMap jobDataMap = context.getTrigger().getJobDataMap();
+      String flowName = jobDataMap.getString(ConfigurationKeys.FLOW_NAME_KEY);
+      String flowGroup = 
jobDataMap.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+      String jobName = jobDataMap.getString(ConfigurationKeys.JOB_NAME_KEY);
+      String flowId = 
jobDataMap.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+      DagActionStore.DagActionType dagActionType = 
DagActionStore.DagActionType.valueOf(
+          jobDataMap.getString(FLOW_ACTION_TYPE_KEY));
+      DagManagement dagManagement = 
GobblinServiceManager.getClass(DagManagement.class); //(DagManagement) 
jobDataMap.get(DAG_MANAGEMENT_KEY);

Review Comment:
   remove comment? also .getClass is tested and creating objects correctly, 
right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to