[ 
https://issues.apache.org/jira/browse/GOBBLIN-2173?focusedWorklogId=944250&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-944250
 ]

ASF GitHub Bot logged work on GOBBLIN-2173:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 19/Nov/24 07:46
            Start Date: 19/Nov/24 07:46
    Worklog Time Spent: 10m 
      Work Description: phet commented on code in PR #4076:
URL: https://github.com/apache/gobblin/pull/4076#discussion_r1847701708


##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/TooSoonToRerunSameFlowException.java:
##########
@@ -20,8 +20,15 @@
 /**
  * An {@link RuntimeException} thrown when lease cannot be acquired on provided entity.
  */
-public class LeaseUnavailableException extends RuntimeException {
-  public LeaseUnavailableException(String message) {
+public class TooSoonToRerunSameFlowException extends RuntimeException {
+  private final FlowSpec flowSpec;
+
+  public TooSoonToRerunSameFlowException(String message, FlowSpec flowSpec) {
     super(message);
+    this.flowSpec = flowSpec;
+  }
+
+  public FlowSpec getFlowSpec() {
+    return flowSpec;
   }

Review Comment:
   instead, use the `@Getter` annotation (from `lombok`)



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java:
##########
@@ -29,6 +29,7 @@
 import java.util.Properties;
 
 import org.apache.commons.lang3.reflect.ConstructorUtils;
+import org.apache.gobblin.runtime.api.TooSoonToRerunSameFlowException;

Review Comment:
   gobblin's own imports belong farther down, around L55



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java:
##########
@@ -63,13 +63,13 @@ LeaseAttemptStatus tryAcquireLease(DagActionStore.LeaseParams leaseParams, boole
 
   /**
    * This method checks if lease can be acquired on provided flow in lease params
-   * returns true if entry for the same flow does not exists within epsilon time
+   * returns true if entry for the same flow does not exists within Lease Consolidation Period

Review Comment:
   sense is reversed here...
   
   maybe:
   > Check whether the same flowGroup+flowName is within the Lease Consolidation Period (aka. epsilon) from other, unrelated leasing activity
   
   this is also out-of-date:
   ```
   @return true if lease can be acquired on the flow passed in the lease params, false otherwise
   ```
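Read this way, `true` means "too soon, reject", the reverse of the old "lease can be acquired" sense. A minimal sketch of that predicate follows, assuming event times in millis and a strict comparison against epsilon (both assumptions; `ConsolidationCheck` and `isWithinConsolidationPeriod` are hypothetical names, not the arbiter's API):

```java
/** Hypothetical illustration of the Lease Consolidation Period (epsilon) check; not the arbiter's API. */
final class ConsolidationCheck {
  private ConsolidationCheck() {}

  /**
   * Returns true when another leasing event for the same flowGroup+flowName lies strictly
   * within epsilonMillis of the new event, i.e. the new launch is "too soon" and should not
   * proceed. A true result therefore means the lease should NOT be granted.
   */
  static boolean isWithinConsolidationPeriod(long otherEventMillis, long newEventMillis, long epsilonMillis) {
    if (epsilonMillis < 0) {
      throw new IllegalArgumentException("epsilonMillis must be non-negative: " + epsilonMillis);
    }
    return Math.abs(newEventMillis - otherEventMillis) < epsilonMillis;
  }
}
```

For example, with a 1000 ms epsilon, events 500 ms apart fall within the period (`true`), while events exactly 1000 ms apart do not.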



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java:
##########
@@ -105,10 +105,13 @@ public interface DagManagementStateStore {
 
   /**
    * Returns true if lease can be acquired on entity provided in leaseParams.
-   * @param leaseParams   uniquely identifies the flow, the present action upon it, the time the action was triggered,
-   *                      and if the dag action event we're checking on is a reminder event
+   * Check if an action exists in dagAction store by flow group, flow name, flow execution id, and job name.
+   * @param flowGroup flow group for the dag action
+   * @param flowName flow name for the dag action
+   * @param flowExecutionId flow execution for the dag action

Review Comment:
   javadoc seems out-of-date, esp. mentioning LeaseParams and DagAction
   
   also, out-of-date:
   ```
   Returns true if lease can be acquired on entity provided in leaseParams.
   ```



##########
gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/modules/restli/FlowConfigsV2ResourceHandler.java:
##########
@@ -257,10 +257,10 @@ public CreateKVResponse<ComplexResourceKey<FlowId, FlowStatusId>, FlowConfig> cr
       responseMap = this.flowCatalog.put(flowSpec, true);
     } catch (QuotaExceededException e) {
       throw new RestLiServiceException(HttpStatus.S_503_SERVICE_UNAVAILABLE, e.getMessage());
-    } catch(LeaseUnavailableException e){
-      throw new RestLiServiceException(HttpStatus.S_409_CONFLICT, e.getMessage());
-    }
-    catch (Throwable e) {
+    } catch(TooSoonToRerunSameFlowException e) {
+      return new CreateKVResponse<>(new RestLiServiceException(HttpStatus.S_409_CONFLICT,
+          "FlowSpec with URI " + flowSpec.getUri() + " was launched within the lease consolidation period, no action will be taken"));

Review Comment:
   nit: "was **previously** launched within"



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -138,26 +138,23 @@ public AddSpecResponse onAddSpec(Spec addedSpec) {
   }
 
   /*
-    validates if lease can be acquired on the provided flowSpec,
-    else throw LeaseUnavailableException
+    enforces that a similar flow is not launching,
+    else throw TooSoonToRerunSameFlowException
    */
-  private void validateAdhocFlowLeasability(FlowSpec flowSpec) {
+  private void enforceSimilarAdhocFlowExistence(FlowSpec flowSpec) {

Review Comment:
   this doesn't enforce the existence, but rather the **non-**existence.
   
   also, "similar lease" generally sounds apt, if we consider "similar" to be **the same** FlowId, but different executionId.  hence the lease is similar, while "the flow" is actually... **the same**.
   
   this line of reasoning leaves "similar flow" sounding imprecise at best and confusing at worst.  I regret suggesting it and apologize for that.  (it stands out more clearly when I'm solely reading vs. struggling to originate a name myself.)
   
   really, we're talking about recent execution of the same FlowId (aka. flowGroup+flowName).  maybe `enforceNoRecentAdhocExecOfSameFlow`?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java:
##########
@@ -105,10 +105,13 @@ public interface DagManagementStateStore {
 
   /**
    * Returns true if lease can be acquired on entity provided in leaseParams.
-   * @param leaseParams   uniquely identifies the flow, the present action upon it, the time the action was triggered,
-   *                      and if the dag action event we're checking on is a reminder event
+   * Check if an action exists in dagAction store by flow group, flow name, flow execution id, and job name.
+   * @param flowGroup flow group for the dag action
+   * @param flowName flow name for the dag action
+   * @param flowExecutionId flow execution for the dag action
+   * @throws IOException
    */
-  boolean isLeaseAcquirable(DagActionStore.LeaseParams leaseParams) throws IOException;
+  boolean existsCurrentlyLaunchingSimilarFlow(String flowGroup, String flowName, long flowExecutionId) throws IOException;

Review Comment:
   how about `existsCurrentlyLaunchingExecOfSameFlow`?
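One possible shape for this method after both comments on it (the rename plus a refreshed javadoc) is sketched below on a hypothetical stand-in interface, `SameFlowLaunchLookup`, with a toy in-memory implementation so it compiles on its own. Treating `flowExecutionId` as the launch timestamp and "currently launching" as "within the consolidation period" are assumptions for illustration; the PR's store appears to delegate to the lease arbiter (`existsSimilarLeaseWithinConsolidationPeriod`) rather than keep a map:

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for the relevant slice of DagManagementStateStore. */
interface SameFlowLaunchLookup {
  /**
   * Check whether a different execution of the same flow (same flowGroup and flowName) is
   * currently launching, i.e. was leased within the lease consolidation period.
   *
   * @param flowGroup flow group of the adhoc flow being submitted
   * @param flowName flow name of the adhoc flow being submitted
   * @param flowExecutionId execution id (millis-since-epoch) of the submission being checked
   * @return true if another execution of the same flow is currently launching, false otherwise
   * @throws IOException if the backing store cannot be queried
   */
  boolean existsCurrentlyLaunchingExecOfSameFlow(String flowGroup, String flowName, long flowExecutionId)
      throws IOException;
}

/** Toy in-memory implementation: remembers the latest launch per flowGroup+flowName. */
final class InMemorySameFlowLaunchLookup implements SameFlowLaunchLookup {
  private final Map<String, Long> lastLaunchMillis = new HashMap<>();
  private final long consolidationPeriodMillis;

  InMemorySameFlowLaunchLookup(long consolidationPeriodMillis) {
    this.consolidationPeriodMillis = consolidationPeriodMillis;
  }

  void recordLaunch(String flowGroup, String flowName, long flowExecutionId) {
    lastLaunchMillis.put(flowGroup + "." + flowName, flowExecutionId);
  }

  @Override
  public boolean existsCurrentlyLaunchingExecOfSameFlow(String flowGroup, String flowName, long flowExecutionId) {
    Long prior = lastLaunchMillis.get(flowGroup + "." + flowName);
    // Same flow, different execution, and close enough in time to count as "currently launching".
    return prior != null
        && prior != flowExecutionId
        && Math.abs(flowExecutionId - prior) < consolidationPeriodMillis;
  }
}
```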



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java:
##########
@@ -392,7 +393,12 @@ private Map<String, AddSpecResponse> updateOrAddSpecHelper(Spec spec, boolean tr
       // If flow fails compilation, the result will have a non-empty string with the error
       if (!response.getValue().getFailures().isEmpty()) {
         for (Map.Entry<SpecCatalogListener, CallbackResult<AddSpecResponse>> entry : response.getValue().getFailures().entrySet()) {
-          throw entry.getValue().getError().getCause();
+          Throwable error = entry.getValue().getError();
+          if (error instanceof TooSoonToRerunSameFlowException) {
+            throw (TooSoonToRerunSameFlowException) error;
+          } else {
+            throw error.getCause();
+          }

Review Comment:
   I'm not crazy about having to explicitly carve out a special case for this exception.  couldn't we instead, when throwing it in the first place, wrap it in an extra `RuntimeException` that we know will be stripped off here?
   
   (if doing that, be sure to add a comment explaining it's for the `SpecCatalogListener` `CallbackResult` handling over here.)
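A minimal sketch of the wrapping idea, assuming `CallbackResult.getError()` hands back exactly what the listener threw; `TooSoonException`, `WrapOnceSketch`, and its methods are hypothetical stand-ins, not Gobblin classes:

```java
/** Hypothetical stand-in for TooSoonToRerunSameFlowException (the real one carries a FlowSpec). */
class TooSoonException extends RuntimeException {
  TooSoonException(String message) {
    super(message);
  }
}

/** Hypothetical illustration of the wrap-once / unwrap-once contract; not Gobblin's API. */
final class WrapOnceSketch {
  private WrapOnceSketch() {}

  /** Listener side (e.g. inside onAddSpec): throw the domain exception wrapped exactly once. */
  static void listenerRejectsLaunch(String flowId) {
    // Wrapped only so that the SpecCatalogListener CallbackResult handling in FlowCatalog,
    // which rethrows error.getCause(), surfaces the TooSoonException itself.
    throw new RuntimeException(new TooSoonException("too soon to rerun " + flowId));
  }

  /** Stand-in for the callback machinery: records whatever the listener threw. */
  static Throwable captureFailure(Runnable listener) {
    try {
      listener.run();
      return null;
    } catch (RuntimeException e) {
      return e;
    }
  }

  /** Catalog side: the existing generic handling, with no special case for any exception type. */
  static void rethrowFailure(Throwable error) throws Throwable {
    throw error.getCause();
  }
}
```

The trade-off: the catalog stays exception-agnostic, at the cost of a wrapper whose only purpose is documented by that comment, which is why the comment matters.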



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java:
##########
@@ -81,6 +82,14 @@ public class MysqlMultiActiveLeaseArbiterTest {
       new DagActionStore.DagAction(flowGroup4, flowName, flowExecutionId, jobName, DagActionStore.DagActionType.LAUNCH);
   private static final DagActionStore.LeaseParams
       launchLeaseParams4 = new DagActionStore.LeaseParams(launchDagAction4, false, eventTimeMillis);
+  private static final DagActionStore.DagAction launchDagAction3_similar =
+      new DagActionStore.DagAction(flowGroup3, flowName, flowExecutionId1, jobName, DagActionStore.DagActionType.LAUNCH);
+  private static final DagActionStore.LeaseParams
+      launchLeaseParams3_similar = new DagActionStore.LeaseParams(launchDagAction3_similar, false, eventTimeMillis);
+  private static final DagActionStore.DagAction launchDagAction4_similar =
+      new DagActionStore.DagAction(flowGroup4, flowName, flowExecutionId1, jobName, DagActionStore.DagActionType.LAUNCH);
+  private static final DagActionStore.LeaseParams
+      launchLeaseParams4_similar = new DagActionStore.LeaseParams(launchDagAction4_similar, false, eventTimeMillis);

Review Comment:
   nit: so we can easily surmise what's different between `launchDA3` and `launchDA3_similar`, please put the 3s next to each other and the 4s next to each other.
   
   also, the two-step init is very clunky with an extra name plus one more line of boiler-plate.  in cases where the `DagAction` is merely used to init `LeaseParams`, skip creating a separate name for the `DagAction`



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java:
##########
@@ -59,6 +59,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
   private static final String flowName = "testFlowName";
   private static final String jobName = "testJobName";
   private static final long flowExecutionId = 12345677L;
+  private static final long flowExecutionId1 = 12345996L;

Review Comment:
   I never noticed before that `flowExecutionId`, which is customarily millis-since-epoch, only has 8 digits when it should have 13.  let's fix that and also define this as:
   ```
   private static final long flowExecutionIdAlt = flowExecutionId + ...; // whatever you consider a reasonable (later) offset
   ```
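Applying both test-constant suggestions together might look like the sketch below. `DagActionStoreStub` is a hypothetical stand-in that only mirrors the constructor shapes visible in the diff, so the sketch compiles without Gobblin; the group names, `eventTimeMillis`, and the 30-second offset for `flowExecutionIdAlt` are illustrative choices, not values from the PR:

```java
/** Hypothetical stand-ins mirroring the shape of DagActionStore.DagAction / LeaseParams. */
final class DagActionStoreStub {
  private DagActionStoreStub() {}

  enum DagActionType { LAUNCH }

  static final class DagAction {
    final String flowGroup;
    final String flowName;
    final long flowExecutionId;
    final String jobName;
    final DagActionType dagActionType;

    DagAction(String flowGroup, String flowName, long flowExecutionId, String jobName, DagActionType dagActionType) {
      this.flowGroup = flowGroup;
      this.flowName = flowName;
      this.flowExecutionId = flowExecutionId;
      this.jobName = jobName;
      this.dagActionType = dagActionType;
    }
  }

  static final class LeaseParams {
    final DagAction dagAction;
    final boolean isReminder;
    final long eventTimeMillis;

    LeaseParams(DagAction dagAction, boolean isReminder, long eventTimeMillis) {
      this.dagAction = dagAction;
      this.isReminder = isReminder;
      this.eventTimeMillis = eventTimeMillis;
    }
  }
}

/** Sketch of the test constants with both review suggestions applied. */
final class LeaseArbiterTestConstantsSketch {
  private LeaseArbiterTestConstantsSketch() {}

  static final String flowGroup3 = "testFlowGroup3"; // hypothetical value
  static final String flowGroup4 = "testFlowGroup4"; // hypothetical value
  static final String flowName = "testFlowName";
  static final String jobName = "testJobName";
  // 13 digits, like any current millis-since-epoch value
  static final long flowExecutionId = 1731999960000L;
  // a later execution of the same flow; the 30-second offset is an arbitrary choice
  static final long flowExecutionIdAlt = flowExecutionId + 30_000L;
  static final long eventTimeMillis = flowExecutionId; // hypothetical value

  // The 3s sit together, then the 4s; each DagAction is built inline since it only initializes LeaseParams.
  static final DagActionStoreStub.LeaseParams launchLeaseParams3 = new DagActionStoreStub.LeaseParams(
      new DagActionStoreStub.DagAction(flowGroup3, flowName, flowExecutionId, jobName, DagActionStoreStub.DagActionType.LAUNCH),
      false, eventTimeMillis);
  static final DagActionStoreStub.LeaseParams launchLeaseParams3_similar = new DagActionStoreStub.LeaseParams(
      new DagActionStoreStub.DagAction(flowGroup3, flowName, flowExecutionIdAlt, jobName, DagActionStoreStub.DagActionType.LAUNCH),
      false, eventTimeMillis);
  static final DagActionStoreStub.LeaseParams launchLeaseParams4 = new DagActionStoreStub.LeaseParams(
      new DagActionStoreStub.DagAction(flowGroup4, flowName, flowExecutionId, jobName, DagActionStoreStub.DagActionType.LAUNCH),
      false, eventTimeMillis);
  static final DagActionStoreStub.LeaseParams launchLeaseParams4_similar = new DagActionStoreStub.LeaseParams(
      new DagActionStoreStub.DagAction(flowGroup4, flowName, flowExecutionIdAlt, jobName, DagActionStoreStub.DagActionType.LAUNCH),
      false, eventTimeMillis);
}
```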



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStoreTest.java:
##########
@@ -96,13 +95,19 @@ public static <T> boolean compareLists(List<T> list1, List<T> list2) {
   }
 
   @Test
-  public void testcanAcquireLeaseOnEntity() throws Exception{
-    Mockito.when(leaseArbiter.isLeaseAcquirable(Mockito.any(DagActionStore.LeaseParams.class))).thenReturn(true);
+  public void testExistsCurrentlyLaunchingSimilarFlowGivesTrue() throws Exception{
+    Mockito.when(leaseArbiter.existsSimilarLeaseWithinConsolidationPeriod(Mockito.any(DagActionStore.LeaseParams.class))).thenReturn(true);
     String flowName = "testFlow";
     String flowGroup = "testGroup";
-    DagActionStore.DagAction dagAction = new DagActionStore.DagAction(flowName, flowGroup, System.currentTimeMillis(), "testJob", DagActionStore.DagActionType.LAUNCH);
-    DagActionStore.LeaseParams leaseParams = new DagActionStore.LeaseParams(dagAction);
-    Assert.assertTrue(dagManagementStateStore.isLeaseAcquirable(leaseParams));
+    Assert.assertTrue(dagManagementStateStore.existsCurrentlyLaunchingSimilarFlow(flowGroup, flowName, any(Long.class)));

Review Comment:
   apologies, but I'm not actually familiar w/ what `any(Long.class)` means in a context like this.  I'm familiar w/ such arg matchers setting-up mocking and also to verify prior invocations of a mock - but not within an actual invocation of a non-mock.
   
   guessing: does it choose a random long and pass that as the arg?  it might be better to just pass an actual value, such as `System.currentTimeMillis()`



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java:
##########
@@ -370,16 +376,18 @@ public void onAddSpecForScheduledFlow() throws IOException {
     FlowSpec flowSpec = FlowSpec.builder().withConfig(config).build();
     AddSpecResponse response = new AddSpecResponse<>(new Object());
     Mockito.when(specCompiler.onAddSpec(flowSpec)).thenReturn(response);
-    AddSpecResponse addSpecResponse = dagMgrNotFlowLaunchHandlerBasedOrchestrator.onAddSpec(flowSpec);
+    AddSpecResponse addSpecResponse = orchestrator.onAddSpec(flowSpec);
     Assert.assertNotNull(addSpecResponse);
+    // Verifying that for scheduled flow isLeaseAcquirable is not called
+    Mockito.verify(dagManagementStateStore, Mockito.times(0)).existsCurrentlyLaunchingSimilarFlow(anyString(), anyString(), anyLong());

Review Comment:
   comment names wrong method
   
   also, use `Mockito.never()`



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -138,26 +138,23 @@ public AddSpecResponse onAddSpec(Spec addedSpec) {
   }
 
   /*
-    validates if lease can be acquired on the provided flowSpec,
-    else throw LeaseUnavailableException
+    enforces that a similar flow is not launching,
+    else throw TooSoonToRerunSameFlowException
    */
-  private void validateAdhocFlowLeasability(FlowSpec flowSpec) {
+  private void enforceSimilarAdhocFlowExistence(FlowSpec flowSpec) {
     if (!flowSpec.isScheduled()) {
       Config flowConfig = flowSpec.getConfig();
       String flowGroup = flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
       String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
 
-      DagActionStore.DagAction dagAction = DagActionStore.DagAction.forFlow(flowGroup, flowName,
-          FlowUtils.getOrCreateFlowExecutionId(flowSpec), DagActionStore.DagActionType.LAUNCH);
-      DagActionStore.LeaseParams leaseParams = new DagActionStore.LeaseParams(dagAction, System.currentTimeMillis());
-      _log.info("validation of lease acquirability of adhoc flow with lease params: " + leaseParams);
+      _log.info("checking existing adhoc flow existence for " + flowGroup + "." + flowName);

Review Comment:
   "existing adhoc flow existence".... did you mean "existing adhoc flow execution"?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -138,26 +138,23 @@ public AddSpecResponse onAddSpec(Spec addedSpec) {
   }
 
   /*
-    validates if lease can be acquired on the provided flowSpec,
-    else throw LeaseUnavailableException
+    enforces that a similar flow is not launching,
+    else throw TooSoonToRerunSameFlowException
    */
-  private void validateAdhocFlowLeasability(FlowSpec flowSpec) {
+  private void enforceSimilarAdhocFlowExistence(FlowSpec flowSpec) {
     if (!flowSpec.isScheduled()) {
       Config flowConfig = flowSpec.getConfig();
       String flowGroup = flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
       String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
 
-      DagActionStore.DagAction dagAction = DagActionStore.DagAction.forFlow(flowGroup, flowName,
-          FlowUtils.getOrCreateFlowExecutionId(flowSpec), DagActionStore.DagActionType.LAUNCH);
-      DagActionStore.LeaseParams leaseParams = new DagActionStore.LeaseParams(dagAction, System.currentTimeMillis());
-      _log.info("validation of lease acquirability of adhoc flow with lease params: " + leaseParams);
+      _log.info("checking existing adhoc flow existence for " + flowGroup + "." + flowName);
       try {
-        if (!dagManagementStateStore.isLeaseAcquirable(leaseParams)) {
-          throw new LeaseUnavailableException("Lease already occupied by another execution of this flow");
+        if (dagManagementStateStore.existsCurrentlyLaunchingSimilarFlow(flowGroup, flowName, FlowUtils.getOrCreateFlowExecutionId(flowSpec))) {
+          throw new TooSoonToRerunSameFlowException("Lease already occupied by another execution of this flow", flowSpec);

Review Comment:
   1. we have an `.info` line above announcing the check, so let's follow w/ a `.warn` line here when the check fails.  suggest: "another recent adhoc flow exec found for...."
   
   2. exception msg could benefit from minor improvements, yet--however it's phrased--it belongs encapsulated in the `TooSoonToRerun...` ctor, which should take solely a `FlowSpec` param
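Pulling the Orchestrator suggestions together (the `enforceNoRecentAdhocExecOfSameFlow` name, an `.info` before the check and a `.warn` when it fails, and an exception whose ctor takes only the spec and builds its own message), a minimal sketch might look like the following. `FlowSpecStub`, `LaunchLookup`, and `AdhocRerunGuard` are hypothetical stand-ins so it compiles alone; `java.util.logging` stands in for the service's logger; the explicit `getFlowSpec()` is what Lombok's `@Getter` would generate; and wrapping `IOException` in a `RuntimeException` is an assumption, since the diff does not show how the real method handles it:

```java
import java.io.IOException;
import java.net.URI;
import java.util.logging.Logger;

/** Hypothetical stand-in for FlowSpec: only what this check reads. */
final class FlowSpecStub {
  private final URI uri;
  final String flowGroup;
  final String flowName;
  final long flowExecutionId; // the real code derives this via FlowUtils.getOrCreateFlowExecutionId(flowSpec)
  private final boolean scheduled;

  FlowSpecStub(URI uri, String flowGroup, String flowName, long flowExecutionId, boolean scheduled) {
    this.uri = uri;
    this.flowGroup = flowGroup;
    this.flowName = flowName;
    this.flowExecutionId = flowExecutionId;
    this.scheduled = scheduled;
  }

  URI getUri() { return uri; }
  boolean isScheduled() { return scheduled; }
}

/** The ctor takes solely the spec, so the message wording lives in one place. */
class TooSoonToRerunSameFlowException extends RuntimeException {
  private final FlowSpecStub flowSpec;

  TooSoonToRerunSameFlowException(FlowSpecStub flowSpec) {
    super("Another execution of flow " + flowSpec.flowGroup + "." + flowSpec.flowName + " (URI " + flowSpec.getUri()
        + ") was previously launched within the lease consolidation period; not launching this one");
    this.flowSpec = flowSpec;
  }

  // Equivalent to what Lombok's @Getter would generate for the field.
  FlowSpecStub getFlowSpec() { return flowSpec; }
}

/** Hypothetical slice of DagManagementStateStore used by the check. */
interface LaunchLookup {
  boolean existsCurrentlyLaunchingExecOfSameFlow(String flowGroup, String flowName, long flowExecutionId)
      throws IOException;
}

final class AdhocRerunGuard {
  private static final Logger LOG = Logger.getLogger(AdhocRerunGuard.class.getName());
  private final LaunchLookup lookup;

  AdhocRerunGuard(LaunchLookup lookup) {
    this.lookup = lookup;
  }

  /** Enforces that no recent adhoc execution of the same flow exists, else throws TooSoonToRerunSameFlowException. */
  void enforceNoRecentAdhocExecOfSameFlow(FlowSpecStub flowSpec) {
    if (flowSpec.isScheduled()) {
      return; // only adhoc flows are checked
    }
    String flowId = flowSpec.flowGroup + "." + flowSpec.flowName;
    LOG.info("checking for recent adhoc flow execution of " + flowId);
    boolean recentExecFound;
    try {
      recentExecFound = lookup.existsCurrentlyLaunchingExecOfSameFlow(flowSpec.flowGroup, flowSpec.flowName, flowSpec.flowExecutionId);
    } catch (IOException e) {
      // Assumption: surface store failures as unchecked; the diff does not show the real handling.
      throw new RuntimeException("unable to check recent executions of " + flowId, e);
    }
    if (recentExecFound) {
      LOG.warning("another recent adhoc flow exec found for " + flowId);
      throw new TooSoonToRerunSameFlowException(flowSpec);
    }
  }
}
```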





Issue Time Tracking
-------------------

    Worklog Id:     (was: 944250)
    Time Spent: 2h 50m  (was: 2h 40m)

> Adhoc flows are not being deleted from GaaS FlowSpec store
> ----------------------------------------------------------
>
>                 Key: GOBBLIN-2173
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2173
>             Project: Apache Gobblin
>          Issue Type: Bug
>          Components: gobblin-service
>            Reporter: Abhishek Jain
>            Assignee: Abhishek Tiwari
>            Priority: Critical
>          Time Spent: 2h 50m
>  Remaining Estimate: 0h
>
> In GaaS, we store adhoc flows temporarily in our flowspec DB in order to
> persist them across service restart/failover scenarios. However, it is expected
> that once these flows are kicked off/forwarded to the DagProcEngine, they
> are removed from our flowspec db.
> This is currently not happening consistently: there seem to be some edge
> case(s) where they remain persisted in the db. This can be fatal for users such
> as DIL that consistently run adhoc flows using the same flowgroup/flowname,
> because their flows will get stuck. We need to find which edge cases
> are not handling the flow spec deletion properly.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
