[ 
https://issues.apache.org/jira/browse/GOBBLIN-2136?focusedWorklogId=933267&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-933267
 ]

ASF GitHub Bot logged work on GOBBLIN-2136:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 04/Sep/24 23:02
            Start Date: 04/Sep/24 23:02
    Worklog Time Spent: 10m 
      Work Description: phet commented on code in PR #4031:
URL: https://github.com/apache/gobblin/pull/4031#discussion_r1744600676


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -215,61 +182,15 @@ public void orchestrate(Spec spec, Properties jobProps, 
long triggerTimestampMil
       String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
 
       sharedFlowMetricsSingleton.addFlowGauge(spec, flowConfig, flowGroup, 
flowName);
-      /* Only compile and pass directly to `DagManager` when multi-active 
scheduler NOT enabled; otherwise
-      recompilation to occur later, once `DagActionStoreChangeMonitor` 
subsequently delegates this
-      `DagActionType.LAUNCH`
-       */
-      if (flowLaunchHandler.isPresent()) {
-        DagActionStore.DagAction launchDagAction = 
DagActionStore.DagAction.forFlow(
-            flowGroup,
-            flowName,
-            FlowUtils.getOrCreateFlowExecutionId(flowSpec),
-            DagActionStore.DagActionType.LAUNCH);
-        DagActionStore.LeaseParams
-            leaseObject = new DagActionStore.LeaseParams(launchDagAction, 
isReminderEvent,
-            triggerTimestampMillis);
-        // `flowSpec.isScheduled()` ==> adopt consensus `flowExecutionId` as 
clock drift safeguard, yet w/o disrupting API-layer's ad hoc ID assignment
-        flowLaunchHandler.get().handleFlowLaunchTriggerEvent(jobProps, 
leaseObject, flowSpec.isScheduled());
-        _log.info("Multi-active scheduler finished handling trigger event: 
[{}, is: {}, triggerEventTimestamp: {}]",
-            launchDagAction, isReminderEvent ? "reminder" : "original", 
triggerTimestampMillis);
-      } else {
-        try {
-          TimingEvent flowCompilationTimer =
-              new TimingEvent(this.eventSubmitter, 
TimingEvent.FlowTimings.FLOW_COMPILED);
-          Map<String, String> flowMetadata = 
TimingEventUtils.getFlowMetadata(flowSpec);
-          Optional<Dag<JobExecutionPlan>> compiledDagOptional =
-              
this.flowCompilationValidationHelper.validateAndHandleConcurrentExecution(flowConfig,
 flowSpec, flowGroup,
-                  flowName, flowMetadata);
-
-          if (!compiledDagOptional.isPresent()) {
-            Instrumented.markMeter(this.flowOrchestrationFailedMeter);
-            return;
-          }
-          Dag<JobExecutionPlan> compiledDag = compiledDagOptional.get();
-          if (compiledDag.isEmpty()) {
-            
FlowCompilationValidationHelper.populateFlowCompilationFailedEventMessage(eventSubmitter,
 flowSpec,
-                flowMetadata);
-            Instrumented.markMeter(this.flowOrchestrationFailedMeter);
-            
sharedFlowMetricsSingleton.conditionallyUpdateFlowGaugeSpecState(spec,
-                SharedFlowMetricsSingleton.CompiledState.FAILED);
-            _log.warn("Cannot determine an executor to run on for Spec: " + 
spec);
-            return;
-          }
-          
sharedFlowMetricsSingleton.conditionallyUpdateFlowGaugeSpecState(spec,
-              SharedFlowMetricsSingleton.CompiledState.SUCCESSFUL);
-
-          
FlowCompilationValidationHelper.addFlowExecutionIdIfAbsent(flowMetadata, 
compiledDag);
-          flowCompilationTimer.stop(flowMetadata);
-
-          // Depending on if DagManager is present, handle execution
-          submitFlowToDagManager(flowSpec, compiledDag);
-        } finally {
-          /* Remove adhoc flow spec from the flow catalog, regardless of 
whether the flow was successfully validated
-          and permitted to exec (concurrently)
-           */
-          this.dagManager.removeFlowSpecIfAdhoc(flowSpec);
-        }
-      }
+      DagActionStore.DagAction launchDagAction = 
DagActionStore.DagAction.forFlow(flowGroup, flowName,
+          FlowUtils.getOrCreateFlowExecutionId(flowSpec), 
DagActionStore.DagActionType.LAUNCH);
+      DagActionStore.LeaseParams
+          leaseObject = new DagActionStore.LeaseParams(launchDagAction, 
isReminderEvent,
+          triggerTimestampMillis);

Review Comment:
   nit: don't leave the type as the only thing on a line by itself: also put 
the `leaseObject` variable name beside it too





Issue Time Tracking
-------------------

    Worklog Id:     (was: 933267)
    Time Spent: 3h 50m  (was: 3h 40m)

> remove obsolete code related to DagManager
> ------------------------------------------
>
>                 Key: GOBBLIN-2136
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2136
>             Project: Apache Gobblin
>          Issue Type: Task
>            Reporter: Arjun Singh Bora
>            Priority: Major
>          Time Spent: 3h 50m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to