phet commented on code in PR #4076:
URL: https://github.com/apache/gobblin/pull/4076#discussion_r1847701708
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/TooSoonToRerunSameFlowException.java:
##########
@@ -20,8 +20,15 @@
/**
* An {@link RuntimeException} thrown when lease cannot be acquired on provided entity.
*/
-public class LeaseUnavailableException extends RuntimeException {
- public LeaseUnavailableException(String message) {
+public class TooSoonToRerunSameFlowException extends RuntimeException {
+ private final FlowSpec flowSpec;
+
+ public TooSoonToRerunSameFlowException(String message, FlowSpec flowSpec) {
super(message);
+ this.flowSpec = flowSpec;
+ }
+
+ public FlowSpec getFlowSpec() {
+ return flowSpec;
}
Review Comment:
instead of the hand-written getter, use the `@Getter` annotation (from `lombok`)
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java:
##########
@@ -29,6 +29,7 @@
import java.util.Properties;
import org.apache.commons.lang3.reflect.ConstructorUtils;
+import org.apache.gobblin.runtime.api.TooSoonToRerunSameFlowException;
Review Comment:
gobblin's own imports belong farther down, around L55
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java:
##########
@@ -63,13 +63,13 @@ LeaseAttemptStatus tryAcquireLease(DagActionStore.LeaseParams leaseParams, boole
/**
* This method checks if lease can be acquired on provided flow in lease params
- * returns true if entry for the same flow does not exists within epsilon time
+ * returns true if entry for the same flow does not exists within Lease Consolidation Period
Review Comment:
sense is reversed here...
maybe:
> Check whether the same flowGroup+flowName is within the Lease
Consolidation Period (aka. epsilon) from other, unrelated leasing activity
this is also out-of-date:
```
@return true if lease can be acquired on the flow passed in the lease
params, false otherwise
```
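To make the intended sense concrete, here is a minimal in-memory sketch of the check: it returns true when other leasing activity for the same flowGroup+flowName *does* fall within the consolidation period. `ConsolidationPeriodSketch`, `recordLease`, and the map-based storage are hypothetical stand-ins for illustration, not the `MultiActiveLeaseArbiter` API, and the real method takes `LeaseParams` rather than separate strings.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory stand-in; not Gobblin's MultiActiveLeaseArbiter. */
class ConsolidationPeriodSketch {
  private final long consolidationPeriodMillis; // aka. epsilon
  // Last lease event time keyed by flowGroup + "." + flowName; the execution id is deliberately excluded.
  private final Map<String, Long> lastLeaseEventMillis = new HashMap<>();

  ConsolidationPeriodSketch(long consolidationPeriodMillis) {
    this.consolidationPeriodMillis = consolidationPeriodMillis;
  }

  /** Records leasing activity for a flow (assumed helper, for illustration only). */
  void recordLease(String flowGroup, String flowName, long eventTimeMillis) {
    lastLeaseEventMillis.put(flowGroup + "." + flowName, eventTimeMillis);
  }

  /**
   * Check whether the same flowGroup+flowName is within the Lease Consolidation Period
   * (aka. epsilon) from other, unrelated leasing activity.
   *
   * @return true if such leasing activity exists within the period, false otherwise
   */
  boolean existsSimilarLeaseWithinConsolidationPeriod(String flowGroup, String flowName, long nowMillis) {
    Long last = lastLeaseEventMillis.get(flowGroup + "." + flowName);
    return last != null && Math.abs(nowMillis - last) <= consolidationPeriodMillis;
  }
}
```

With this sense, a `true` result means the caller should refuse to launch, so the `@return` line needs to describe existence rather than acquirability.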
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java:
##########
@@ -105,10 +105,13 @@ public interface DagManagementStateStore {
/**
* Returns true if lease can be acquired on entity provided in leaseParams.
- * @param leaseParams uniquely identifies the flow, the present action
upon it, the time the action was triggered,
- * and if the dag action event we're checking on is a
reminder event
+ * Check if an action exists in dagAction store by flow group, flow name,
flow execution id, and job name.
+ * @param flowGroup flow group for the dag action
+ * @param flowName flow name for the dag action
+ * @param flowExecutionId flow execution for the dag action
Review Comment:
javadoc seems out-of-date, esp. mentioning LeaseParams and DagAction
also, out-of-date:
```
Returns true if lease can be acquired on entity provided in leaseParams.
```
##########
gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigsV2ResourceHandler.java:
##########
@@ -257,10 +257,10 @@ public CreateKVResponse<ComplexResourceKey<FlowId,
FlowStatusId>, FlowConfig> cr
responseMap = this.flowCatalog.put(flowSpec, true);
} catch (QuotaExceededException e) {
throw new RestLiServiceException(HttpStatus.S_503_SERVICE_UNAVAILABLE,
e.getMessage());
- } catch(LeaseUnavailableException e){
- throw new RestLiServiceException(HttpStatus.S_409_CONFLICT,
e.getMessage());
- }
- catch (Throwable e) {
+ } catch(TooSoonToRerunSameFlowException e) {
+ return new CreateKVResponse<>(new RestLiServiceException(HttpStatus.S_409_CONFLICT,
+ "FlowSpec with URI " + flowSpec.getUri() + " was launched within the lease consolidation period, no action will be taken"));
Review Comment:
nit: "was **previously** launched within"
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -138,26 +138,23 @@ public AddSpecResponse onAddSpec(Spec addedSpec) {
}
/*
- validates if lease can be acquired on the provided flowSpec,
- else throw LeaseUnavailableException
+ enforces that a similar flow is not launching,
+ else throw TooSoonToRerunSameFlowException
*/
- private void validateAdhocFlowLeasability(FlowSpec flowSpec) {
+ private void enforceSimilarAdhocFlowExistence(FlowSpec flowSpec) {
Review Comment:
this doesn't enforce the existence, but rather the **non-**existence.
also, "similar lease" generally sounds apt, if we consider "similar" to be
**the same** FlowId, but different executionId. hence the lease is similar,
while "the flow" is actually... **the same**.
this line of reasoning leaves "similar flow" sounding imprecise at best and
confusing at worst. I regret suggesting it and apologize for that. (it stands
out more clearly when I'm solely reading vs. struggling to originate a name
myself.)
really, we're talking about recent execution of the same FlowId (aka.
flowGroup+flowName). maybe `enforceNoRecentAdhocExecOfSameFlow`?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java:
##########
@@ -105,10 +105,13 @@ public interface DagManagementStateStore {
/**
* Returns true if lease can be acquired on entity provided in leaseParams.
- * @param leaseParams uniquely identifies the flow, the present action
upon it, the time the action was triggered,
- * and if the dag action event we're checking on is a
reminder event
+ * Check if an action exists in dagAction store by flow group, flow name,
flow execution id, and job name.
+ * @param flowGroup flow group for the dag action
+ * @param flowName flow name for the dag action
+ * @param flowExecutionId flow execution for the dag action
+ * @throws IOException
*/
- boolean isLeaseAcquirable(DagActionStore.LeaseParams leaseParams) throws IOException;
+ boolean existsCurrentlyLaunchingSimilarFlow(String flowGroup, String flowName, long flowExecutionId) throws IOException;
Review Comment:
how about `existsCurrentlyLaunchingExecOfSameFlow`?
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java:
##########
@@ -392,7 +393,12 @@ private Map<String, AddSpecResponse>
updateOrAddSpecHelper(Spec spec, boolean tr
// If flow fails compilation, the result will have a non-empty string
with the error
if (!response.getValue().getFailures().isEmpty()) {
for (Map.Entry<SpecCatalogListener, CallbackResult<AddSpecResponse>>
entry : response.getValue().getFailures().entrySet()) {
- throw entry.getValue().getError().getCause();
+ Throwable error = entry.getValue().getError();
+ if (error instanceof TooSoonToRerunSameFlowException) {
+ throw (TooSoonToRerunSameFlowException) error;
+ } else {
+ throw error.getCause();
+ }
Review Comment:
I'm not crazy about having to explicitly carve out a special case for this
exception. couldn't we instead, when throwing it in the first place, wrap it
in an extra `RuntimeException` that we know will be stripped off here?
(if doing that, be sure to add a comment explaining it's for the
`SpecCatalogListener` `CallbackResult` handling over here.)
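A rough sketch of the wrapping idea, using only JDK types. The nested exception is a simplified stand-in for the PR's class, and `wrapForCallbackHandling` and `unwrap` are hypothetical names; the point is only that the handler can keep its single `getCause()` path.

```java
class CallbackUnwrapSketch {
  /** Simplified stand-in for the PR's exception; the real one also carries a FlowSpec. */
  static final class TooSoonToRerunSameFlowException extends RuntimeException {
    TooSoonToRerunSameFlowException(String message) {
      super(message);
    }
  }

  /**
   * Wraps the exception in a plain RuntimeException. The extra layer exists only because the
   * SpecCatalogListener CallbackResult handling rethrows getError().getCause().
   */
  static RuntimeException wrapForCallbackHandling(String message) {
    return new RuntimeException(new TooSoonToRerunSameFlowException(message));
  }

  /** Mirrors the original handler: always strip one layer, with no instanceof carve-out. */
  static Throwable unwrap(Throwable error) {
    return error.getCause();
  }
}
```

The trade-off is that the wrapping site now depends on how the catalog unwraps failures, which is why the explanatory comment at the throw site matters.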
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java:
##########
@@ -81,6 +82,14 @@ public class MysqlMultiActiveLeaseArbiterTest {
new DagActionStore.DagAction(flowGroup4, flowName, flowExecutionId,
jobName, DagActionStore.DagActionType.LAUNCH);
private static final DagActionStore.LeaseParams
launchLeaseParams4 = new DagActionStore.LeaseParams(launchDagAction4,
false, eventTimeMillis);
+ private static final DagActionStore.DagAction launchDagAction3_similar =
+ new DagActionStore.DagAction(flowGroup3, flowName, flowExecutionId1,
jobName, DagActionStore.DagActionType.LAUNCH);
+ private static final DagActionStore.LeaseParams
+ launchLeaseParams3_similar = new
DagActionStore.LeaseParams(launchDagAction3_similar, false, eventTimeMillis);
+ private static final DagActionStore.DagAction launchDagAction4_similar =
+ new DagActionStore.DagAction(flowGroup4, flowName, flowExecutionId1,
jobName, DagActionStore.DagActionType.LAUNCH);
+ private static final DagActionStore.LeaseParams
+ launchLeaseParams4_similar = new
DagActionStore.LeaseParams(launchDagAction4_similar, false, eventTimeMillis);
Review Comment:
nit: so we can easily surmise what's different between `launchDA3` and
`launchDA3_similar`, please put the 3s and the 4s next to each other.
also, the two-step init is clunky: it needs an extra name plus one more line
of boilerplate. in cases where the `DagAction` is merely used to init
`LeaseParams`, skip creating a separate name for the `DagAction`
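For instance, the "3" fixtures might look roughly like this once they sit side by side with the `DagAction` created inline. The nested `DagAction`, `LeaseParams`, and `DagActionType` are simplified, hypothetical stand-ins for the `DagActionStore` types, and every fixture value is an assumption made for the sketch.

```java
class LeaseParamsFixturesSketch {
  enum DagActionType { LAUNCH }

  /** Hypothetical, simplified stand-in for DagActionStore.DagAction. */
  static final class DagAction {
    final String flowGroup;
    final String flowName;
    final long flowExecutionId;
    final String jobName;
    final DagActionType type;

    DagAction(String flowGroup, String flowName, long flowExecutionId, String jobName, DagActionType type) {
      this.flowGroup = flowGroup;
      this.flowName = flowName;
      this.flowExecutionId = flowExecutionId;
      this.jobName = jobName;
      this.type = type;
    }
  }

  /** Hypothetical, simplified stand-in for DagActionStore.LeaseParams. */
  static final class LeaseParams {
    final DagAction dagAction;
    final boolean isReminder;
    final long eventTimeMillis;

    LeaseParams(DagAction dagAction, boolean isReminder, long eventTimeMillis) {
      this.dagAction = dagAction;
      this.isReminder = isReminder;
      this.eventTimeMillis = eventTimeMillis;
    }
  }

  // Assumed fixture values; the real test class defines its own.
  static final String flowGroup3 = "testFlowGroup3";
  static final String flowName = "testFlowName";
  static final String jobName = "testJobName";
  static final long flowExecutionId = 1_731_900_000_000L;
  static final long flowExecutionIdAlt = flowExecutionId + 60_000L;
  static final long eventTimeMillis = flowExecutionId;

  // Adjacent definitions make the single difference (the execution id) easy to spot,
  // and each DagAction is built inline because nothing else refers to it.
  static final LeaseParams launchLeaseParams3 = new LeaseParams(
      new DagAction(flowGroup3, flowName, flowExecutionId, jobName, DagActionType.LAUNCH), false, eventTimeMillis);
  static final LeaseParams launchLeaseParams3_similar = new LeaseParams(
      new DagAction(flowGroup3, flowName, flowExecutionIdAlt, jobName, DagActionType.LAUNCH), false, eventTimeMillis);
}
```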
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java:
##########
@@ -59,6 +59,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
private static final String flowName = "testFlowName";
private static final String jobName = "testJobName";
private static final long flowExecutionId = 12345677L;
+ private static final long flowExecutionId1 = 12345996L;
Review Comment:
I never noticed before that `flowExecutionId`, which is customarily
millis-since-epoch, only has 8 digits, whereas a present-day value would have
13. let's fix that and also define this as:
```
private static final long flowExecutionIdAlt = flowExecutionId + ...; // whatever you consider a reasonable (later) offset
```
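One possible shape for these constants, as a sketch only: the base value is an arbitrary late-2024 instant in millis-since-epoch (13 digits), and the one-minute offset is an assumption, not a value taken from the PR.

```java
import java.util.concurrent.TimeUnit;

class FlowExecutionIdFixtureSketch {
  // Assumed value: an arbitrary late-2024 instant expressed as millis-since-epoch.
  static final long flowExecutionId = 1_731_900_000_000L;
  // Assumed offset: one minute later; any clearly later value would serve the test equally well.
  static final long flowExecutionIdAlt = flowExecutionId + TimeUnit.MINUTES.toMillis(1);
}
```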
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStoreTest.java:
##########
@@ -96,13 +95,19 @@ public static <T> boolean compareLists(List<T> list1,
List<T> list2) {
}
@Test
- public void testcanAcquireLeaseOnEntity() throws Exception{
- Mockito.when(leaseArbiter.isLeaseAcquirable(Mockito.any(DagActionStore.LeaseParams.class))).thenReturn(true);
+ public void testExistsCurrentlyLaunchingSimilarFlowGivesTrue() throws Exception{
+ Mockito.when(leaseArbiter.existsSimilarLeaseWithinConsolidationPeriod(Mockito.any(DagActionStore.LeaseParams.class))).thenReturn(true);
String flowName = "testFlow";
String flowGroup = "testGroup";
- DagActionStore.DagAction dagAction = new DagActionStore.DagAction(flowName, flowGroup, System.currentTimeMillis(), "testJob", DagActionStore.DagActionType.LAUNCH);
- DagActionStore.LeaseParams leaseParams = new DagActionStore.LeaseParams(dagAction);
- Assert.assertTrue(dagManagementStateStore.isLeaseAcquirable(leaseParams));
+ Assert.assertTrue(dagManagementStateStore.existsCurrentlyLaunchingSimilarFlow(flowGroup, flowName, any(Long.class)));
Review Comment:
apologies, but I'm not actually familiar w/ what `any(Long.class)` means in
a context like this. I'm familiar w/ such arg matchers setting-up mocking and
also to verify prior invocations of a mock - but not within an actual
invocation of a non-mock.
guessing: does it choose a random long and pass that as the arg? it might
be better to just pass an actual value, such as `System.currentTimeMillis()`
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java:
##########
@@ -370,16 +376,18 @@ public void onAddSpecForScheduledFlow() throws
IOException {
FlowSpec flowSpec = FlowSpec.builder().withConfig(config).build();
AddSpecResponse response = new AddSpecResponse<>(new Object());
Mockito.when(specCompiler.onAddSpec(flowSpec)).thenReturn(response);
- AddSpecResponse addSpecResponse =
dagMgrNotFlowLaunchHandlerBasedOrchestrator.onAddSpec(flowSpec);
+ AddSpecResponse addSpecResponse = orchestrator.onAddSpec(flowSpec);
Assert.assertNotNull(addSpecResponse);
+ // Verifying that for scheduled flow isLeaseAcquirable is not called
+ Mockito.verify(dagManagementStateStore, Mockito.times(0)).existsCurrentlyLaunchingSimilarFlow(anyString(), anyString(), anyLong());
Review Comment:
comment names the wrong method: it still says `isLeaseAcquirable`
also, use `Mockito.never()` in place of `Mockito.times(0)`
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -138,26 +138,23 @@ public AddSpecResponse onAddSpec(Spec addedSpec) {
}
/*
- validates if lease can be acquired on the provided flowSpec,
- else throw LeaseUnavailableException
+ enforces that a similar flow is not launching,
+ else throw TooSoonToRerunSameFlowException
*/
- private void validateAdhocFlowLeasability(FlowSpec flowSpec) {
+ private void enforceSimilarAdhocFlowExistence(FlowSpec flowSpec) {
if (!flowSpec.isScheduled()) {
Config flowConfig = flowSpec.getConfig();
String flowGroup =
flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
- DagActionStore.DagAction dagAction =
DagActionStore.DagAction.forFlow(flowGroup, flowName,
- FlowUtils.getOrCreateFlowExecutionId(flowSpec),
DagActionStore.DagActionType.LAUNCH);
- DagActionStore.LeaseParams leaseParams = new
DagActionStore.LeaseParams(dagAction, System.currentTimeMillis());
- _log.info("validation of lease acquirability of adhoc flow with lease
params: " + leaseParams);
+ _log.info("checking existing adhoc flow existence for " + flowGroup +
"." + flowName);
Review Comment:
"existing adhoc flow existence".... did you mean "existing adhoc flow
execution"?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -138,26 +138,23 @@ public AddSpecResponse onAddSpec(Spec addedSpec) {
}
/*
- validates if lease can be acquired on the provided flowSpec,
- else throw LeaseUnavailableException
+ enforces that a similar flow is not launching,
+ else throw TooSoonToRerunSameFlowException
*/
- private void validateAdhocFlowLeasability(FlowSpec flowSpec) {
+ private void enforceSimilarAdhocFlowExistence(FlowSpec flowSpec) {
if (!flowSpec.isScheduled()) {
Config flowConfig = flowSpec.getConfig();
String flowGroup =
flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
- DagActionStore.DagAction dagAction =
DagActionStore.DagAction.forFlow(flowGroup, flowName,
- FlowUtils.getOrCreateFlowExecutionId(flowSpec),
DagActionStore.DagActionType.LAUNCH);
- DagActionStore.LeaseParams leaseParams = new
DagActionStore.LeaseParams(dagAction, System.currentTimeMillis());
- _log.info("validation of lease acquirability of adhoc flow with lease
params: " + leaseParams);
+ _log.info("checking existing adhoc flow existence for " + flowGroup +
"." + flowName);
try {
- if (!dagManagementStateStore.isLeaseAcquirable(leaseParams)) {
- throw new LeaseUnavailableException("Lease already occupied by another execution of this flow");
+ if (dagManagementStateStore.existsCurrentlyLaunchingSimilarFlow(flowGroup, flowName, FlowUtils.getOrCreateFlowExecutionId(flowSpec))) {
+ throw new TooSoonToRerunSameFlowException("Lease already occupied by another execution of this flow", flowSpec);
Review Comment:
1. we have an `.info` line above announcing the check, so let's follow w/ a
`.warn` line here when the check fails. suggest: "another recent adhoc flow
exec found for...."
2. exception msg could benefit from minor improvements, yet--however it's
phrased--it belongs encapsulated in the `TooSoonToRerun...` ctor, which should
take solely a `FlowSpec` param
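A minimal sketch of point 2, assuming a trimmed-down `FlowSpec` stand-in that models only `getUri()`. The message wording is illustrative, not a proposal for the final text, and the classes are nested only to keep the sketch self-contained.

```java
import java.net.URI;

class TooSoonToRerunSketch {
  /** Hypothetical, minimal stand-in for Gobblin's FlowSpec; only getUri() is modeled. */
  static final class FlowSpec {
    private final URI uri;

    FlowSpec(URI uri) {
      this.uri = uri;
    }

    URI getUri() {
      return uri;
    }
  }

  /** The constructor takes solely a FlowSpec, so the message wording lives in one place. */
  static final class TooSoonToRerunSameFlowException extends RuntimeException {
    private final FlowSpec flowSpec;

    TooSoonToRerunSameFlowException(FlowSpec flowSpec) {
      // Assumed wording; callers no longer supply their own message.
      super("Lease already occupied by another recent execution of this flow: " + flowSpec.getUri());
      this.flowSpec = flowSpec;
    }

    // With Lombok on the classpath, @Getter on the field could replace this accessor.
    FlowSpec getFlowSpec() {
      return flowSpec;
    }
  }
}
```

The call site then shrinks to `throw new TooSoonToRerunSameFlowException(flowSpec);`, preceded by the suggested `.warn` line.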