[
https://issues.apache.org/jira/browse/GOBBLIN-2107?focusedWorklogId=925172&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-925172
]
ASF GitHub Bot logged work on GOBBLIN-2107:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 10/Jul/24 00:33
Start Date: 10/Jul/24 00:33
Worklog Time Spent: 10m
Work Description: umustafi commented on code in PR #3996:
URL: https://github.com/apache/gobblin/pull/3996#discussion_r1671397418
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java:
##########
@@ -140,31 +140,39 @@ public Optional<Dag<JobExecutionPlan>> validateAndHandleConcurrentExecution(Conf
ConfigurationKeys.FLOW_ALLOW_CONCURRENT_EXECUTION,
String.valueOf(this.isFlowConcurrencyEnabled)));
Dag<JobExecutionPlan> jobExecutionPlanDag = specCompiler.compileFlow(flowSpec);
- if (jobExecutionPlanDag == null || jobExecutionPlanDag.isEmpty()) {
- return Optional.absent();
- }
- addFlowExecutionIdIfAbsent(flowMetadata, jobExecutionPlanDag);
-
- if (isExecutionPermitted(flowStatusGenerator, flowGroup, flowName, allowConcurrentExecution,
- Long.parseLong(flowMetadata.get(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD)))) {
- return Optional.fromNullable(jobExecutionPlanDag);
- } else {
- log.warn("Another instance of flowGroup: {}, flowName: {} running; Skipping flow execution since "
- + "concurrent executions are disabled for this flow.", flowGroup, flowName);
- sharedFlowMetricsSingleton.conditionallyUpdateFlowGaugeSpecState(flowSpec,
- SharedFlowMetricsSingleton.CompiledState.SKIPPED);
- Instrumented.markMeter(sharedFlowMetricsSingleton.getSkippedFlowsMeter());
- if (!flowSpec.isScheduled()) {
- // For ad-hoc flow, we might already increase quota, we need to decrease here
- for (Dag.DagNode dagNode : jobExecutionPlanDag.getStartNodes()) {
- quotaManager.releaseQuota(dagNode);
+ Optional<Dag<JobExecutionPlan>> optionalJobExecutionPlan = null;
+
+ try {
+ if (jobExecutionPlanDag == null || jobExecutionPlanDag.isEmpty()) {
+ optionalJobExecutionPlan = Optional.absent();
+ }
+ addFlowExecutionIdIfAbsent(flowMetadata, jobExecutionPlanDag);
+
+ if (isExecutionPermitted(flowStatusGenerator, flowGroup, flowName, allowConcurrentExecution,
+ Long.parseLong(flowMetadata.get(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD)))) {
+ optionalJobExecutionPlan = Optional.fromNullable(jobExecutionPlanDag);
+ } else {
+ log.warn("Another instance of flowGroup: {}, flowName: {} running; Skipping flow execution since "
+ + "concurrent executions are disabled for this flow.", flowGroup, flowName);
+ optionalJobExecutionPlan = Optional.absent();
+ }
+ } finally {
+ if (optionalJobExecutionPlan == null || !optionalJobExecutionPlan.isPresent()) {
Review Comment:
Updated this code path to make it clearer. I realized the quota manager
does not need to be updated when compilation fails, so this looks much
simpler now.
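
One possible reading of the comment above, as a minimal sketch rather than the actual change in the PR: return early when compilation produces no DAG, and release quota only on the concurrency-skip path for ad hoc flows. Every name here (`ConcurrentExecutionSketch`, `Recorder`, `validate`, the event strings) is a hypothetical stand-in, the DAG is modelled as a plain list of start-node names, and `java.util.Optional` replaces the Guava `Optional` used in the hunk so the example has no dependencies.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Illustrative sketch only. Every name below is a hypothetical stand-in,
// not a Gobblin class or method.
final class ConcurrentExecutionSketch {

  /** Collects side effects so the control flow can be observed. */
  static final class Recorder {
    final List<String> events = new ArrayList<>();
  }

  /**
   * Returns the compiled DAG (modelled as a list of start-node names) when the
   * flow may run, or Optional.empty() when compilation failed or a concurrent
   * run blocks it.
   */
  static Optional<List<String>> validate(List<String> compiledDag, boolean executionPermitted,
      boolean scheduled, Recorder recorder) {
    // Compilation failure: return early.
    // Assumption: no quota was taken at this point, so none is released.
    if (compiledDag == null || compiledDag.isEmpty()) {
      recorder.events.add("compilationFailed");
      return Optional.empty();
    }
    if (executionPermitted) {
      return Optional.of(compiledDag);
    }
    // A concurrent run is in progress: skip this execution and give back
    // any quota that an ad hoc (unscheduled) flow may already hold.
    recorder.events.add("skipped");
    if (!scheduled) {
      for (String startNode : compiledDag) {
        recorder.events.add("releaseQuota:" + startNode);
      }
    }
    return Optional.empty();
  }

  public static void main(String[] args) {
    Recorder recorder = new Recorder();
    System.out.println(validate(null, true, false, recorder).isPresent());
    System.out.println(validate(Arrays.asList("jobA"), false, false, recorder).isPresent());
    System.out.println(recorder.events);
  }
}
```

The early return keeps the null-or-empty case from reaching any later step that dereferences the DAG, which is the main reason to prefer it over assigning a result and continuing.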
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java:
##########
@@ -321,8 +321,10 @@ protected void submitFlowToDagManagerHelper(DagActionStore.DagAction dagAction,
launchSubmissionMetricProxy.markFailure();
return;
} catch (SpecNotFoundException e) {
- log.warn("Spec not found for flowId {} due to exception {}", flowId, e.getMessage());
- launchSubmissionMetricProxy.markFailure();
+ log.info("Spec not found for flowId {} due to deletion by active dagManager host due to exception {}",
+ flowId, e.getMessage());
+ // TODO: mark this failure if there are other valid cases of this exception
+ // launchSubmissionMetricProxy.markFailure();
Review Comment:
It's speculative. I think a SpecNotFoundException could occur for a reason
_other_ than deletion by the active DM if something goes wrong elsewhere in
the system (for example, a mistaken deletion that does NOT come from the
deleteFlowSpecIfAdhoc code path). It's hard to say definitively that all of
these errors are due to an active DM deleting the spec, but I would expect
them all to be.
Issue Time Tracking
-------------------
Worklog Id: (was: 925172)
Time Spent: 1h 10m (was: 1h)
> Delete adhoc flowSpecs from flowCatalog
> ----------------------------------------
>
> Key: GOBBLIN-2107
> URL: https://issues.apache.org/jira/browse/GOBBLIN-2107
> Project: Apache Gobblin
> Issue Type: Bug
> Components: gobblin-service
> Reporter: Urmi Mustafi
> Assignee: Abhishek Tiwari
> Priority: Major
> Time Spent: 1h 10m
> Remaining Estimate: 0h
>
> Delete adhoc flowSpecs from flowCatalog to avoid a buildup of adhoc flowSpecs
> in the catalog, even when job compilation fails, by adding the deletion in a
> finally block in the non-MA scheduler case.
> # The previous PR, [https://github.com/apache/gobblin/pull/3944/files], caused a
> bug where adhoc flows are not deleted when the multi-active (MA) scheduler is
> enabled. This PR re-adds deletion of the flowSpec to the dagManager, to be
> called by the active host in the MA scheduler case (there is only one active
> host if dagManager is enabled; otherwise the code uses dagProcessingEngine).
> # It also ensures that quota is released and a failed flow compilation event
> is sent by the FlowCompilationValidationHelper in all compilation failure
> cases. This was handled incorrectly before.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)