[ 
https://issues.apache.org/jira/browse/GOBBLIN-2151?focusedWorklogId=934133&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-934133
 ]

ASF GitHub Bot logged work on GOBBLIN-2151:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 11/Sep/24 02:32
            Start Date: 11/Sep/24 02:32
    Worklog Time Spent: 10m 
      Work Description: phet commented on code in PR #4050:
URL: https://github.com/apache/gobblin/pull/4050#discussion_r1753028698


##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelperTest.java:
##########
@@ -156,48 +161,61 @@ public void 
testConcurrentFlowPreviousExecutionWithNonTerminalStatusRunningWithi
     String flowGroup = "fg";
     String flowName = "fn";
     long flowFinishDeadline = 10000L;
-    long flowStartTime = System.currentTimeMillis();  // giving test 
flowFinishDeadline to finish
+    long flowStartTime = System.currentTimeMillis() - 1 ;  // giving test 
flowFinishDeadline + 1 ms to finish
+    long currentFlowExecutionId = System.currentTimeMillis() ;
 
     insertFlowIntoDMSSMock(flowGroup, flowName, flowStartTime, 
ExecutionStatus.RUNNING,
         ConfigFactory.empty()
             
.withValue(ConfigurationKeys.GOBBLIN_FLOW_FINISH_DEADLINE_TIME_UNIT, 
ConfigValueFactory.fromAnyRef(TimeUnit.MILLISECONDS.name()))
             .withValue(ConfigurationKeys.GOBBLIN_FLOW_FINISH_DEADLINE_TIME, 
ConfigValueFactory.fromAnyRef(flowFinishDeadline)));
 
-    Assert.assertTrue(FlowCompilationValidationHelper.isFlowRunning(flowGroup, 
flowName, this.dagManagementStateStore));
+    
Assert.assertTrue(FlowCompilationValidationHelper.isFlowBeforeThisExecutionRunning(flowGroup,
 flowName,
+        currentFlowExecutionId, this.dagManagementStateStore));
   }
 
   @Test
   public void testConcurrentFlowNoPreviousExecutionRunning() throws 
IOException, URISyntaxException {
     String flowGroup = "fg";
     String flowName = "fn";
-    long currentFlowExecutionId = 67890L;
-    long flowFinishDeadline = 10000L;
     long flowStartTime = System.currentTimeMillis();  // giving test 
flowFinishDeadline to finish
+    insertFlowIntoDMSSMock(flowGroup, flowName, flowStartTime, 
ExecutionStatus.PENDING,
+        ConfigFactory.empty()
+            .withValue(ConfigurationKeys.GOBBLIN_JOB_START_DEADLINE_TIME_UNIT, 
ConfigValueFactory.fromAnyRef(TimeUnit.MILLISECONDS.name()))
+            .withValue(ConfigurationKeys.GOBBLIN_JOB_START_DEADLINE_TIME, 
ConfigValueFactory.fromAnyRef(flowStartTime)));
+
+    // change the mock to not return any previous flow status
     when(this.dagManagementStateStore.getAllFlowStatusesForFlow(anyString(), 
anyString())).thenReturn(Collections.emptyList());
-    Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("1", 
currentFlowExecutionId,
-        DagProcessingEngine.FailureOption.FINISH_ALL_POSSIBLE.name(), 5, 
"user5", ConfigFactory.empty()
-            .withValue(ConfigurationKeys.FLOW_GROUP_KEY, 
ConfigValueFactory.fromAnyRef(flowGroup))
-            .withValue(ConfigurationKeys.FLOW_NAME_KEY, 
ConfigValueFactory.fromAnyRef(flowName))
-            
.withValue(ConfigurationKeys.GOBBLIN_FLOW_FINISH_DEADLINE_TIME_UNIT, 
ConfigValueFactory.fromAnyRef(TimeUnit.MILLISECONDS.name()))
-            .withValue(ConfigurationKeys.GOBBLIN_FLOW_FINISH_DEADLINE_TIME, 
ConfigValueFactory.fromAnyRef(flowFinishDeadline))
-            .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, 
ConfigValueFactory.fromAnyRef(
-                MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI)));
-    dag.getNodes().forEach(node -> 
node.getValue().setFlowStartTime(flowStartTime));
-    this.dagManagementStateStore.addDag(dag);
 
-    
Assert.assertFalse(FlowCompilationValidationHelper.isFlowRunning(flowGroup, 
flowName, this.dagManagementStateStore));
+    
Assert.assertFalse(FlowCompilationValidationHelper.isFlowBeforeThisExecutionRunning(flowGroup,
 flowName,
+        flowStartTime, this.dagManagementStateStore));
+  }
+
+  @Test
+  public void 
testConcurrentFlowCurrentExecutionWithNonTerminalStatusRunningWithinJobStartDeadline()
 throws IOException, URISyntaxException {
+    String flowGroup = "fg";
+    String flowName = "fn";
+    long jobStartDeadline = 10000L;
+    long flowStartTime = System.currentTimeMillis();
+
+    insertFlowIntoDMSSMock(flowGroup, flowName, flowStartTime, 
ExecutionStatus.PENDING,
+        ConfigFactory.empty()

Review Comment:
   in this case where the only flow status found is for the flowExecId 
currently being validated, that arises during a re-attempt of the same 
`DagActionType.LAUNCH`, correct?
   
   if so, would we expect the `ExecutionStatus.COMPILED` or actually `PENDING`? 
 if the former, let's ensure we have a test case for that.
   
   also, suggest method naming to make clear this is not actually "concurrent", 
but "same" execution.  maybe 
`testSameFlowExecAlreadyCompiledWithinJobStartDeadline()`





Issue Time Tracking
-------------------

    Worklog Id:     (was: 934133)
    Time Spent: 3h  (was: 2h 50m)

> ignore flows that are running beyond job start and flow finish deadline
> -----------------------------------------------------------------------
>
>                 Key: GOBBLIN-2151
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2151
>             Project: Apache Gobblin
>          Issue Type: Improvement
>            Reporter: Arjun Singh Bora
>            Priority: Major
>          Time Spent: 3h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to