[
https://issues.apache.org/jira/browse/GOBBLIN-2173?focusedWorklogId=944250&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-944250
]
ASF GitHub Bot logged work on GOBBLIN-2173:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 19/Nov/24 07:46
Start Date: 19/Nov/24 07:46
Worklog Time Spent: 10m
Work Description: phet commented on code in PR #4076:
URL: https://github.com/apache/gobblin/pull/4076#discussion_r1847701708
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/TooSoonToRerunSameFlowException.java:
##########
@@ -20,8 +20,15 @@
/**
* An {@link RuntimeException} thrown when lease cannot be acquired on
provided entity.
*/
-public class LeaseUnavailableException extends RuntimeException {
- public LeaseUnavailableException(String message) {
+public class TooSoonToRerunSameFlowException extends RuntimeException {
+ private final FlowSpec flowSpec;
+
+ public TooSoonToRerunSameFlowException(String message, FlowSpec flowSpec) {
super(message);
+ this.flowSpec = flowSpec;
+ }
+
+ public FlowSpec getFlowSpec() {
+ return flowSpec;
}
Review Comment:
instead, use the `@Getter` annotation (from `lombok`)
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java:
##########
@@ -29,6 +29,7 @@
import java.util.Properties;
import org.apache.commons.lang3.reflect.ConstructorUtils;
+import org.apache.gobblin.runtime.api.TooSoonToRerunSameFlowException;
Review Comment:
gobblin's own imports belong farther down, around L55
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java:
##########
@@ -63,13 +63,13 @@ LeaseAttemptStatus
tryAcquireLease(DagActionStore.LeaseParams leaseParams, boole
/**
* This method checks if lease can be acquired on provided flow in lease
params
- * returns true if entry for the same flow does not exists within epsilon
time
+ * returns true if entry for the same flow does not exists within Lease
Consolidation Period
Review Comment:
sense is reversed here...
maybe:
> Check whether the same flowGroup+flowName is within the Lease
Consolidation Period (aka. epsilon) from other, unrelated leasing activity
this is also out-of-date:
```
@return true if lease can be acquired on the flow passed in the lease
params, false otherwise
```
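To make the intended sense concrete, here is a minimal in-memory sketch of such a check. It is an illustration only: the class `LeaseConsolidationSketch`, the `recordLeaseActivity` helper, the map-based storage, and the inclusive boundary are all assumptions (the arbiter in this PR is MySQL-backed); only the method name `existsSimilarLeaseWithinConsolidationPeriod` comes from the PR.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// In-memory sketch only; hypothetical class, not the MySQL-backed arbiter from the PR.
class LeaseConsolidationSketch {
  private final long epsilonMillis; // the Lease Consolidation Period (aka. epsilon)
  private final Map<String, Long> lastLeaseEventMillis = new ConcurrentHashMap<>();

  LeaseConsolidationSketch(long epsilonMillis) {
    this.epsilonMillis = epsilonMillis;
  }

  private static String flowKey(String flowGroup, String flowName) {
    return flowGroup + "." + flowName;
  }

  // Hypothetical helper: remember the latest leasing activity for a flowGroup+flowName.
  void recordLeaseActivity(String flowGroup, String flowName, long eventTimeMillis) {
    lastLeaseEventMillis.put(flowKey(flowGroup, flowName), eventTimeMillis);
  }

  // Returns true when the same flowGroup+flowName HAS leasing activity within epsilon,
  // i.e. true means "too soon", not "lease can be acquired".
  boolean existsSimilarLeaseWithinConsolidationPeriod(String flowGroup, String flowName, long nowMillis) {
    Long last = lastLeaseEventMillis.get(flowKey(flowGroup, flowName));
    return last != null && Math.abs(nowMillis - last) <= epsilonMillis;
  }

  public static void main(String[] args) {
    LeaseConsolidationSketch sketch = new LeaseConsolidationSketch(2_000L);
    sketch.recordLeaseActivity("testGroup", "testFlow", 10_000L);
    System.out.println(sketch.existsSimilarLeaseWithinConsolidationPeriod("testGroup", "testFlow", 11_000L)); // true
    System.out.println(sketch.existsSimilarLeaseWithinConsolidationPeriod("testGroup", "testFlow", 20_000L)); // false
  }
}
```

With the method named and documented this way, a `true` result reads naturally as the reason to reject the new adhoc execution.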
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java:
##########
@@ -105,10 +105,13 @@ public interface DagManagementStateStore {
/**
* Returns true if lease can be acquired on entity provided in leaseParams.
- * @param leaseParams uniquely identifies the flow, the present action
upon it, the time the action was triggered,
- * and if the dag action event we're checking on is a
reminder event
+ * Check if an action exists in dagAction store by flow group, flow name,
flow execution id, and job name.
+ * @param flowGroup flow group for the dag action
+ * @param flowName flow name for the dag action
+ * @param flowExecutionId flow execution for the dag action
Review Comment:
javadoc seems out-of-date, esp. mentioning LeaseParams and DagAction
also, out-of-date:
```
Returns true if lease can be acquired on entity provided in leaseParams.
```
##########
gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigsV2ResourceHandler.java:
##########
@@ -257,10 +257,10 @@ public CreateKVResponse<ComplexResourceKey<FlowId, FlowStatusId>, FlowConfig> cr
responseMap = this.flowCatalog.put(flowSpec, true);
} catch (QuotaExceededException e) {
throw new RestLiServiceException(HttpStatus.S_503_SERVICE_UNAVAILABLE, e.getMessage());
- } catch(LeaseUnavailableException e){
- throw new RestLiServiceException(HttpStatus.S_409_CONFLICT, e.getMessage());
- }
- catch (Throwable e) {
+ } catch(TooSoonToRerunSameFlowException e) {
+ return new CreateKVResponse<>(new RestLiServiceException(HttpStatus.S_409_CONFLICT,
+ "FlowSpec with URI " + flowSpec.getUri() + " was launched within the lease consolidation period, no action will be taken"));
Review Comment:
nit: "was **previously** launched within"
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -138,26 +138,23 @@ public AddSpecResponse onAddSpec(Spec addedSpec) {
}
/*
- validates if lease can be acquired on the provided flowSpec,
- else throw LeaseUnavailableException
+ enforces that a similar flow is not launching,
+ else throw TooSoonToRerunSameFlowException
*/
- private void validateAdhocFlowLeasability(FlowSpec flowSpec) {
+ private void enforceSimilarAdhocFlowExistence(FlowSpec flowSpec) {
Review Comment:
this doesn't enforce the existence, but rather the **non-**existence.
also, "similar lease" generally sounds apt, if we consider "similar" to be
**the same** FlowId, but different executionId. hence the lease is similar,
while "the flow" is actually... **the same**.
this line of reasoning leaves "similar flow" sounding imprecise at best and
confusing at worst. I regret suggesting it and apologize for that. (it stands
out more clearly when I'm solely reading vs. struggling to originate a name
myself.)
really, we're talking about recent execution of the same FlowId (aka.
flowGroup+flowName). maybe `enforceNoRecentAdhocExecOfSameFlow`?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java:
##########
@@ -105,10 +105,13 @@ public interface DagManagementStateStore {
/**
* Returns true if lease can be acquired on entity provided in leaseParams.
- * @param leaseParams uniquely identifies the flow, the present action
upon it, the time the action was triggered,
- * and if the dag action event we're checking on is a
reminder event
+ * Check if an action exists in dagAction store by flow group, flow name,
flow execution id, and job name.
+ * @param flowGroup flow group for the dag action
+ * @param flowName flow name for the dag action
+ * @param flowExecutionId flow execution for the dag action
+ * @throws IOException
*/
- boolean isLeaseAcquirable(DagActionStore.LeaseParams leaseParams) throws
IOException;
+ boolean existsCurrentlyLaunchingSimilarFlow(String flowGroup, String
flowName, long flowExecutionId) throws IOException;
Review Comment:
how about `existsCurrentlyLaunchingExecOfSameFlow`?
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java:
##########
@@ -392,7 +393,12 @@ private Map<String, AddSpecResponse>
updateOrAddSpecHelper(Spec spec, boolean tr
// If flow fails compilation, the result will have a non-empty string
with the error
if (!response.getValue().getFailures().isEmpty()) {
for (Map.Entry<SpecCatalogListener, CallbackResult<AddSpecResponse>>
entry : response.getValue().getFailures().entrySet()) {
- throw entry.getValue().getError().getCause();
+ Throwable error = entry.getValue().getError();
+ if (error instanceof TooSoonToRerunSameFlowException) {
+ throw (TooSoonToRerunSameFlowException) error;
+ } else {
+ throw error.getCause();
+ }
Review Comment:
I'm not crazy about having to explicitly carve out a special case for this
exception. couldn't we instead, when throwing it in the first place, wrap it
in an extra `RuntimeException` that we know will be stripped off here?
(if doing that, be sure to add a comment explaining it's for the
`SpecCatalogListener` `CallbackResult` handling over here.)
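A minimal sketch of that wrap-then-strip idea, using only plain Java. The exception class here is a reduced stand-in for the one in this PR, and `WrapForCallbackDemo` with its two helper methods is hypothetical, named only for illustration.

```java
// Reduced stand-in for the PR's exception (the real one also carries a FlowSpec).
class TooSoonToRerunSameFlowException extends RuntimeException {
  TooSoonToRerunSameFlowException(String message) {
    super(message);
  }
}

// Hypothetical demo class; neither method exists in Gobblin.
class WrapForCallbackDemo {
  // Thrower side: add one extra RuntimeException layer, because the
  // SpecCatalogListener CallbackResult handling rethrows error.getCause()
  // and therefore strips exactly one layer.
  static RuntimeException wrapForCallbackHandling(TooSoonToRerunSameFlowException e) {
    return new RuntimeException(e);
  }

  // Handler side: mirrors the pre-existing `throw entry.getValue().getError().getCause();`
  static Throwable unwrapOnce(Throwable error) {
    return error.getCause();
  }

  public static void main(String[] args) {
    Throwable surfaced = unwrapOnce(
        wrapForCallbackHandling(new TooSoonToRerunSameFlowException("too soon")));
    System.out.println(surfaced instanceof TooSoonToRerunSameFlowException); // true
  }
}
```

The handler keeps its single `getCause()` path, and the caller still sees the original exception type.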
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java:
##########
@@ -81,6 +82,14 @@ public class MysqlMultiActiveLeaseArbiterTest {
new DagActionStore.DagAction(flowGroup4, flowName, flowExecutionId,
jobName, DagActionStore.DagActionType.LAUNCH);
private static final DagActionStore.LeaseParams
launchLeaseParams4 = new DagActionStore.LeaseParams(launchDagAction4,
false, eventTimeMillis);
+ private static final DagActionStore.DagAction launchDagAction3_similar =
+ new DagActionStore.DagAction(flowGroup3, flowName, flowExecutionId1,
jobName, DagActionStore.DagActionType.LAUNCH);
+ private static final DagActionStore.LeaseParams
+ launchLeaseParams3_similar = new
DagActionStore.LeaseParams(launchDagAction3_similar, false, eventTimeMillis);
+ private static final DagActionStore.DagAction launchDagAction4_similar =
+ new DagActionStore.DagAction(flowGroup4, flowName, flowExecutionId1,
jobName, DagActionStore.DagActionType.LAUNCH);
+ private static final DagActionStore.LeaseParams
+ launchLeaseParams4_similar = new
DagActionStore.LeaseParams(launchDagAction4_similar, false, eventTimeMillis);
Review Comment:
nit: so we can easily surmise what's different between `launchDA3` and
`launchDA3_similar`, please put the 3s and the 4s next to each other.
also, the two-step init is very clunky with an extra name plus one more line
of boilerplate. in cases where the `DagAction` is merely used to init
`LeaseParams`, skip creating a separate name for the `DagAction`
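A sketch of the suggested layout follows. `DagAction` and `LeaseParams` are reduced, hypothetical stand-ins for the `DagActionStore` types, the action type is a plain string here, the name `isReminder` for the boolean argument is a guess, and the values of `flowGroup4` and `eventTimeMillis` are assumptions. The field names and the two execution ids are taken from the test.

```java
// Hypothetical stand-ins for DagActionStore.DagAction and DagActionStore.LeaseParams,
// reduced to the constructor shapes used in the test.
class DagAction {
  final String flowGroup;
  final String flowName;
  final long flowExecutionId;
  final String jobName;
  final String actionType; // the real code uses DagActionStore.DagActionType.LAUNCH

  DagAction(String flowGroup, String flowName, long flowExecutionId, String jobName, String actionType) {
    this.flowGroup = flowGroup;
    this.flowName = flowName;
    this.flowExecutionId = flowExecutionId;
    this.jobName = jobName;
    this.actionType = actionType;
  }
}

class LeaseParams {
  final DagAction dagAction;
  final boolean isReminder; // assumed meaning of the boolean argument
  final long eventTimeMillis;

  LeaseParams(DagAction dagAction, boolean isReminder, long eventTimeMillis) {
    this.dagAction = dagAction;
    this.isReminder = isReminder;
    this.eventTimeMillis = eventTimeMillis;
  }
}

class LeaseParamsFixtures {
  static final String flowGroup4 = "testFlowGroup4"; // assumed value
  static final String flowName = "testFlowName";
  static final String jobName = "testJobName";
  static final long flowExecutionId = 12345677L;
  static final long flowExecutionId1 = 12345996L;
  static final long eventTimeMillis = 100L; // assumed value

  // The 4s sit next to each other, and each DagAction is built inline because
  // it is only used to initialize its LeaseParams.
  static final LeaseParams launchLeaseParams4 = new LeaseParams(
      new DagAction(flowGroup4, flowName, flowExecutionId, jobName, "LAUNCH"), false, eventTimeMillis);
  static final LeaseParams launchLeaseParams4_similar = new LeaseParams(
      new DagAction(flowGroup4, flowName, flowExecutionId1, jobName, "LAUNCH"), false, eventTimeMillis);
}
```

Laid out this way, the only visible difference between the pair is the execution id.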
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java:
##########
@@ -59,6 +59,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
private static final String flowName = "testFlowName";
private static final String jobName = "testJobName";
private static final long flowExecutionId = 12345677L;
+ private static final long flowExecutionId1 = 12345996L;
Review Comment:
I never noticed before that `flowExecutionId`, which is customarily
millis-since-epoch, only has 8 digits when it should have 13. let's fix that
and also define this as:
```
private static final long flowExecutionIdAlt = flowExecutionId + ...; //
whatever you consider a reasonable (later) offset
```
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStoreTest.java:
##########
@@ -96,13 +95,19 @@ public static <T> boolean compareLists(List<T> list1, List<T> list2) {
}
@Test
- public void testcanAcquireLeaseOnEntity() throws Exception{
- Mockito.when(leaseArbiter.isLeaseAcquirable(Mockito.any(DagActionStore.LeaseParams.class))).thenReturn(true);
+ public void testExistsCurrentlyLaunchingSimilarFlowGivesTrue() throws Exception{
+ Mockito.when(leaseArbiter.existsSimilarLeaseWithinConsolidationPeriod(Mockito.any(DagActionStore.LeaseParams.class))).thenReturn(true);
String flowName = "testFlow";
String flowGroup = "testGroup";
- DagActionStore.DagAction dagAction = new DagActionStore.DagAction(flowName, flowGroup, System.currentTimeMillis(), "testJob", DagActionStore.DagActionType.LAUNCH);
- DagActionStore.LeaseParams leaseParams = new DagActionStore.LeaseParams(dagAction);
- Assert.assertTrue(dagManagementStateStore.isLeaseAcquirable(leaseParams));
+ Assert.assertTrue(dagManagementStateStore.existsCurrentlyLaunchingSimilarFlow(flowGroup, flowName, any(Long.class)));
Review Comment:
apologies, but I'm not actually familiar w/ what `any(Long.class)` means in
a context like this. I'm familiar w/ such arg matchers setting-up mocking and
also to verify prior invocations of a mock - but not within an actual
invocation of a non-mock.
guessing: does it choose a random long and pass that as the arg? it might
be better to just pass an actual value, such as `System.currentTimeMillis()`
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java:
##########
@@ -370,16 +376,18 @@ public void onAddSpecForScheduledFlow() throws
IOException {
FlowSpec flowSpec = FlowSpec.builder().withConfig(config).build();
AddSpecResponse response = new AddSpecResponse<>(new Object());
Mockito.when(specCompiler.onAddSpec(flowSpec)).thenReturn(response);
- AddSpecResponse addSpecResponse =
dagMgrNotFlowLaunchHandlerBasedOrchestrator.onAddSpec(flowSpec);
+ AddSpecResponse addSpecResponse = orchestrator.onAddSpec(flowSpec);
Assert.assertNotNull(addSpecResponse);
+ // Verifying that for scheduled flow isLeaseAcquirable is not called
+ Mockito.verify(dagManagementStateStore,
Mockito.times(0)).existsCurrentlyLaunchingSimilarFlow(anyString(), anyString(),
anyLong());
Review Comment:
comment names wrong method
also, use `Mockito.never()`
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -138,26 +138,23 @@ public AddSpecResponse onAddSpec(Spec addedSpec) {
}
/*
- validates if lease can be acquired on the provided flowSpec,
- else throw LeaseUnavailableException
+ enforces that a similar flow is not launching,
+ else throw TooSoonToRerunSameFlowException
*/
- private void validateAdhocFlowLeasability(FlowSpec flowSpec) {
+ private void enforceSimilarAdhocFlowExistence(FlowSpec flowSpec) {
if (!flowSpec.isScheduled()) {
Config flowConfig = flowSpec.getConfig();
String flowGroup =
flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
- DagActionStore.DagAction dagAction =
DagActionStore.DagAction.forFlow(flowGroup, flowName,
- FlowUtils.getOrCreateFlowExecutionId(flowSpec),
DagActionStore.DagActionType.LAUNCH);
- DagActionStore.LeaseParams leaseParams = new
DagActionStore.LeaseParams(dagAction, System.currentTimeMillis());
- _log.info("validation of lease acquirability of adhoc flow with lease
params: " + leaseParams);
+ _log.info("checking existing adhoc flow existence for " + flowGroup +
"." + flowName);
Review Comment:
"existing adhoc flow existence".... did you mean "existing adhoc flow
execution"?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -138,26 +138,23 @@ public AddSpecResponse onAddSpec(Spec addedSpec) {
}
/*
- validates if lease can be acquired on the provided flowSpec,
- else throw LeaseUnavailableException
+ enforces that a similar flow is not launching,
+ else throw TooSoonToRerunSameFlowException
*/
- private void validateAdhocFlowLeasability(FlowSpec flowSpec) {
+ private void enforceSimilarAdhocFlowExistence(FlowSpec flowSpec) {
if (!flowSpec.isScheduled()) {
Config flowConfig = flowSpec.getConfig();
String flowGroup =
flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
- DagActionStore.DagAction dagAction =
DagActionStore.DagAction.forFlow(flowGroup, flowName,
- FlowUtils.getOrCreateFlowExecutionId(flowSpec),
DagActionStore.DagActionType.LAUNCH);
- DagActionStore.LeaseParams leaseParams = new
DagActionStore.LeaseParams(dagAction, System.currentTimeMillis());
- _log.info("validation of lease acquirability of adhoc flow with lease
params: " + leaseParams);
+ _log.info("checking existing adhoc flow existence for " + flowGroup +
"." + flowName);
try {
- if (!dagManagementStateStore.isLeaseAcquirable(leaseParams)) {
- throw new LeaseUnavailableException("Lease already occupied by another execution of this flow");
+ if (dagManagementStateStore.existsCurrentlyLaunchingSimilarFlow(flowGroup, flowName, FlowUtils.getOrCreateFlowExecutionId(flowSpec))) {
+ throw new TooSoonToRerunSameFlowException("Lease already occupied by another execution of this flow", flowSpec);
Review Comment:
1. we have an `.info` line above announcing the check, so let's follow w/ a
`.warn` line here when the check fails. suggest: "another recent adhoc flow
exec found for...."
2. exception msg could benefit from minor improvements, yet--however it's
phrased--it belongs encapsulated in the `TooSoonToRerun...` ctor, which should
take solely a `FlowSpec` param
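One possible shape for point 2, as a sketch under stated assumptions: `FlowSpec` below is a hypothetical stand-in reduced to `getUri()`, the message wording is only a suggestion, the example URI is made up, and `TooSoonDemo` exists just to exercise it.

```java
import java.net.URI;

// Hypothetical stand-in for Gobblin's FlowSpec, reduced to the one accessor used here.
class FlowSpec {
  private final URI uri;

  FlowSpec(URI uri) {
    this.uri = uri;
  }

  URI getUri() {
    return uri;
  }
}

class TooSoonToRerunSameFlowException extends RuntimeException {
  private final FlowSpec flowSpec;

  // The message is built here, so callers pass only the FlowSpec.
  // The exact wording is an assumption, not taken from the PR.
  TooSoonToRerunSameFlowException(FlowSpec flowSpec) {
    super("FlowSpec with URI " + flowSpec.getUri()
        + " was previously launched within the lease consolidation period");
    this.flowSpec = flowSpec;
  }

  // Hand-written here to stay dependency-free; lombok's @Getter could generate it.
  FlowSpec getFlowSpec() {
    return flowSpec;
  }
}

// Hypothetical demo class.
class TooSoonDemo {
  public static void main(String[] args) {
    FlowSpec spec = new FlowSpec(URI.create("gobblin-flow:/testGroup/testFlow")); // made-up URI
    try {
      throw new TooSoonToRerunSameFlowException(spec);
    } catch (TooSoonToRerunSameFlowException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

Call sites then shrink to `throw new TooSoonToRerunSameFlowException(flowSpec);`, and the phrasing lives in one place.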
Issue Time Tracking
-------------------
Worklog Id: (was: 944250)
Time Spent: 2h 50m (was: 2h 40m)
> Adhoc flows are not being deleted from GaaS FlowSpec store
> ----------------------------------------------------------
>
> Key: GOBBLIN-2173
> URL: https://issues.apache.org/jira/browse/GOBBLIN-2173
> Project: Apache Gobblin
> Issue Type: Bug
> Components: gobblin-service
> Reporter: Abhishek Jain
> Assignee: Abhishek Tiwari
> Priority: Critical
> Time Spent: 2h 50m
> Remaining Estimate: 0h
>
> In GaaS, we store adhoc flows temporarily in our flowspec DB in order to
> persist them in service restart/failover scenarios. However, once these flows
> are kicked off/forwarded to the DagProcEngine, they are expected to be
> removed from our flowspec DB.
> This is currently not happening consistently; there seem to be some edge
> case(s) where they remain persisted in the DB. This can be fatal for users such
> as DIL that run adhoc flows using the same flowgroup/flowname consistently,
> since it leads to their flows being stuck. We need to find which edge cases
> are not handling the flow spec deletion properly.