arjun4084346 commented on code in PR #3899:
URL: https://github.com/apache/gobblin/pull/3899#discussion_r1533998777
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java:
##########
@@ -57,20 +69,104 @@ public DagActionReminderScheduler(StdSchedulerFactory
schedulerFactory, Optional
*/
public void scheduleReminder(DagActionStore.DagAction dagAction, long
reminderDurationMillis)
throws SchedulerException {
- if (!dagManagement.isPresent()) {
- throw new RuntimeException("DagManagement not initialized in
multi-active execution mode when required.");
+ if (this.dagManagement == null) {
+ initialize();
}
- JobDetail jobDetail =
ReminderSettingDagProcLeaseArbiter.createReminderJobDetail(dagManagement.get(),
dagAction);
- Trigger trigger =
ReminderSettingDagProcLeaseArbiter.createReminderJobTrigger(dagAction,
reminderDurationMillis);
+ JobDetail jobDetail = createReminderJobDetail(dagManagement, dagAction);
+ Trigger trigger = createReminderJobTrigger(dagAction,
reminderDurationMillis);
quartzScheduler.scheduleJob(jobDetail, trigger);
}
public void unscheduleReminderJob(DagActionStore.DagAction dagAction) throws
SchedulerException {
- if (!dagManagement.isPresent()) {
- throw new RuntimeException("DagManagement not initialized in
multi-active execution mode when required.");
+ if (this.dagManagement == null) {
+ initialize();
}
- JobDetail jobDetail =
ReminderSettingDagProcLeaseArbiter.createReminderJobDetail(dagManagement.get(),
dagAction);
+ JobDetail jobDetail = createReminderJobDetail(dagManagement, dagAction);
quartzScheduler.deleteJob(jobDetail.getKey());
}
+ /**
+ * Static class used to store information regarding a pending dagAction that
needs to be revisited at a later time
+ * by {@link DagManagement} interface to re-attempt a lease on if it has not
been completed by the previous owner.
+ * These jobs are scheduled and used by the {@link
DagActionReminderScheduler}.
+ */
+ @Slf4j
+ public static class ReminderJob implements Job {
+ public static final String FLOW_ACTION_TYPE_KEY = "flow.actionType";
+ public static final String DAG_MANAGEMENT_KEY = "dag.management";
+
+ @Override
+ public void execute(JobExecutionContext context) {
+ // Get properties from the trigger to create a dagAction
+ JobDataMap jobDataMap = context.getTrigger().getJobDataMap();
+ String flowName = jobDataMap.getString(ConfigurationKeys.FLOW_NAME_KEY);
+ String flowGroup =
jobDataMap.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+ String jobName = jobDataMap.getString(ConfigurationKeys.JOB_NAME_KEY);
+ String flowId =
jobDataMap.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+ DagActionStore.DagActionType dagActionType =
DagActionStore.DagActionType.valueOf(
+ jobDataMap.getString(FLOW_ACTION_TYPE_KEY));
+ DagManagement dagManagement =
GobblinServiceManager.getClass(DagManagement.class); //(DagManagement)
jobDataMap.get(DAG_MANAGEMENT_KEY);
+
+ log.info("DagProc reminder triggered for (flowGroup: " + flowGroup + ",
flowName: " + flowName
+ + ", flowExecutionId: " + flowId + ", jobName: " + jobName +")");
+
+ DagActionStore.DagAction dagAction = new
DagActionStore.DagAction(flowGroup, flowName, flowId, jobName,
+ dagActionType);
+
+ try {
+ dagManagement.addDagAction(dagAction);
+ } catch (IOException e) {
+ log.error("Failed to add DagAction to DagManagement. Action: {}",
dagAction);
+ }
+ }
+ }
+
+ /**
+ * Creates a key for the reminder job by concatenating all dagAction fields
+ */
+ public static String createDagActionReminderKey(DagActionStore.DagAction
dagAction) {
+ return createDagActionReminderKey(dagAction.getFlowName(),
dagAction.getFlowGroup(), dagAction.getFlowExecutionId(),
+ dagAction.getJobName(), dagAction.getDagActionType());
+ }
+
+ /**
+ * Creates a key for the reminder job by concatenating flowName, flowGroup,
flowExecutionId, jobName, dagActionType
+ * in that order
+ */
+ public static String createDagActionReminderKey(String flowName, String
flowGroup, String flowId, String jobName,
+ DagActionStore.DagActionType dagActionType) {
+ return String.format("%s.%s.%s.%s.%s", flowGroup, flowName, flowId,
jobName, dagActionType);
+ }
+
+ /**
+ * Creates a jobDetail containing flow and job identifying information in
the jobDataMap, uniquely identified
+ * by a key comprised of the dagAction's fields. It also serializes a
reference to the {@link DagManagement} object
+ * to be referenced when the trigger fires.
+ */
+ public static JobDetail createReminderJobDetail(DagManagement dagManagement,
DagActionStore.DagAction dagAction) {
+ JobDataMap dataMap = new JobDataMap();
+ dataMap.put(ReminderJob.DAG_MANAGEMENT_KEY, dagManagement);
Review Comment:
Remove this now? We don't need DagManagement in the method signature either,
right?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]