[ 
https://issues.apache.org/jira/browse/GOBBLIN-2111?focusedWorklogId=926402&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-926402
 ]

ASF GitHub Bot logged work on GOBBLIN-2111:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 17/Jul/24 20:15
            Start Date: 17/Jul/24 20:15
    Worklog Time Spent: 10m 
      Work Description: umustafi commented on code in PR #4002:
URL: https://github.com/apache/gobblin/pull/4002#discussion_r1681674052


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTask.java:
##########
@@ -36,4 +43,25 @@ public LaunchDagTask(DagActionStore.DagAction dagAction, LeaseAttemptStatus.Leas
   public <T> T host(DagTaskVisitor<T> visitor) {
     return visitor.meet(this);
   }
+
+  @Override
+  public final boolean conclude() {
+    try {
+      // Delete adhoc flowSpecs from catalog if the dag was concluded properly
+      if (super.conclude()) {
+        DagManager.DagId dagId = DagManagerUtils.generateDagId(this.dagAction.getFlowGroup(),
+            this.dagAction.getFlowName(), this.dagAction.getFlowExecutionId());
+        FlowSpec flowSpec =
+            this.dagManagementStateStore.getFlowSpec(FlowSpec.Utils.createFlowSpecUri(dagId.getFlowId()));
+        flowSpec.addProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, dagId.getFlowExecutionId());
+        if (!flowSpec.isScheduled()) {
+          dagManagementStateStore.removeFlowSpec(flowSpec);

Review Comment:
   This API is cleaner IMO, and we can keep the other version as well in case we
end up needing it at some point. What do you think?





Issue Time Tracking
-------------------

    Worklog Id:     (was: 926402)
    Time Spent: 2h 20m  (was: 2h 10m)

> Delete adhoc flowSpecs from FlowCatalog for DagProcEngine
> ---------------------------------------------------------
>
>                 Key: GOBBLIN-2111
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2111
>             Project: Apache Gobblin
>          Issue Type: Bug
>          Components: gobblin-service
>            Reporter: Urmi Mustafi
>            Assignee: Abhishek Tiwari
>            Priority: Major
>          Time Spent: 2h 20m
>  Remaining Estimate: 0h
>
> DagProcessingEngine fails to remove adhoc flow specs from catalog after 
> checkpointing the dag. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
