[
https://issues.apache.org/jira/browse/GOBBLIN-2136?focusedWorklogId=933267&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-933267
]
ASF GitHub Bot logged work on GOBBLIN-2136:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 04/Sep/24 23:02
Start Date: 04/Sep/24 23:02
Worklog Time Spent: 10m
Work Description: phet commented on code in PR #4031:
URL: https://github.com/apache/gobblin/pull/4031#discussion_r1744600676
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -215,61 +182,15 @@ public void orchestrate(Spec spec, Properties jobProps,
long triggerTimestampMil
String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
sharedFlowMetricsSingleton.addFlowGauge(spec, flowConfig, flowGroup,
flowName);
- /* Only compile and pass directly to `DagManager` when multi-active
scheduler NOT enabled; otherwise
- recompilation to occur later, once `DagActionStoreChangeMonitor`
subsequently delegates this
- `DagActionType.LAUNCH`
- */
- if (flowLaunchHandler.isPresent()) {
- DagActionStore.DagAction launchDagAction =
DagActionStore.DagAction.forFlow(
- flowGroup,
- flowName,
- FlowUtils.getOrCreateFlowExecutionId(flowSpec),
- DagActionStore.DagActionType.LAUNCH);
- DagActionStore.LeaseParams
- leaseObject = new DagActionStore.LeaseParams(launchDagAction,
isReminderEvent,
- triggerTimestampMillis);
- // `flowSpec.isScheduled()` ==> adopt consensus `flowExecutionId` as
clock drift safeguard, yet w/o disrupting API-layer's ad hoc ID assignment
- flowLaunchHandler.get().handleFlowLaunchTriggerEvent(jobProps,
leaseObject, flowSpec.isScheduled());
- _log.info("Multi-active scheduler finished handling trigger event:
[{}, is: {}, triggerEventTimestamp: {}]",
- launchDagAction, isReminderEvent ? "reminder" : "original",
triggerTimestampMillis);
- } else {
- try {
- TimingEvent flowCompilationTimer =
- new TimingEvent(this.eventSubmitter,
TimingEvent.FlowTimings.FLOW_COMPILED);
- Map<String, String> flowMetadata =
TimingEventUtils.getFlowMetadata(flowSpec);
- Optional<Dag<JobExecutionPlan>> compiledDagOptional =
-
this.flowCompilationValidationHelper.validateAndHandleConcurrentExecution(flowConfig,
flowSpec, flowGroup,
- flowName, flowMetadata);
-
- if (!compiledDagOptional.isPresent()) {
- Instrumented.markMeter(this.flowOrchestrationFailedMeter);
- return;
- }
- Dag<JobExecutionPlan> compiledDag = compiledDagOptional.get();
- if (compiledDag.isEmpty()) {
-
FlowCompilationValidationHelper.populateFlowCompilationFailedEventMessage(eventSubmitter,
flowSpec,
- flowMetadata);
- Instrumented.markMeter(this.flowOrchestrationFailedMeter);
-
sharedFlowMetricsSingleton.conditionallyUpdateFlowGaugeSpecState(spec,
- SharedFlowMetricsSingleton.CompiledState.FAILED);
- _log.warn("Cannot determine an executor to run on for Spec: " +
spec);
- return;
- }
-
sharedFlowMetricsSingleton.conditionallyUpdateFlowGaugeSpecState(spec,
- SharedFlowMetricsSingleton.CompiledState.SUCCESSFUL);
-
-
FlowCompilationValidationHelper.addFlowExecutionIdIfAbsent(flowMetadata,
compiledDag);
- flowCompilationTimer.stop(flowMetadata);
-
- // Depending on if DagManager is present, handle execution
- submitFlowToDagManager(flowSpec, compiledDag);
- } finally {
- /* Remove adhoc flow spec from the flow catalog, regardless of
whether the flow was successfully validated
- and permitted to exec (concurrently)
- */
- this.dagManager.removeFlowSpecIfAdhoc(flowSpec);
- }
- }
+ DagActionStore.DagAction launchDagAction =
DagActionStore.DagAction.forFlow(flowGroup, flowName,
+ FlowUtils.getOrCreateFlowExecutionId(flowSpec),
DagActionStore.DagActionType.LAUNCH);
+ DagActionStore.LeaseParams
+ leaseObject = new DagActionStore.LeaseParams(launchDagAction,
isReminderEvent,
+ triggerTimestampMillis);
Review Comment:
nit: don't leave the type as the only thing on a line by itself: also put
the `leaseObject` variable name beside it too
Issue Time Tracking
-------------------
Worklog Id: (was: 933267)
Time Spent: 3h 50m (was: 3h 40m)
> remove obsolete code related to DagManager
> ------------------------------------------
>
> Key: GOBBLIN-2136
> URL: https://issues.apache.org/jira/browse/GOBBLIN-2136
> Project: Apache Gobblin
> Issue Type: Task
> Reporter: Arjun Singh Bora
> Priority: Major
> Time Spent: 3h 50m
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)