phet commented on code in PR #4076:
URL: https://github.com/apache/gobblin/pull/4076#discussion_r1846985471
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java:
##########
@@ -103,6 +103,13 @@ public interface DagManagementStateStore {
*/
void updateDagNode(Dag.DagNode<JobExecutionPlan> dagNode) throws IOException;
+ /**
+ * Returns true if lease can be acquired on entity provided in leaseParams.
+ * @param leaseParams uniquely identifies the flow, the present action upon it, the time the action was triggered,
+ * and if the dag action event we're checking on is a reminder event
+ */
+ boolean isLeaseAcquirable(DagActionStore.LeaseParams leaseParams) throws IOException;
Review Comment:
DMSS has no concept of leasing, since that's meant to be a lower-level impl
detail. accordingly let's avoid `LeaseParams` in this interface.
given we already have this method:
```
boolean existsFlowDagAction(String flowGroup, String flowName, long flowExecutionId,
    DagActionStore.DagActionType dagActionType) throws IOException, SQLException;
```
how about this new one:
```
boolean existsCurrentlyLaunchingSimilarFlow(String flowGroup, String flowName, long flowExecutionId)
    throws IOException, SQLException;
```
?
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/LeaseUnavailableException.java:
##########
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+/**
+ * An {@link RuntimeException} thrown when lease cannot be acquired on
provided entity.
+ */
+public class LeaseUnavailableException extends RuntimeException {
Review Comment:
this name is misleading in our current context, where nobody even tried to
acquire any lease (**that case** is anyway already represented by
`LeaseAttemptStatus.LeasedToAnotherStatus`).
how about `WouldNotBeLeasableException` or
`TooSoonToRerunSameFlowException`? I prefer the latter, which clearly
characterizes a restriction on using the API, whereas the former suggests impl.
details, w/o specifically naming the problem (e.g. why not leasable?).
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java:
##########
@@ -201,6 +212,33 @@ public void testAcquireLeaseSingleParticipant() throws
Exception {
<= sixthObtainedStatus.getLeaseAcquisitionTimestamp());
}
+ /*
+ test to verify if leasable entity is unavailable before epsilon time
+ to account for clock drift
+ */
+ @Test
+ public void testWhenLeasableEntityUnavailable() throws Exception{
+ LeaseAttemptStatus firstLaunchStatus =
+ mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchLeaseParams3, true);
+ Assert.assertTrue(firstLaunchStatus instanceof LeaseAttemptStatus.LeaseObtainedStatus);
+ completeLeaseHelper(launchLeaseParams3);
+ Thread.sleep(LESS_THAN_EPSILON);
+
+ Assert.assertFalse(mysqlMultiActiveLeaseArbiter.isLeaseAcquirable(launchLeaseParams3));
+ }
+
+ /*
+ test to verify if leasable entity exists post epsilon time
+ */
+ @Test
+ public void testWhenLeasableEntityAvailable() throws Exception{
+ LeaseAttemptStatus firstLaunchStatus =
+ mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchLeaseParams4, true);
+ Assert.assertTrue(firstLaunchStatus instanceof LeaseAttemptStatus.LeaseObtainedStatus);
+ completeLeaseHelper(launchLeaseParams4);
+ Thread.sleep(MORE_THAN_EPSILON);
+
+ Assert.assertTrue(mysqlMultiActiveLeaseArbiter.isLeaseAcquirable(launchLeaseParams4));
Review Comment:
same here
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -125,6 +128,7 @@ public AddSpecResponse onAddSpec(Spec addedSpec) {
_log.info("Orchestrator - onAdd[Topology]Spec: " + addedSpec);
this.specCompiler.onAddSpec(addedSpec);
} else if (addedSpec instanceof FlowSpec) {
+ validateAdhocFlowLeasability((FlowSpec) addedSpec);
Review Comment:
nit: "validate"/"verify" are good for methods returning a boolean. the
entire purpose of this `void` method is to throw an exception. clearly
indicate that with stronger naming, like "failIf..." or "enforce..."
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java:
##########
@@ -61,6 +61,17 @@ public interface MultiActiveLeaseArbiter {
LeaseAttemptStatus tryAcquireLease(DagActionStore.LeaseParams leaseParams,
boolean adoptConsensusFlowExecutionId)
throws IOException;
+ /**
+ * This method checks if lease can be acquired on provided flow in lease
params
+ * returns true if entry for the same flow does not exists within epsilon
time
+ * in leaseArbiterStore, else returns false
+ * @param leaseParams uniquely identifies the flow, the present action
upon it, the time the action
+ * was triggered, and if the dag action event we're
checking on is a reminder event
+ * @return true if lease can be acquired on the flow passed in the lease
params, false otherwise
+ */
+ boolean isLeaseAcquirable(DagActionStore.LeaseParams leaseParams)
Review Comment:
and apologies that I probably wasn't explaining clearly when earlier
suggesting names like `existsLeasableEntity` (to mean that "**another one
already** exists, historically")
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java:
##########
@@ -61,6 +61,17 @@ public interface MultiActiveLeaseArbiter {
LeaseAttemptStatus tryAcquireLease(DagActionStore.LeaseParams leaseParams,
boolean adoptConsensusFlowExecutionId)
throws IOException;
+ /**
+ * This method checks if lease can be acquired on provided flow in lease
params
+ * returns true if entry for the same flow does not exists within epsilon
time
+ * in leaseArbiterStore, else returns false
+ * @param leaseParams uniquely identifies the flow, the present action
upon it, the time the action
+ * was triggered, and if the dag action event we're
checking on is a reminder event
+ * @return true if lease can be acquired on the flow passed in the lease
params, false otherwise
+ */
+ boolean isLeaseAcquirable(DagActionStore.LeaseParams leaseParams)
Review Comment:
the method name itself suggests a pre-check capability (e.g. first check
whether it's acquirable and if so, then `tryAcquireLease`... being assured of
success).
of course, because check-then-act patterns are susceptible to race
conditions, we'd never actually provide such an API. let's not confuse anyone!
how about `boolean existsSimilarLeaseWithinConsolidationPeriod(LeaseParams)`?
(or `existsEquivalentLeaseWithinConsolidationPeriod`)
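to illustrate the check-then-act concern, here is a minimal in-memory sketch. it is not the arbiter's implementation: `CheckThenActSketch`, its map, and both method bodies are hypothetical stand-ins. it only shows why a `true` from a pre-check says nothing about whether a later acquire will succeed.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical in-memory stand-in for a lease store; not Gobblin's implementation. */
public class CheckThenActSketch {
  private final ConcurrentMap<String, String> leaseOwnerByFlow = new ConcurrentHashMap<>();

  /** Pre-check only: the answer may be stale by the time the caller acts on it. */
  boolean isAcquirable(String flowKey) {
    return !leaseOwnerByFlow.containsKey(flowKey);
  }

  /** Check and write in one atomic step; the return value is the only reliable answer. */
  boolean tryAcquire(String flowKey, String participant) {
    return leaseOwnerByFlow.putIfAbsent(flowKey, participant) == null;
  }
}
```

if participant A sees `isAcquirable` return `true` and participant B calls `tryAcquire` first, A's own `tryAcquire` still fails. a name like `exists...WithinConsolidationPeriod` avoids implying any such guarantee.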
##########
gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigsV2ResourceHandler.java:
##########
@@ -256,7 +257,10 @@ public CreateKVResponse<ComplexResourceKey<FlowId,
FlowStatusId>, FlowConfig> cr
responseMap = this.flowCatalog.put(flowSpec, true);
} catch (QuotaExceededException e) {
throw new RestLiServiceException(HttpStatus.S_503_SERVICE_UNAVAILABLE,
e.getMessage());
- } catch (Throwable e) {
+ } catch(LeaseUnavailableException e){
+ throw new RestLiServiceException(HttpStatus.S_409_CONFLICT,
e.getMessage());
+ }
+ catch (Throwable e) {
Review Comment:
formatting
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStoreTest.java:
##########
@@ -92,6 +95,16 @@ public static <T> boolean compareLists(List<T> list1,
List<T> list2) {
return true;
}
+ @Test
+ public void testcanAcquireLeaseOnEntity() throws Exception{
Review Comment:
camel case typo... (but anyway, `canAcquireLeaseOnEntity` is not the name of
the method)
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java:
##########
@@ -61,6 +61,17 @@ public interface MultiActiveLeaseArbiter {
LeaseAttemptStatus tryAcquireLease(DagActionStore.LeaseParams leaseParams,
boolean adoptConsensusFlowExecutionId)
throws IOException;
+ /**
+ * This method checks if lease can be acquired on provided flow in lease
params
+ * returns true if entry for the same flow does not exists within epsilon
time
Review Comment:
very reasonable method-level javadoc... but it turns out `epsilon` is not
mentioned anywhere in class-level javadoc, so this method description lacks
context.
so, please add the class-level info. mentioning the name 'epsilon' is fine,
but definitely also give it a more specific name, like "Lease Consolidation
Period".
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java:
##########
@@ -201,6 +212,33 @@ public void testAcquireLeaseSingleParticipant() throws
Exception {
<= sixthObtainedStatus.getLeaseAcquisitionTimestamp());
}
+ /*
+ test to verify if leasable entity is unavailable before epsilon time
+ to account for clock drift
+ */
+ @Test
+ public void testWhenLeasableEntityUnavailable() throws Exception{
+ LeaseAttemptStatus firstLaunchStatus =
+ mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchLeaseParams3, true);
+ Assert.assertTrue(firstLaunchStatus instanceof LeaseAttemptStatus.LeaseObtainedStatus);
+ completeLeaseHelper(launchLeaseParams3);
+ Thread.sleep(LESS_THAN_EPSILON);
+
+ Assert.assertFalse(mysqlMultiActiveLeaseArbiter.isLeaseAcquirable(launchLeaseParams3));
Review Comment:
the whole idea is to check that a "similar" (but NOT the same) lease isn't
already within epsilon. hence, be sure to test with `LeaseParams` that were NOT
given to `tryAcquireLease`
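a rough sketch of the behavior such a test should pin down, using a hypothetical in-memory model rather than `MysqlMultiActiveLeaseArbiter`. the class, its methods, and the assumption that "similar" means the same flow group and name regardless of `flowExecutionId` are all illustrative. event times are passed explicitly so no `Thread.sleep` is needed.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of "similar launch within epsilon"; not the real arbiter. */
public class ConsolidationPeriodSketch {
  private final long epsilonMillis;
  private final Map<String, Long> lastLaunchMillisByFlow = new HashMap<>();

  ConsolidationPeriodSketch(long epsilonMillis) {
    this.epsilonMillis = epsilonMillis;
  }

  /** Records a launch; flowExecutionId is deliberately ignored when keying (an assumption). */
  void recordLaunch(String flowGroup, String flowName, long flowExecutionId, long eventTimeMillis) {
    lastLaunchMillisByFlow.put(key(flowGroup, flowName), eventTimeMillis);
  }

  /** True if some launch of the same flow was recorded within epsilon of eventTimeMillis. */
  boolean existsSimilarLaunchWithinConsolidationPeriod(String flowGroup, String flowName, long eventTimeMillis) {
    Long last = lastLaunchMillisByFlow.get(key(flowGroup, flowName));
    return last != null && Math.abs(eventTimeMillis - last) <= epsilonMillis;
  }

  private static String key(String flowGroup, String flowName) {
    return flowGroup + "/" + flowName;
  }
}
```

the point for the real test: record a launch with one set of params, then query with different params (new execution id, new event time) for the same flow, once inside and once outside epsilon.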
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -362,6 +362,16 @@ else if (leaseValidityStatus == 2) {
}
}
+ /*
+ Determines if a lease can be acquired for the given flow. A lease is
acquirable if
+ no existing lease record exists in arbiter table or the record is older
then epsilon time
+ */
+ @Override
+ public boolean isLeaseAcquirable(DagActionStore.LeaseParams leaseParams) throws IOException {
+ Optional<GetEventInfoResult> infoResult = getExistingEventInfo(leaseParams);
+ return infoResult.isPresent() ? !infoResult.get().isWithinEpsilon() : true;
Review Comment:
idiomatic:
```
return infoResult.map(result -> !result.isWithinEpsilon()).orElse(true);
```
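a self-contained sketch of that idiom, assuming `infoResult` is a `java.util.Optional` (Guava's `Optional` would use `transform(...).or(...)` instead). `EventInfo` is a hypothetical stand-in for `GetEventInfoResult`, modeling only the one accessor used here.

```java
import java.util.Optional;

public class OptionalIdiom {
  /** Hypothetical stand-in for GetEventInfoResult; only isWithinEpsilon() is modeled. */
  static final class EventInfo {
    private final boolean withinEpsilon;

    EventInfo(boolean withinEpsilon) {
      this.withinEpsilon = withinEpsilon;
    }

    boolean isWithinEpsilon() {
      return withinEpsilon;
    }
  }

  /** True when there is no prior event, or the prior event is not within epsilon. */
  static boolean isAcquirable(Optional<EventInfo> infoResult) {
    return infoResult.map(result -> !result.isWithinEpsilon()).orElse(true);
  }
}
```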
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -133,6 +137,31 @@ public AddSpecResponse onAddSpec(Spec addedSpec) {
return new AddSpecResponse<>(null);
}
+ /*
+ validates if lease can be acquired on the provided flowSpec,
+ else throw LeaseUnavailableException
+ */
+ private void validateAdhocFlowLeasability(FlowSpec flowSpec) {
+ if (!flowSpec.isScheduled()) {
+ Config flowConfig = flowSpec.getConfig();
+ String flowGroup =
flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+ String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+
+ DagActionStore.DagAction dagAction =
DagActionStore.DagAction.forFlow(flowGroup, flowName,
+ FlowUtils.getOrCreateFlowExecutionId(flowSpec),
DagActionStore.DagActionType.LAUNCH);
+ DagActionStore.LeaseParams leaseParams = new
DagActionStore.LeaseParams(dagAction, System.currentTimeMillis());
+ _log.info("validation of lease acquirability of adhoc flow with lease params: " + leaseParams);
+ try {
+ if (!dagManagementStateStore.isLeaseAcquirable(leaseParams)) {
+ throw new LeaseUnavailableException("Lease already occupied by another execution of this flow");
+ }
+ } catch (IOException exception) {
+ _log.error(String.format("Failed to query leaseArbiterTable for existing flow details: %s", flowSpec), exception);
Review Comment:
we called `dagManagementStateStore.isLeaseAcquirable(leaseParams)`... who
said anything about "leaseArbiterTable"? :)
(anyway, the table's name is dynamically set in config).
instead:
```
_log.error("unable to check whether lease acquirable " + leaseParams, exception);
```
(also on the line below)
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -133,6 +137,31 @@ public AddSpecResponse onAddSpec(Spec addedSpec) {
return new AddSpecResponse<>(null);
}
+ /*
+ validates if lease can be acquired on the provided flowSpec,
+ else throw LeaseUnavailableException
+ */
+ private void validateAdhocFlowLeasability(FlowSpec flowSpec) {
+ if (!flowSpec.isScheduled()) {
+ Config flowConfig = flowSpec.getConfig();
+ String flowGroup =
flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+ String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+
+ DagActionStore.DagAction dagAction =
DagActionStore.DagAction.forFlow(flowGroup, flowName,
+ FlowUtils.getOrCreateFlowExecutionId(flowSpec),
DagActionStore.DagActionType.LAUNCH);
+ DagActionStore.LeaseParams leaseParams = new DagActionStore.LeaseParams(dagAction, System.currentTimeMillis());
+ _log.info("validation of lease acquirability of adhoc flow with lease params: " + leaseParams);
Review Comment:
keep it brief! (we just made improvements in that vein
https://github.com/apache/gobblin/pull/4074 )
maybe:
```
_log.info("checking adhoc lease acquirability {}", leaseParams);
```
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/LeaseUnavailableException.java:
##########
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+/**
+ * An {@link RuntimeException} thrown when lease cannot be acquired on
provided entity.
+ */
+public class LeaseUnavailableException extends RuntimeException {
+ public LeaseUnavailableException(String message) {
Review Comment:
beyond clearly naming for callers, impl-wise, this definitely relates to a
flow, so that should be a ctor param. consider whether to allow a catcher to
reach in to access the details as instance member(s) or merely to use
internally in the ctor, to contextualize the `message` passed along to `super`.
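one possible shape, as a sketch only: the class name follows the earlier naming suggestion, while the ctor signature, message wording, and getters are assumptions. this variant does both things mentioned above: it uses the flow details to build the `message` for `super` and also exposes them to a catcher.

```java
/** Sketch of a flow-aware exception; ctor params, message, and getters are illustrative assumptions. */
public class TooSoonToRerunSameFlowException extends RuntimeException {
  private final String flowGroup;
  private final String flowName;

  public TooSoonToRerunSameFlowException(String flowGroup, String flowName) {
    // Contextualize the message with the flow identity instead of accepting free-form text.
    super(String.format("Flow (group: '%s', name: '%s') was launched too recently to run again",
        flowGroup, flowName));
    this.flowGroup = flowGroup;
    this.flowName = flowName;
  }

  public String getFlowGroup() {
    return flowGroup;
  }

  public String getFlowName() {
    return flowName;
  }
}
```

if catchers never need the details, the fields and getters can be dropped and the ctor params used only for the message.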
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java:
##########
@@ -116,9 +124,10 @@ public void setUp() throws Exception {
this.flowCatalog = new
FlowCatalog(ConfigUtils.propertiesToConfig(flowProperties),
Optional.of(logger), Optional.absent(), true);
this.serviceLauncher.addService(flowCatalog);
-
+ MultiActiveLeaseArbiter leaseArbiter =
Mockito.mock(MultiActiveLeaseArbiter.class);
MySqlDagManagementStateStore dagManagementStateStore =
spy(MySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase));
+ this.dagManagementStateStore=dagManagementStateStore;
Review Comment:
spaces around `=`
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStoreTest.java:
##########
@@ -92,6 +95,16 @@ public static <T> boolean compareLists(List<T> list1,
List<T> list2) {
return true;
}
+ @Test
+ public void testcanAcquireLeaseOnEntity() throws Exception{
+
Mockito.when(leaseArbiter.isLeaseAcquirable(Mockito.any(DagActionStore.LeaseParams.class))).thenReturn(true);
+ String flowName = "testFlow";
+ String flowGroup = "testGroup";
+ DagActionStore.DagAction dagAction = new
DagActionStore.DagAction(flowName, flowGroup, System.currentTimeMillis(),
"testJob", DagActionStore.DagActionType.LAUNCH);
+ DagActionStore.LeaseParams leaseParams = new
DagActionStore.LeaseParams(dagAction);
+ Assert.assertTrue(dagManagementStateStore.isLeaseAcquirable(leaseParams));
Review Comment:
where's the test to exercise `false` path?
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java:
##########
@@ -311,6 +320,60 @@ public void createFlowSpec() throws Throwable {
"SpecProducer should contain 0 Spec after addition");
}
+ /*
+ If another flow has already acquired lease for this flowspec details
within
+ epsilon time, then we do not execute this flow, hence do not process and
store the spec
+ and throw LeaseUnavailableException
+ */
+ @Test(expectedExceptions = LeaseUnavailableException.class)
+ public void onAddSpecForAdhocFlowThrowLeaseUnavailable() throws IOException {
+ ConfigBuilder configBuilder = ConfigBuilder.create()
+ .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "testGroup")
+ .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "testName");
+ Config config = configBuilder.build();
+ FlowSpec flowSpec = FlowSpec.builder().withConfig(config).build();
+
Mockito.when(dagManagementStateStore.isLeaseAcquirable(Mockito.any(DagActionStore.LeaseParams.class))).thenReturn(false);
+ dagMgrNotFlowLaunchHandlerBasedOrchestrator.onAddSpec(flowSpec);
+ }
+
+ /*
+ If no other flow has acquired lease within the epsilon time, then flow
+ compilation and addition to the store occurs normally
+ */
+ @Test
+ public void onAddSpecForAdhocFlowLeaseAvailable() throws IOException {
+ ConfigBuilder configBuilder = ConfigBuilder.create()
+ .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "testGroup")
+ .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "testName")
+ .addPrimitive("gobblin.flow.sourceIdentifier", "source")
+ .addPrimitive("gobblin.flow.destinationIdentifier", "destination");
+ Config config = configBuilder.build();
+ FlowSpec flowSpec = FlowSpec.builder().withConfig(config).build();
+
Mockito.when(dagManagementStateStore.isLeaseAcquirable(Mockito.any(DagActionStore.LeaseParams.class))).thenReturn(true);
+ AddSpecResponse addSpecResponse =
dagMgrNotFlowLaunchHandlerBasedOrchestrator.onAddSpec(flowSpec);
+ Assert.assertNotNull(addSpecResponse);
+ }
+
+ /*
+ For Scheduled flow lease acquirable check does not occur,
+ and flow compilation occurs successfully
+ */
+ @Test
+ public void onAddSpecForScheduledFlow() throws IOException {
+ ConfigBuilder configBuilder = ConfigBuilder.create()
+ .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "testGroup")
+ .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "testName")
+ .addPrimitive(ConfigurationKeys.JOB_SCHEDULE_KEY, "0 1/0 * ? * *")
+ .addPrimitive("gobblin.flow.sourceIdentifier", "source")
+ .addPrimitive("gobblin.flow.destinationIdentifier", "destination");
+ Config config = configBuilder.build();
+ FlowSpec flowSpec = FlowSpec.builder().withConfig(config).build();
+ AddSpecResponse response = new AddSpecResponse<>(new Object());
+ Mockito.when(specCompiler.onAddSpec(flowSpec)).thenReturn(response);
Review Comment:
don't you also need to add this mock:
```
Mockito.when(dagManagementStateStore.isLeaseAcquirable(Mockito.any(DagActionStore.LeaseParams.class))).thenReturn(false);
```
(w/ the expectation it would never be called)?
alternatively, verify that mock was never invoked
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java:
##########
@@ -86,6 +92,8 @@ public class OrchestratorTest {
private FlowSpec flowSpec;
private ITestMetastoreDatabase testMetastoreDatabase;
private Orchestrator dagMgrNotFlowLaunchHandlerBasedOrchestrator;
Review Comment:
seems a misnomer, as we no longer have any `Orchestrator` capable of using the
DagMgr (now completely removed) instead of `FlowLaunchHandler`. suggest
renaming it simply to `orchestrator`