phet commented on code in PR #4050:
URL: https://github.com/apache/gobblin/pull/4050#discussion_r1751095651
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java:
##########
@@ -187,9 +195,65 @@ public Optional<Dag<JobExecutionPlan>> validateAndHandleConcurrentExecution(Conf
   * @param allowConcurrentExecution
   * @return true if the {@link FlowSpec} allows concurrent executions or if no other instance of the flow is currently RUNNING.
   */
-  private boolean isExecutionPermitted(FlowStatusGenerator flowStatusGenerator, String flowGroup, String flowName,
-      boolean allowConcurrentExecution, long flowExecutionId) {
-    return allowConcurrentExecution || !flowStatusGenerator.isFlowRunning(flowName, flowGroup, flowExecutionId);
+  private boolean isExecutionPermitted(String flowGroup, String flowName, boolean allowConcurrentExecution)
+      throws IOException {
+    return allowConcurrentExecution || !isFlowRunning(flowGroup, flowName, dagManagementStateStore);
+  }
+
+  /**
+   * Returns true if any previous execution for the flow determined by the provided flowGroup, flowName is running.
+   * We ignore the execution that has the provided flowExecutionId. We also ignore the flows that are running beyond
+   * the job start deadline and flow finish deadline.
+   */
+  @VisibleForTesting
+  static boolean isFlowRunning(String flowGroup, String flowName, DagManagementStateStore dagManagementStateStore)
+      throws IOException {
+    List<FlowStatus> flowStatusList = dagManagementStateStore.getAllFlowStatusesForFlow(flowGroup, flowName);
+
+    if (flowStatusList == null || flowStatusList.isEmpty()) {
+      return false;
+    }
+
+    for (FlowStatus flowStatus : flowStatusList) {
+      ExecutionStatus flowExecutionStatus = flowStatus.getFlowExecutionStatus();
+      log.debug("Verifying if {} is running...", flowStatus);
+
+      if (FlowStatusGenerator.FINISHED_STATUSES.contains(flowExecutionStatus.name())) {
+        // ignore finished entries
+      } else if (flowExecutionStatus == COMPILED || flowExecutionStatus == PENDING
+          || flowExecutionStatus == PENDING_RESUME || flowExecutionStatus == RUNNING) {
+        // these are the only four non-terminal statuses that a flow can have. jobs have two more non-terminal statuses
+        // ORCHESTRATED and PENDING_RETRY
+        Dag.DagId dagIdOfOldExecution = new Dag.DagId(flowGroup, flowName, flowStatus.getFlowExecutionId());
+        java.util.Optional<Dag<JobExecutionPlan>> dag = dagManagementStateStore.getDag(dagIdOfOldExecution);
+
+        if (!dag.isPresent()) {
+          log.error("Dag is finished and cleaned up, job status monitor somehow did not receive/update the flow status. Ignoring it here...");
+          continue;
+        }
+
+        Dag.DagNode<JobExecutionPlan> dagNode = dag.get().getNodes().get(0);
+        long flowStartTime = DagUtils.getFlowStartTime(dagNode);
+        long jobStartDeadline =
+            DagUtils.getJobStartDeadline(dagNode, DagProcessingEngine.getDefaultJobStartDeadlineTimeMillis());
+        long flowFinishDeadline = DagUtils.getFlowFinishDeadline(dagNode);
+        if ((flowExecutionStatus == COMPILED || flowExecutionStatus == PENDING)
+            && System.currentTimeMillis() < flowStartTime + jobStartDeadline
+            || (flowExecutionStatus == RUNNING || flowExecutionStatus == PENDING_RESUME)
+            && System.currentTimeMillis() < flowStartTime + flowFinishDeadline) {
+          log.info("{} is still running. Found a dag for this, flowStartTime {}, jobStartDeadline {}, flowFinishDeadline {}",
+              flowStatus, flowStartTime, jobStartDeadline, flowFinishDeadline);
+          return true;
+        } else {
+          log.warn("Dag {} is still running beyond deadline! flowStartTime {}, jobStartDeadline {}, flowFinishDeadline {}",
+              dag, flowStartTime, jobStartDeadline, flowFinishDeadline);
+        }
+      } else {
+        log.error("Unknown status {}", flowExecutionStatus);
Review Comment:
`ORCHESTRATED` isn't a final status, is it?
also, how to handle `$UNKNOWN`? shall we say it's NOT running?
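
For illustration only, here is a standalone sketch of the conservative answer to the `$UNKNOWN` question, i.e. treating an unrecognizable status as possibly still running so that a new concurrent launch is blocked. The enum and method below are stand-ins written for this comment, not the PR's `ExecutionStatus` or `isFlowRunning` code:
```
// Standalone illustration (not the PR's code): one conservative way to classify
// flow-level statuses, where $UNKNOWN is treated as "possibly still running".
// "Status" is a stand-in for Gobblin's ExecutionStatus enum.
public class UnknownStatusSketch {

  enum Status {
    COMPILED, PENDING, PENDING_RESUME, RUNNING,   // non-terminal flow statuses
    COMPLETE, FAILED, CANCELLED,                  // terminal flow statuses
    $UNKNOWN                                      // unrecognized value from the status store
  }

  /** Returns true when a previous execution with this status should block a new launch. */
  static boolean treatAsRunning(Status status) {
    switch (status) {
      case COMPILED:
      case PENDING:
      case PENDING_RESUME:
      case RUNNING:
        return true;    // known non-terminal: a previous execution may still be active
      case COMPLETE:
      case FAILED:
      case CANCELLED:
        return false;   // terminal: safe to start a new execution
      case $UNKNOWN:
      default:
        // Conservative choice: if the status cannot be interpreted, assume the flow
        // may still be running rather than risk a duplicate concurrent execution.
        return true;
    }
  }

  public static void main(String[] args) {
    System.out.println(treatAsRunning(Status.$UNKNOWN)); // prints "true"
  }
}
```
The trade-off of this choice is that a stuck or corrupt status record can block new launches until it is cleaned up; the opposite choice (saying `$UNKNOWN` is NOT running) instead risks a duplicate execution.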
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelperTest.java:
##########
@@ -66,4 +100,127 @@ public void testSkipAddingFlowExecutionIdWhenPresent() {
     Assert.assertEquals(flowMetadata.get(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD), existingFlowExecutionId);
   }
+
+  @Test
+  public void testConcurrentFlowPreviousFlowWithNonTerminalStatusWithNoDag() throws IOException {
+    List<FlowStatus> list = new ArrayList<>();
+    String flowGroup = "fg";
+    String flowName = "fn";
+    long previousFlowExecutionId = 12345L;
+    JobStatus jobStatus = JobStatus.builder().flowGroup(flowGroup).flowName(flowName).flowExecutionId(previousFlowExecutionId)
+        .jobName(JobStatusRetriever.NA_KEY).jobGroup(JobStatusRetriever.NA_KEY).eventName(ExecutionStatus.COMPILED.name()).build();
+    Iterator<JobStatus> jobStatusIterator = Lists.newArrayList(jobStatus).iterator();
+    list.add(new FlowStatus(flowName, flowGroup, previousFlowExecutionId, jobStatusIterator, ExecutionStatus.COMPILED));
+    when(this.dagManagementStateStore.getAllFlowStatusesForFlow(anyString(), anyString())).thenReturn(list);
+
+    Assert.assertFalse(FlowCompilationValidationHelper.isFlowRunning(flowGroup, flowName, this.dagManagementStateStore));
+  }
+
+  @Test
+  public void testConcurrentFlowPreviousFlowWithNonTerminalStatusRunningBeyondJobStartDeadline()
+      throws IOException, URISyntaxException {
+    List<FlowStatus> list = new ArrayList<>();
+    String flowGroup = "fg";
+    String flowName = "fn";
+    long previousFlowExecutionId = 12345L;
+    long jobStartDeadline = 10L;
+    long flowStartTime = System.currentTimeMillis() - jobStartDeadline - 1;
+    JobStatus jobStatus = JobStatus.builder().flowGroup(flowGroup).flowName(flowName).flowExecutionId(previousFlowExecutionId)
+        .jobName(JobStatusRetriever.NA_KEY).jobGroup(JobStatusRetriever.NA_KEY).eventName(ExecutionStatus.PENDING.name()).build();
+    Iterator<JobStatus> jobStatusIterator = Lists.newArrayList(jobStatus).iterator();
+    list.add(new FlowStatus(flowName, flowGroup, previousFlowExecutionId, jobStatusIterator, ExecutionStatus.PENDING));
+    when(this.dagManagementStateStore.getAllFlowStatusesForFlow(anyString(), anyString())).thenReturn(list);
+    Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("1", previousFlowExecutionId,
+        DagProcessingEngine.FailureOption.FINISH_ALL_POSSIBLE.name(), 5, "user5", ConfigFactory.empty()
+            .withValue(ConfigurationKeys.FLOW_GROUP_KEY, ConfigValueFactory.fromAnyRef(flowGroup))
+            .withValue(ConfigurationKeys.FLOW_NAME_KEY, ConfigValueFactory.fromAnyRef(flowName))
+            .withValue(ConfigurationKeys.GOBBLIN_JOB_START_DEADLINE_TIME_UNIT, ConfigValueFactory.fromAnyRef(TimeUnit.MILLISECONDS.name()))
+            .withValue(ConfigurationKeys.GOBBLIN_JOB_START_DEADLINE_TIME, ConfigValueFactory.fromAnyRef(jobStartDeadline))
+            .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, ConfigValueFactory.fromAnyRef(
+                MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI)));
+    dag.getNodes().forEach(node -> node.getValue().setFlowStartTime(flowStartTime));
+    this.dagManagementStateStore.addDag(dag);
+
+    Assert.assertFalse(FlowCompilationValidationHelper.isFlowRunning(flowGroup, flowName, this.dagManagementStateStore));
+  }
+
+  @Test
+  public void testConcurrentFlowPreviousFlowWithNonTerminalStatusRunningBeyondFlowFinishDeadline()
+      throws IOException, URISyntaxException {
+    List<FlowStatus> list = new ArrayList<>();
+    String flowGroup = "fg";
+    String flowName = "fn";
+    long previousFlowExecutionId = 12345L;
+    long flowFinishDeadline = 30L;
+    long flowStartTime = System.currentTimeMillis() - flowFinishDeadline - 1;
+    JobStatus jobStatus = JobStatus.builder().flowGroup(flowGroup).flowName(flowName).flowExecutionId(previousFlowExecutionId)
+        .jobName(JobStatusRetriever.NA_KEY).jobGroup(JobStatusRetriever.NA_KEY).eventName(ExecutionStatus.PENDING_RESUME.name())
+        .build();
+    Iterator<JobStatus> jobStatusIterator = Lists.newArrayList(jobStatus).iterator();
+    list.add(new FlowStatus(flowName, flowGroup, previousFlowExecutionId, jobStatusIterator, ExecutionStatus.PENDING_RESUME));
+    when(this.dagManagementStateStore.getAllFlowStatusesForFlow(anyString(), anyString())).thenReturn(list);
+    Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("1", previousFlowExecutionId,
+        DagProcessingEngine.FailureOption.FINISH_ALL_POSSIBLE.name(), 5, "user5", ConfigFactory.empty()
+            .withValue(ConfigurationKeys.FLOW_GROUP_KEY, ConfigValueFactory.fromAnyRef(flowGroup))
+            .withValue(ConfigurationKeys.FLOW_NAME_KEY, ConfigValueFactory.fromAnyRef(flowName))
+            .withValue(ConfigurationKeys.GOBBLIN_FLOW_FINISH_DEADLINE_TIME_UNIT, ConfigValueFactory.fromAnyRef(TimeUnit.MILLISECONDS.name()))
+            .withValue(ConfigurationKeys.GOBBLIN_FLOW_FINISH_DEADLINE_TIME, ConfigValueFactory.fromAnyRef(flowFinishDeadline))
+            .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, ConfigValueFactory.fromAnyRef(
+                MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI)));
+    dag.getNodes().forEach(node -> node.getValue().setFlowStartTime(flowStartTime));
+    this.dagManagementStateStore.addDag(dag);
+
+    Assert.assertFalse(FlowCompilationValidationHelper.isFlowRunning(flowGroup, flowName, this.dagManagementStateStore));
+  }
+
+  @Test
+  public void testConcurrentFlowPreviousFlowWithNonTerminalStatusRunningWithinFlowFinishDeadline()
+      throws IOException, URISyntaxException {
+    List<FlowStatus> list = new ArrayList<>();
+    String flowGroup = "fg";
+    String flowName = "fn";
+    long previousFlowExecutionId = 12345L;
+    long flowFinishDeadline = 10000L;
+    long flowStartTime = System.currentTimeMillis(); // giving test flowFinishDeadline to finish
+    JobStatus jobStatus = JobStatus.builder().flowGroup(flowGroup).flowName(flowName).flowExecutionId(previousFlowExecutionId)
+        .jobName(JobStatusRetriever.NA_KEY).jobGroup(JobStatusRetriever.NA_KEY).eventName(ExecutionStatus.RUNNING.name())
+        .build();
+    Iterator<JobStatus> jobStatusIterator = Lists.newArrayList(jobStatus).iterator();
+    list.add(new FlowStatus(flowName, flowGroup, previousFlowExecutionId, jobStatusIterator, ExecutionStatus.RUNNING));
+    when(this.dagManagementStateStore.getAllFlowStatusesForFlow(anyString(), anyString())).thenReturn(list);
+    Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("1", previousFlowExecutionId,
+        DagProcessingEngine.FailureOption.FINISH_ALL_POSSIBLE.name(), 5, "user5", ConfigFactory.empty()
+            .withValue(ConfigurationKeys.FLOW_GROUP_KEY, ConfigValueFactory.fromAnyRef(flowGroup))
+            .withValue(ConfigurationKeys.FLOW_NAME_KEY, ConfigValueFactory.fromAnyRef(flowName))
+            .withValue(ConfigurationKeys.GOBBLIN_FLOW_FINISH_DEADLINE_TIME_UNIT, ConfigValueFactory.fromAnyRef(TimeUnit.MILLISECONDS.name()))
+            .withValue(ConfigurationKeys.GOBBLIN_FLOW_FINISH_DEADLINE_TIME, ConfigValueFactory.fromAnyRef(flowFinishDeadline))
+            .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, ConfigValueFactory.fromAnyRef(
+                MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI)));
+    dag.getNodes().forEach(node -> node.getValue().setFlowStartTime(flowStartTime));
+    this.dagManagementStateStore.addDag(dag);
Review Comment:
this DMSS mock config is so extensive that it really overshadows the intent of these tests. how about abstracting into:
```
public static void insertFlowIntoDMSSMock(String flowGroup, String flowName, long flowExecId, ExecutionStatus currFlowAndJobStatus, Config additionalConfigs)
```
NOTES:
* choose random username
* set flowStartTime == flowExecId
* use additionalConfigs as a fallback after adding: FLOW_GROUP_KEY, FLOW_NAME_KEY, and SPECEXECUTOR_INSTANCE_URI_KEY
* always set GOBBLIN_FLOW_FINISH_DEADLINE_TIME_UNIT and GOBBLIN_JOB_START_DEADLINE_TIME_UNIT to TimeUnit.MILLISECONDS... even if it may not actually be necessary

with that, the setup and the assertion here are far simpler and easier to grasp:
```
insertFlowIntoDMSSMock(flowGroup, flowName, previousFlowExecutionId, ExecutionStatus.RUNNING,
    ConfigFactory.empty().withValue(ConfigurationKeys.GOBBLIN_FLOW_FINISH_DEADLINE_TIME, ConfigValueFactory.fromAnyRef(flowFinishDeadline)));
Assert.assertTrue(FlowCompilationValidationHelper.isFlowRunning(flowGroup, flowName, this.dagManagementStateStore));
```
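
For concreteness, a rough sketch of how that suggested helper could look inside FlowCompilationValidationHelperTest, pieced together only from calls already shown in the diff above (JobStatus.builder, FlowStatus, the mocked getAllFlowStatusesForFlow, DagManagerTest.buildDag, addDag). The exact shape is an assumption: it is written as an instance method so it can reach this.dagManagementStateStore (the static form in the suggestion would need the mock passed in or held statically), the withFallback ordering is one reading of the NOTES, and it assumes the test class's existing imports plus java.util.concurrent.ThreadLocalRandom:
```
  private void insertFlowIntoDMSSMock(String flowGroup, String flowName, long flowExecId,
      ExecutionStatus currFlowAndJobStatus, Config additionalConfigs) throws IOException, URISyntaxException {
    // Single job status mirroring the flow-level status, registered on the mocked store.
    JobStatus jobStatus = JobStatus.builder().flowGroup(flowGroup).flowName(flowName).flowExecutionId(flowExecId)
        .jobName(JobStatusRetriever.NA_KEY).jobGroup(JobStatusRetriever.NA_KEY)
        .eventName(currFlowAndJobStatus.name()).build();
    List<FlowStatus> flowStatuses = Lists.newArrayList(
        new FlowStatus(flowName, flowGroup, flowExecId, Lists.newArrayList(jobStatus).iterator(), currFlowAndJobStatus));
    when(this.dagManagementStateStore.getAllFlowStatusesForFlow(anyString(), anyString())).thenReturn(flowStatuses);

    // Base config per the NOTES: identity keys, spec executor URI, and both deadline time
    // units in MILLISECONDS; additionalConfigs is the fallback for everything else
    // (e.g. the deadline value a particular test cares about).
    Config config = ConfigFactory.empty()
        .withValue(ConfigurationKeys.FLOW_GROUP_KEY, ConfigValueFactory.fromAnyRef(flowGroup))
        .withValue(ConfigurationKeys.FLOW_NAME_KEY, ConfigValueFactory.fromAnyRef(flowName))
        .withValue(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY,
            ConfigValueFactory.fromAnyRef(MySqlDagManagementStateStoreTest.TEST_SPEC_EXECUTOR_URI))
        .withValue(ConfigurationKeys.GOBBLIN_JOB_START_DEADLINE_TIME_UNIT,
            ConfigValueFactory.fromAnyRef(TimeUnit.MILLISECONDS.name()))
        .withValue(ConfigurationKeys.GOBBLIN_FLOW_FINISH_DEADLINE_TIME_UNIT,
            ConfigValueFactory.fromAnyRef(TimeUnit.MILLISECONDS.name()))
        .withFallback(additionalConfigs);

    // Random username and flowStartTime == flowExecId, per the NOTES.
    String user = "user" + ThreadLocalRandom.current().nextInt(1000);
    Dag<JobExecutionPlan> dag = DagManagerTest.buildDag("1", flowExecId,
        DagProcessingEngine.FailureOption.FINISH_ALL_POSSIBLE.name(), 5, user, config);
    dag.getNodes().forEach(node -> node.getValue().setFlowStartTime(flowExecId));
    this.dagManagementStateStore.addDag(dag);
  }
```
One detail worth pinning down in the helper's javadoc is whether additionalConfigs may override the keys the helper always sets; with the fallback order sketched here it cannot, only fill in keys the helper leaves unset.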