[
https://issues.apache.org/jira/browse/GOBBLIN-2062?focusedWorklogId=917928&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-917928
]
ASF GitHub Bot logged work on GOBBLIN-2062:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 06/May/24 20:28
Start Date: 06/May/24 20:28
Worklog Time Spent: 10m
Work Description: phet commented on code in PR #3944:
URL: https://github.com/apache/gobblin/pull/3944#discussion_r1591493291
##########
gobblin-restli/server.gradle:
##########
@@ -45,6 +45,7 @@ dependencies {
}
compile externalDependency.gson
+ compile externalDependency.lombok
Review Comment:
unclear how this was missed... but not having it meant MANY compilation warnings
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java:
##########
@@ -81,12 +84,15 @@ public class OrchestratorTest {
private FlowCatalog flowCatalog;
private FlowSpec flowSpec;
- private Orchestrator orchestrator;
+
+ private FlowStatusGenerator mockFlowStatusGenerator;
+ private DagManager mockDagManager;
+ private Orchestrator dagMgrNotFlowLaunchHandlerBasedOrchestrator;
private static final String TEST_USER = "testUser";
private static final String TEST_PASSWORD = "testPassword";
private static final String TEST_TABLE = "quotas";
- @BeforeClass
+ @BeforeMethod
Review Comment:
unfortunately lots of "testing smells" here.
first off, class-level test init doesn't play well w/ verifying mock
interactions, since we want a fresh invocation count for the exec of each test
method. relatedly, it also complicates - if not outright forecloses -
parallel test method execution.
secondly, the `@Test(dependsOnMethods = ...)` structuring is a testing
anti-pattern that here obscured that `setup`-style init had been formulated
instead as a test of its own - `createTopologySpec()`. a major clue that
something is wrong is that this particular `@Test` method does not even
exercise `Orchestrator`, our class-under-test!
for now, I've fixed this suite to use per-method setup/teardown best
practices, but left it as a TODO to figure out where `createTopologySpec`
actually belongs. `createFlowSpec` and `deleteFlowSpec` also deserve attention
in this same regard.
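to illustrate the first point, here is a minimal sketch in plain Java (no TestNG or Mockito, so it runs standalone). `CountingCollaborator` and `Subject` are hypothetical stand-ins for a mocked dependency and the class-under-test; they are not Gobblin classes. it simulates two test methods that share one class-level fixture versus two that each get a fresh one:

```java
import java.util.concurrent.atomic.AtomicInteger;

public class FreshFixtureSketch {
  /** Hypothetical collaborator standing in for a mocked dependency. */
  static class CountingCollaborator {
    private final AtomicInteger calls = new AtomicInteger();
    void addDag() { calls.incrementAndGet(); }
    int callCount() { return calls.get(); }
  }

  /** Hypothetical class-under-test that calls its collaborator once per run. */
  static class Subject {
    private final CountingCollaborator collaborator;
    Subject(CountingCollaborator collaborator) { this.collaborator = collaborator; }
    void run() { collaborator.addDag(); }
  }

  /** Two "test methods" sharing one fixture: the second sees the first one's call too. */
  static int countSeenBySecondTestWithSharedFixture() {
    CountingCollaborator shared = new CountingCollaborator(); // class-level style init
    new Subject(shared).run(); // first test method
    new Subject(shared).run(); // second test method
    return shared.callCount();
  }

  /** Two "test methods" with per-method init: the second sees only its own call. */
  static int countSeenBySecondTestWithFreshFixture() {
    CountingCollaborator first = new CountingCollaborator(); // per-method init, test 1
    new Subject(first).run();
    CountingCollaborator second = new CountingCollaborator(); // per-method init, test 2
    new Subject(second).run();
    return second.callCount();
  }

  public static void main(String[] args) {
    System.out.println("shared fixture: " + countSeenBySecondTestWithSharedFixture());
    System.out.println("fresh fixture:  " + countSeenBySecondTestWithFreshFixture());
  }
}
```

with the shared fixture, an assertion like "called exactly once" depends on which tests ran earlier and in what order; with the fresh fixture it does not.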
##########
gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java:
##########
@@ -223,6 +223,8 @@ public static GobblinServiceManager createTestGobblinServiceManager(Properties s
DagManager spiedDagManager = spy(gobblinServiceManager.getDagManager());
doNothing().when(spiedDagManager).setActive(anyBoolean());
+ // WARNING: this `spiedDagManager` WILL NOT BE the one used by the `Orchestrator`: its DM has apparently already been
+ // provided to the `Orchestrator` ctor, prior to this replacement here of `GobblinServiceManager.dagManager`
Review Comment:
documenting for posterity, since unfortunately, this was quite troublesome
to debug
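the underlying pitfall can be sketched in plain Java. `Owner`, `Consumer`, and `Worker` below are hypothetical stand-ins for `GobblinServiceManager`, `Orchestrator`, and `DagManager` respectively; the real wiring is more involved, so treat this only as an illustration of a reference captured at construction time:

```java
public class StaleReferenceSketch {
  /** Hypothetical stand-in for DagManager; the label only identifies the instance. */
  static class Worker {
    final String label;
    Worker(String label) { this.label = label; }
  }

  /** Hypothetical stand-in for Orchestrator: captures its collaborator in the constructor. */
  static class Consumer {
    private final Worker worker;
    Consumer(Worker worker) { this.worker = worker; }
    String workerLabel() { return worker.label; }
  }

  /** Hypothetical stand-in for GobblinServiceManager: wires the consumer during construction. */
  static class Owner {
    Worker worker = new Worker("original");
    final Consumer consumer = new Consumer(worker);
  }

  /** Replaces the owner's field after wiring and reports which instance the consumer still uses. */
  static String labelSeenByConsumerAfterReplacement() {
    Owner owner = new Owner();
    owner.worker = new Worker("spy"); // too late: the consumer already holds "original"
    return owner.consumer.workerLabel();
  }

  public static void main(String[] args) {
    System.out.println(labelSeenByConsumerAfterReplacement()); // prints "original"
  }
}
```

any stubbing or verification done on the replacement instance is therefore invisible to the consumer, which is why the test has to hand its mock to the consumer's constructor directly.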
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/LeaseAttemptStatus.java:
##########
@@ -57,6 +58,8 @@ public static class NoLongerLeasingStatus extends LeaseAttemptStatus {}
current LeaseObtainedStatus via the completeLease method from a caller
without access to the {@link MultiActiveLeaseArbiter}.
*/
@Data
+ // avoid - warning: Generating equals/hashCode implementation but without a call to superclass, even though this class does not extend java.lang.Object
Review Comment:
another preventable source of compilation warnings
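for context on what the warning guards against: Lombok lets you acknowledge it with an explicit `@EqualsAndHashCode(callSuper = ...)`, but which value this PR chose is not visible in the excerpt above. the sketch below is hand-written plain Java, not Lombok output, and `Base` and `Derived` are hypothetical classes; it only shows how the two choices can disagree when the superclass carries state:

```java
import java.util.Objects;

public class CallSuperSketch {
  /** Hypothetical base class carrying state of its own. */
  static class Base {
    final long id;
    Base(long id) { this.id = id; }
  }

  /** Hypothetical subclass with one extra field. */
  static class Derived extends Base {
    final String name;
    Derived(long id, String name) { super(id); this.name = name; }

    /** Compares only this class's own field, ignoring the inherited one. */
    boolean equalsIgnoringSuper(Derived other) {
      return Objects.equals(name, other.name);
    }

    /** Compares the inherited field as well as this class's own field. */
    boolean equalsIncludingSuper(Derived other) {
      return id == other.id && Objects.equals(name, other.name);
    }
  }

  public static void main(String[] args) {
    Derived a = new Derived(1L, "lease");
    Derived b = new Derived(2L, "lease");
    System.out.println(a.equalsIgnoringSuper(b));  // true: ids differ but are not compared
    System.out.println(a.equalsIncludingSuper(b)); // false: ids are compared
  }
}
```

if the superclass has no fields that matter for equality, ignoring it is harmless, and making that decision explicit is what silences the warning.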
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -229,33 +229,39 @@ public void orchestrate(Spec spec, Properties jobProps, long triggerTimestampMil
_log.info("Multi-active scheduler finished handling trigger event: [{}, is: {}, triggerEventTimestamp: {}]",
launchDagAction, isReminderEvent ? "reminder" : "original", triggerTimestampMillis);
} else {
- TimingEvent flowCompilationTimer = new TimingEvent(this.eventSubmitter, TimingEvent.FlowTimings.FLOW_COMPILED);
- Map<String, String> flowMetadata = TimingEventUtils.getFlowMetadata(flowSpec);
- Optional<Dag<JobExecutionPlan>> compiledDagOptional =
- this.flowCompilationValidationHelper.validateAndHandleConcurrentExecution(flowConfig, flowSpec, flowGroup,
- flowName, flowMetadata);
-
- if (!compiledDagOptional.isPresent()) {
- Instrumented.markMeter(this.flowOrchestrationFailedMeter);
- return;
- }
- Dag<JobExecutionPlan> compiledDag = compiledDagOptional.get();
- if (compiledDag.isEmpty()) {
- FlowCompilationValidationHelper.populateFlowCompilationFailedEventMessage(eventSubmitter, flowSpec, flowMetadata);
- Instrumented.markMeter(this.flowOrchestrationFailedMeter);
+ try {
+ TimingEvent flowCompilationTimer = new TimingEvent(this.eventSubmitter, TimingEvent.FlowTimings.FLOW_COMPILED);
+ Map<String, String> flowMetadata = TimingEventUtils.getFlowMetadata(flowSpec);
+ Optional<Dag<JobExecutionPlan>> compiledDagOptional =
+ this.flowCompilationValidationHelper.validateAndHandleConcurrentExecution(flowConfig, flowSpec, flowGroup,
+ flowName, flowMetadata);
+
+ if (!compiledDagOptional.isPresent()) {
+ Instrumented.markMeter(this.flowOrchestrationFailedMeter);
+ return;
+ }
+ Dag<JobExecutionPlan> compiledDag = compiledDagOptional.get();
+ if (compiledDag.isEmpty()) {
+ FlowCompilationValidationHelper.populateFlowCompilationFailedEventMessage(eventSubmitter, flowSpec,
+ flowMetadata);
+ Instrumented.markMeter(this.flowOrchestrationFailedMeter);
+ sharedFlowMetricsSingleton.conditionallyUpdateFlowGaugeSpecState(spec,
+ SharedFlowMetricsSingleton.CompiledState.FAILED);
+ _log.warn("Cannot determine an executor to run on for Spec: " + spec);
+ return;
+ }
sharedFlowMetricsSingleton.conditionallyUpdateFlowGaugeSpecState(spec,
- SharedFlowMetricsSingleton.CompiledState.FAILED);
- _log.warn("Cannot determine an executor to run on for Spec: " + spec);
- return;
- }
- sharedFlowMetricsSingleton.conditionallyUpdateFlowGaugeSpecState(spec,
- SharedFlowMetricsSingleton.CompiledState.SUCCESSFUL);
+ SharedFlowMetricsSingleton.CompiledState.SUCCESSFUL);
- FlowCompilationValidationHelper.addFlowExecutionIdIfAbsent(flowMetadata, compiledDag);
- flowCompilationTimer.stop(flowMetadata);
+ FlowCompilationValidationHelper.addFlowExecutionIdIfAbsent(flowMetadata, compiledDag);
+ flowCompilationTimer.stop(flowMetadata);
- // Depending on if DagManager is present, handle execution
- submitFlowToDagManager(flowSpec, compiledDag);
+ // Depending on if DagManager is present, handle execution
+ submitFlowToDagManager(flowSpec, compiledDag);
+ } finally {
+ // remove from the flow catalog, regardless of whether the flow was successfully validated and permitted to exec (concurrently)
+ this.dagManager.removeFlowSpecIfAdhoc(flowSpec);
+ }
Review Comment:
too bad the diff above doesn't make clear that the existing code was merely
re-indented to sit inside the new `try ... finally`, whose purpose is to ensure
FlowCatalog cleanup, come what may
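a minimal plain-Java sketch of why `finally` gives that guarantee. the `Set<String>` is a hypothetical stand-in for the FlowCatalog, and the sketch removes the entry unconditionally, whereas the real call (`removeFlowSpecIfAdhoc`) is, going by its name, limited to adhoc flows:

```java
import java.util.HashSet;
import java.util.Set;

public class FinallyCleanupSketch {
  /**
   * Hypothetical orchestrate step: returns early when compilation is rejected,
   * throws when submission fails, and in every case removes the flow from the catalog.
   */
  static boolean orchestrate(Set<String> catalog, String flowName, boolean compiles, boolean submitFails) {
    try {
      if (!compiles) {
        return false; // early return, e.g. rejected as a concurrent execution
      }
      if (submitFails) {
        throw new IllegalStateException("submission failed for " + flowName);
      }
      return true; // the flow would be handed off for execution here
    } finally {
      catalog.remove(flowName); // runs on the early return, the exception, and the normal path
    }
  }

  public static void main(String[] args) {
    Set<String> catalog = new HashSet<>();
    catalog.add("flowA");
    orchestrate(catalog, "flowA", false, false);
    System.out.println(catalog.contains("flowA")); // false: cleaned up despite the early return
  }
}
```

without the `finally`, each early `return` would need its own cleanup call, and any exception path would skip it altogether.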
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java:
##########
@@ -127,19 +133,31 @@ public void setup() throws Exception {
SharedFlowMetricsSingleton sharedFlowMetricsSingleton = new SharedFlowMetricsSingleton(ConfigUtils.propertiesToConfig(orchestratorProperties));
- this.orchestrator = new Orchestrator(ConfigUtils.propertiesToConfig(orchestratorProperties),
- this.topologyCatalog, mockDagManager, Optional.of(logger), mockStatusGenerator,
- Optional.of(mockFlowTriggerHandler), sharedFlowMetricsSingleton, Optional.of(mock(FlowCatalog.class)), Optional.of(dagManagementStateStore),
- new FlowCompilationValidationHelper(config, sharedFlowMetricsSingleton, mock(UserQuotaManager.class), mockStatusGenerator));
- this.topologyCatalog.addListener(orchestrator);
- this.flowCatalog.addListener(orchestrator);
+ FlowCompilationValidationHelper flowCompilationValidationHelper = new FlowCompilationValidationHelper(config, sharedFlowMetricsSingleton, mock(UserQuotaManager.class), mockFlowStatusGenerator);
+ this.dagMgrNotFlowLaunchHandlerBasedOrchestrator = new Orchestrator(ConfigUtils.propertiesToConfig(orchestratorProperties),
Review Comment:
renamed to avoid confusion, as I now pass `Optional.absent()` for the
`FlowLaunchHandler`. nothing about the previously-existing tests below
strictly validated that code path, so I made this entire `OrchestratorTest`
class specific to the legacy `DagManager` version.
as for the lack of validation for the other code path, that suggests a
missing test: e.g. that `orchestrate` invokes
`FlowLaunchHandler::handleFlowLaunchTriggerEvent`
cc: @umustafi and @arjun4084346
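a rough shape for that missing test, sketched in plain Java with a hand-rolled recording fake rather than Mockito. everything here is hypothetical: `MiniOrchestrator` and `RecordingLaunchHandler` are not Gobblin classes, `handleTrigger` is not the real handler signature, and `java.util.Optional` is used where Gobblin uses Guava's `Optional`. the point is only that each code path gets its own assertion:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

public class LaunchRoutingSketch {
  /** Hypothetical recording fake for a launch handler. */
  static class RecordingLaunchHandler {
    final List<String> handled = new ArrayList<>();
    void handleTrigger(String flowName) { handled.add(flowName); }
  }

  /** Hypothetical orchestrator that prefers the handler when one is configured. */
  static class MiniOrchestrator {
    private final Optional<RecordingLaunchHandler> handler;
    final List<String> legacySubmissions = new ArrayList<>();

    MiniOrchestrator(Optional<RecordingLaunchHandler> handler) { this.handler = handler; }

    void orchestrate(String flowName) {
      if (handler.isPresent()) {
        handler.get().handleTrigger(flowName); // handler-based path
      } else {
        legacySubmissions.add(flowName); // legacy path, taken when no handler is supplied
      }
    }
  }

  public static void main(String[] args) {
    RecordingLaunchHandler handler = new RecordingLaunchHandler();
    MiniOrchestrator withHandler = new MiniOrchestrator(Optional.of(handler));
    withHandler.orchestrate("flowA");
    System.out.println(handler.handled);               // [flowA]
    System.out.println(withHandler.legacySubmissions); // []

    MiniOrchestrator legacy = new MiniOrchestrator(Optional.empty());
    legacy.orchestrate("flowB");
    System.out.println(legacy.legacySubmissions);      // [flowB]
  }
}
```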
Issue Time Tracking
-------------------
Worklog Id: (was: 917928)
Time Spent: 20m (was: 10m)
> adhoc flow failure due to concurrent execs must be removed from flow catalog
> ----------------------------------------------------------------------------
>
> Key: GOBBLIN-2062
> URL: https://issues.apache.org/jira/browse/GOBBLIN-2062
> Project: Apache Gobblin
> Issue Type: New Feature
> Components: gobblin-service
> Reporter: Kip Kohn
> Assignee: Abhishek Tiwari
> Priority: Major
> Time Spent: 20m
> Remaining Estimate: 0h
>
> the Orchestrator + DagManager MUST remove adhoc flows that violate concurrent
> execs from the flow catalog. otherwise gaas will continue to return '409
> Conflict' to each subsequent attempt to create an adhoc flow with the same
> flowGroup+flowName. this is despite the fact that the flow (which still
> remains in the FlowCatalog, when it shouldn't be) already has the status
> FAILED, which is a "final status".
--
This message was sent by Atlassian Jira
(v8.20.10#820010)