arjun4084346 commented on code in PR #3950:
URL: https://github.com/apache/gobblin/pull/3950#discussion_r1606104577
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java:
##########
@@ -111,4 +122,52 @@ public static void submitJobToExecutor(DagManagementStateStore dagManagementStat
throw new RuntimeException(e);
}
}
+
+ public static void cancelDagNode(Dag.DagNode<JobExecutionPlan> dagNodeToCancel, DagManagementStateStore dagManagementStateStore) throws IOException {
+ Properties props = new Properties();
+ DagManager.DagId dagId = DagManagerUtils.generateDagId(dagNodeToCancel);
+ if (dagNodeToCancel.getValue().getJobSpec().getConfig().hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)) {
+ props.setProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
+ dagNodeToCancel.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY));
+ }
+
+ try {
+ if (dagNodeToCancel.getValue().getJobFuture().isPresent()) {
+ Future future = dagNodeToCancel.getValue().getJobFuture().get();
+ String serializedFuture = DagManagerUtils.getSpecProducer(dagNodeToCancel).serializeAddSpecResponse(future);
+ props.put(ConfigurationKeys.SPEC_PRODUCER_SERIALIZED_FUTURE, serializedFuture);
+ sendCancellationEvent(dagNodeToCancel.getValue());
+ } else {
+ log.warn("No Job future when canceling DAG node (hence, not sending cancellation event) - {}",
+ dagNodeToCancel.getValue().getJobSpec().getUri());
Review Comment:
Sometimes a DAG node has not yet started running and does not have a spec
producer in it, yet a user can still submit a cancellation request for it.
In those cases, execution reaches this `else` block.
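
For illustration, here is a minimal standalone sketch of the branch being discussed. It is not Gobblin code: `CancelSketch`, `Node`, and `SERIALIZED_FUTURE_KEY` are hypothetical stand-ins, and it models only what is visible in the hunk above (a present job future leads to a cancellation event, an absent one leads to a warning).

```java
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

public class CancelSketch {
  // Hypothetical property key; the real code uses ConfigurationKeys.SPEC_PRODUCER_SERIALIZED_FUTURE.
  static final String SERIALIZED_FUTURE_KEY = "serializedFuture";

  // Hypothetical stand-in for a DAG node; jobFuture is empty until the job has been submitted.
  static final class Node {
    final String uri;
    final Optional<Future<?>> jobFuture;

    Node(String uri, Optional<Future<?>> jobFuture) {
      this.uri = uri;
      this.jobFuture = jobFuture;
    }
  }

  // Returns true when a cancellation event would be sent, false when the node never started.
  static boolean cancel(Node node, Properties props) {
    if (node.jobFuture.isPresent()) {
      // Started node: record a (placeholder) serialized future and signal cancellation.
      props.setProperty(SERIALIZED_FUTURE_KEY, "future-for-" + node.uri);
      return true;
    }
    // Not-yet-started node: there is no future to serialize, so only warn.
    System.err.println("No job future when canceling DAG node (not sending cancellation event) - " + node.uri);
    return false;
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    Node started = new Node("job/started", Optional.of(CompletableFuture.completedFuture("ok")));
    Node pending = new Node("job/pending", Optional.empty());
    System.out.println(cancel(started, props));
    System.out.println(cancel(pending, props));
  }
}
```

The point of the sketch is only that a cancellation request for a node without a job future is an expected case rather than an error, which is why it is handled with a warning.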