[
https://issues.apache.org/jira/browse/GOBBLIN-2062?focusedWorklogId=923053&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-923053
]
ASF GitHub Bot logged work on GOBBLIN-2062:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 12/Jun/24 01:15
Start Date: 12/Jun/24 01:15
Worklog Time Spent: 10m
Work Description: umustafi commented on code in PR #3944:
URL: https://github.com/apache/gobblin/pull/3944#discussion_r1635654894
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java:
##########
@@ -278,18 +278,10 @@ protected void startUp() {
}
/**
- * Method to submit a {@link Dag} to the {@link DagManager} and delete adhoc flowSpecs from the FlowCatalog after
- * persisting it in the other addDag method called. The DagManager's failure recovery method ensures the flow will be
- * executed in the event of downtime.
- * @param flowSpec
- * @param dag
- * @param persist
- * @param setStatus
- * @throws IOException
+ * Delete adhoc flowSpecs from the {@link FlowCatalog} after (separately) persisting via {@link DagManager#addDag(Dag, boolean, boolean)}.
+ * This DagManager's failure recovery mechanisms ensure the flow will be executed, even in the event of downtime.
*/
- public synchronized void addDagAndRemoveAdhocFlowSpec(FlowSpec flowSpec, Dag<JobExecutionPlan> dag, boolean persist, boolean setStatus)
- throws IOException {
- addDag(dag, persist, setStatus);
+ public synchronized void removeFlowSpecIfAdhoc(FlowSpec flowSpec) throws IOException {
// Only the active dagManager should delete the flowSpec
if (isActive) {
deleteSpecFromCatalogIfAdhoc(flowSpec);
Review Comment:
It shouldn't be in GSJS, because all hosts in the multi-active scheduler state
needed access to the flowSpec in the catalog; see this PR description:
https://github.com/apache/gobblin/pull/3846. We were running into a `no spec
found` error in the `DagActionStoreChangeMonitor` if an inactive host deleted
the `flowSpec` before the active host processed the CDC stream message. For MA
execution we want the spec to stay in the catalog until all hosts have received
the CDC stream message and retrieved the spec. We need to think about how to
achieve this.
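
A minimal sketch of the race described above, written as a toy in-memory model rather than Gobblin code. `ToyCatalog`, `ToyHost`, `onLaunchMessage`, and `onlyActiveDeletes` are hypothetical names introduced for illustration; only the `isActive` gate mirrors the diff. It assumes the inactive host handles the message first, and it shows the failure mode and the gate only. It does not address keeping the spec until every host has retrieved it.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Toy model (assumption, not Gobblin code): two hosts share one catalog. */
public class AdhocSpecRaceSketch {

  /** Hypothetical stand-in for the shared flow catalog. */
  static final class ToyCatalog {
    private final Map<String, String> specs = new ConcurrentHashMap<>();

    void put(String uri, String spec) { specs.put(uri, spec); }
    boolean contains(String uri) { return specs.containsKey(uri); }
    void remove(String uri) { specs.remove(uri); }
  }

  /** Hypothetical host that reacts to one launch message from a change stream. */
  static final class ToyHost {
    private final ToyCatalog catalog;
    private final boolean isActive;
    private final boolean onlyActiveDeletes;

    ToyHost(ToyCatalog catalog, boolean isActive, boolean onlyActiveDeletes) {
      this.catalog = catalog;
      this.isActive = isActive;
      this.onlyActiveDeletes = onlyActiveDeletes;
    }

    /** Returns false when the spec is already gone (the "no spec found" case). */
    boolean onLaunchMessage(String uri) {
      if (!catalog.contains(uri)) {
        return false;
      }
      // With the gate on, only the active host deletes the adhoc spec.
      if (isActive || !onlyActiveDeletes) {
        catalog.remove(uri);
      }
      return true;
    }
  }

  /** Runs the inactive host first, then reports whether the active host still found the spec. */
  static boolean activeHostFindsSpec(boolean onlyActiveDeletes) {
    ToyCatalog catalog = new ToyCatalog();
    String uri = "flowGroup/flowName";
    catalog.put(uri, "adhoc spec");
    ToyHost inactive = new ToyHost(catalog, false, onlyActiveDeletes);
    ToyHost active = new ToyHost(catalog, true, onlyActiveDeletes);
    inactive.onLaunchMessage(uri);
    return active.onLaunchMessage(uri);
  }

  public static void main(String[] args) {
    System.out.println("any host deletes:    active finds spec = " + activeHostFindsSpec(false));
    System.out.println("only active deletes: active finds spec = " + activeHostFindsSpec(true));
  }
}
```

In this model the gate only helps when the active host is the one that deletes. An active host that deletes before the other hosts have read the spec would still leave them without it, which is the open question raised in the comment.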
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java:
##########
@@ -127,19 +133,31 @@ public void setup() throws Exception {
SharedFlowMetricsSingleton sharedFlowMetricsSingleton = new SharedFlowMetricsSingleton(ConfigUtils.propertiesToConfig(orchestratorProperties));
- this.orchestrator = new Orchestrator(ConfigUtils.propertiesToConfig(orchestratorProperties),
- this.topologyCatalog, mockDagManager, Optional.of(logger), mockStatusGenerator,
- Optional.of(mockFlowTriggerHandler), sharedFlowMetricsSingleton, Optional.of(mock(FlowCatalog.class)), Optional.of(dagManagementStateStore),
- new FlowCompilationValidationHelper(config, sharedFlowMetricsSingleton, mock(UserQuotaManager.class), mockStatusGenerator));
- this.topologyCatalog.addListener(orchestrator);
- this.flowCatalog.addListener(orchestrator);
+ FlowCompilationValidationHelper flowCompilationValidationHelper = new FlowCompilationValidationHelper(config, sharedFlowMetricsSingleton, mock(UserQuotaManager.class), mockFlowStatusGenerator);
+ this.dagMgrNotFlowLaunchHandlerBasedOrchestrator = new Orchestrator(ConfigUtils.propertiesToConfig(orchestratorProperties),
Review Comment:
I can make a note to add the other code path for `FlowLaunchHandler based
orchestrate`. If those reside in this test class, then following the
methodology discussed above we should have a separate `setupClass` method used
for that test.
Issue Time Tracking
-------------------
Worklog Id: (was: 923053)
Time Spent: 5h 50m (was: 5h 40m)
> adhoc flow failure due to concurrent execs must be removed from flow catalog
> ----------------------------------------------------------------------------
>
> Key: GOBBLIN-2062
> URL: https://issues.apache.org/jira/browse/GOBBLIN-2062
> Project: Apache Gobblin
> Issue Type: New Feature
> Components: gobblin-service
> Reporter: Kip Kohn
> Assignee: Abhishek Tiwari
> Priority: Major
> Time Spent: 5h 50m
> Remaining Estimate: 0h
>
> The Orchestrator + DagManager MUST remove adhoc flows that violate concurrent
> execs from the flow catalog. Otherwise GaaS will continue to return '409
> Conflict' to each subsequent attempt to create an adhoc flow with the same
> flowGroup+flowName. This is despite the fact that the flow (which still
> remains in the FlowCatalog, when it shouldn't) already has the status
> FAILED, which is a "final status".