umustafi commented on code in PR #3994:
URL: https://github.com/apache/gobblin/pull/3994#discussion_r1669433410
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java:
##########
@@ -77,28 +95,45 @@ public DagActionReminderScheduler(StdSchedulerFactory
schedulerFactory)
* @throws SchedulerException
*/
public void scheduleReminder(DagActionStore.LeaseParams leaseParams, long
reminderDurationMillis,
- boolean isDeadlineReminder)
- throws SchedulerException {
+ boolean isDeadlineReminder) throws SchedulerException {
+ DagActionStore.DagAction dagAction = leaseParams.getDagAction();
JobDetail jobDetail = createReminderJobDetail(leaseParams,
isDeadlineReminder);
- Trigger trigger = createReminderJobTrigger(leaseParams.getDagAction(),
reminderDurationMillis,
+ Trigger trigger = createReminderJobTrigger(dagAction,
reminderDurationMillis,
System::currentTimeMillis, isDeadlineReminder);
- log.info("Reminder set for dagAction {} to fire after {} ms,
isDeadlineTrigger: {}",
- leaseParams.getDagAction(), reminderDurationMillis,
isDeadlineReminder);
- quartzScheduler.scheduleJob(jobDetail, trigger);
+ log.info("Going to set reminder for dagAction {} to fire after {} ms,
isDeadlineTrigger: {}",
Review Comment:
Let's prefix all of these log messages with `DagActionReminderScheduler` so
they are easy to find.
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java:
##########
@@ -77,28 +95,45 @@ public DagActionReminderScheduler(StdSchedulerFactory
schedulerFactory)
* @throws SchedulerException
*/
public void scheduleReminder(DagActionStore.LeaseParams leaseParams, long
reminderDurationMillis,
- boolean isDeadlineReminder)
- throws SchedulerException {
+ boolean isDeadlineReminder) throws SchedulerException {
+ DagActionStore.DagAction dagAction = leaseParams.getDagAction();
JobDetail jobDetail = createReminderJobDetail(leaseParams,
isDeadlineReminder);
- Trigger trigger = createReminderJobTrigger(leaseParams.getDagAction(),
reminderDurationMillis,
+ Trigger trigger = createReminderJobTrigger(dagAction,
reminderDurationMillis,
System::currentTimeMillis, isDeadlineReminder);
- log.info("Reminder set for dagAction {} to fire after {} ms,
isDeadlineTrigger: {}",
- leaseParams.getDagAction(), reminderDurationMillis,
isDeadlineReminder);
- quartzScheduler.scheduleJob(jobDetail, trigger);
+ log.info("Going to set reminder for dagAction {} to fire after {} ms,
isDeadlineTrigger: {}",
+ dagAction, reminderDurationMillis, isDeadlineReminder);
+
+ try {
+ if (!this.deleteDeadlineDagActionCache.get(dagAction)) {
+ quartzScheduler.scheduleJob(jobDetail, trigger);
+ } else {
+ log.info("Ignoring {} because the delete equivalent of the same
received already.", dagAction);
+ this.deleteDeadlineDagActionCache.put(dagAction, false);
+ }
+ } catch (ObjectAlreadyExistsException e) {
+ log.warn("Reminder job {} already exists in the quartz scheduler.
Possibly a duplicate request.", jobDetail.getKey());
Review Comment:
`Reminder job for this dagAction already exists in the
DagActionReminderScheduler. Look for duplicate requests for the same action.
Job: {}`
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java:
##########
@@ -131,6 +129,7 @@ public static void
submitJobToExecutor(DagManagementStateStore dagManagementStat
log.info("Orchestrated job: {} on Executor: {}",
DagManagerUtils.getFullyQualifiedJobName(dagNode), specExecutorUri);
dagManagementStateStore.getDagManagerMetrics().incrementJobsSentToExecutor(dagNode);
dagManagementStateStore.addDagNodeState(dagNode, dagId);
+ sendEnforceJobStartDeadlineDagAction(dagManagementStateStore, dagNode);
Review Comment:
Please add a comment explaining why this call was moved below, so the reasoning is documented.
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java:
##########
@@ -77,28 +95,45 @@ public DagActionReminderScheduler(StdSchedulerFactory
schedulerFactory)
* @throws SchedulerException
*/
public void scheduleReminder(DagActionStore.LeaseParams leaseParams, long
reminderDurationMillis,
- boolean isDeadlineReminder)
- throws SchedulerException {
+ boolean isDeadlineReminder) throws SchedulerException {
+ DagActionStore.DagAction dagAction = leaseParams.getDagAction();
JobDetail jobDetail = createReminderJobDetail(leaseParams,
isDeadlineReminder);
- Trigger trigger = createReminderJobTrigger(leaseParams.getDagAction(),
reminderDurationMillis,
+ Trigger trigger = createReminderJobTrigger(dagAction,
reminderDurationMillis,
System::currentTimeMillis, isDeadlineReminder);
- log.info("Reminder set for dagAction {} to fire after {} ms,
isDeadlineTrigger: {}",
- leaseParams.getDagAction(), reminderDurationMillis,
isDeadlineReminder);
- quartzScheduler.scheduleJob(jobDetail, trigger);
+ log.info("Going to set reminder for dagAction {} to fire after {} ms,
isDeadlineTrigger: {}",
+ dagAction, reminderDurationMillis, isDeadlineReminder);
+
+ try {
+ if (!this.deleteDeadlineDagActionCache.get(dagAction)) {
+ quartzScheduler.scheduleJob(jobDetail, trigger);
+ } else {
+ log.info("Ignoring {} because the delete equivalent of the same
received already.", dagAction);
+ this.deleteDeadlineDagActionCache.put(dagAction, false);
+ }
+ } catch (ObjectAlreadyExistsException e) {
+ log.warn("Reminder job {} already exists in the quartz scheduler.
Possibly a duplicate request.", jobDetail.getKey());
+ } catch (JobPersistenceException e) {
+ // this may happen when there is a race condition between this and
delete job operation, retry this in that case
+ quartzScheduler.scheduleJob(jobDetail, trigger);
Review Comment:
What is this exception? Does it occur if the scheduler attempts to delete
the job at the same time? Please clarify the comment.
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStore.java:
##########
@@ -122,6 +123,9 @@ public void addJobDagAction(String flowGroup, String
flowName, long flowExecutio
try {
fillPreparedStatement(flowGroup, flowName, flowExecutionId, jobName,
dagActionType, insertStatement);
return insertStatement.executeUpdate();
+ } catch (SQLIntegrityConstraintViolationException e) {
+ log.error(e.getMessage());
Review Comment:
why do we skip throwing an exception here?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java:
##########
@@ -107,23 +107,10 @@ public DagManagementTaskStreamImpl(Config config,
Optional<DagActionStore> dagAc
this.dagManagementStateStore = dagManagementStateStore;
}
- @Override
- public synchronized void addDagAction(DagActionStore.DagAction dagAction) {
- // TODO: Used to track missing dag issue, remove later as needed
- log.info("Add original (non-reminder) dagAction {}", dagAction);
-
- if (!this.leaseParamsQueue.offer(new DagActionStore.LeaseParams(dagAction,
false, System.currentTimeMillis()))) {
- throw new RuntimeException(String.format("Could not add dag action to
the queue %s", dagAction));
- }
- }
-
- @Override
- public synchronized void addReminderDagAction(DagActionStore.LeaseParams
reminderLeaseParams) {
- // TODO: Used to track missing dag issue, remove later as needed
- log.info("Add reminder dagAction {}", reminderLeaseParams);
-
+ public synchronized void addDagAction(DagActionStore.LeaseParams
reminderLeaseParams) {
Review Comment:
Simply name this `leaseParams`, since it is used for both types of actions.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]