phet commented on code in PR #4050:
URL: https://github.com/apache/gobblin/pull/4050#discussion_r1751154565
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java:
##########
@@ -187,9 +195,65 @@ public Optional<Dag<JobExecutionPlan>>
validateAndHandleConcurrentExecution(Conf
* @param allowConcurrentExecution
* @return true if the {@link FlowSpec} allows concurrent executions or if
no other instance of the flow is currently RUNNING.
*/
- private boolean isExecutionPermitted(FlowStatusGenerator
flowStatusGenerator, String flowGroup, String flowName,
- boolean allowConcurrentExecution, long flowExecutionId) {
- return allowConcurrentExecution ||
!flowStatusGenerator.isFlowRunning(flowName, flowGroup, flowExecutionId);
+ private boolean isExecutionPermitted(String flowGroup, String flowName,
boolean allowConcurrentExecution)
+ throws IOException {
+ return allowConcurrentExecution || !isFlowRunning(flowGroup, flowName,
dagManagementStateStore);
+ }
+
+ /**
+ * Returns true if any previous execution for the flow determined by the
provided flowGroup, flowName is running.
+ * We ignore the execution that has the provided flowExecutionId. We also
ignore the flows that are running beyond
+ * the job start deadline and flow finish deadline.
+ */
+ @VisibleForTesting
+ static boolean isFlowRunning(String flowGroup, String flowName,
DagManagementStateStore dagManagementStateStore)
+ throws IOException {
+ List<FlowStatus> flowStatusList =
dagManagementStateStore.getAllFlowStatusesForFlow(flowGroup, flowName);
+
+ if (flowStatusList == null || flowStatusList.isEmpty()) {
+ return false;
+ }
+
+ for (FlowStatus flowStatus : flowStatusList) {
+ ExecutionStatus flowExecutionStatus =
flowStatus.getFlowExecutionStatus();
+ log.debug("Verifying if {} is running...", flowStatus);
+
+ if
(FlowStatusGenerator.FINISHED_STATUSES.contains(flowExecutionStatus.name())) {
+ // ignore finished entries
+ } else if (flowExecutionStatus == COMPILED || flowExecutionStatus ==
PENDING
+ || flowExecutionStatus == PENDING_RESUME || flowExecutionStatus ==
RUNNING) {
+ // these are the only four non-terminal statuses that a flow can have.
jobs have two more non-terminal statuses
+ // ORCHESTRATED and PENDING_RETRY
+ Dag.DagId dagIdOfOldExecution = new Dag.DagId(flowGroup, flowName,
flowStatus.getFlowExecutionId());
+ java.util.Optional<Dag<JobExecutionPlan>> dag =
dagManagementStateStore.getDag(dagIdOfOldExecution);
+
+ if (!dag.isPresent()) {
+ log.error("Dag is finished and cleaned up, job status monitor
somehow did not receive/update the flow status. Ignoring it here...");
+ continue;
+ }
+
+ Dag.DagNode<JobExecutionPlan> dagNode = dag.get().getNodes().get(0);
+ long flowStartTime = DagUtils.getFlowStartTime(dagNode);
+ long jobStartDeadline =
+ DagUtils.getJobStartDeadline(dagNode,
DagProcessingEngine.getDefaultJobStartDeadlineTimeMillis());
+ long flowFinishDeadline = DagUtils.getFlowFinishDeadline(dagNode);
+ if ((flowExecutionStatus == COMPILED || flowExecutionStatus ==
PENDING)
+ && System.currentTimeMillis() < flowStartTime +
jobStartDeadline
+ || (flowExecutionStatus == RUNNING || flowExecutionStatus ==
PENDING_RESUME)
+ && System.currentTimeMillis() < flowStartTime +
flowFinishDeadline) {
+ log.info("{} is still running. Found a dag for this, flowStartTime
{}, jobStartDeadline {}, flowFinishDeadline {}",
+ flowStatus, flowStartTime, jobStartDeadline,
flowFinishDeadline);
+ return true;
+ } else {
+ log.warn("Dag {} is still running beyond deadline! flowStartTime
{}, jobStartDeadline {}, flowFinishDeadline {}",
+ dag, flowStartTime, jobStartDeadline, flowFinishDeadline);
+ }
+ } else {
+ log.error("Unknown status {}", flowExecutionStatus);
Review Comment:
I agree! But we don't want the presence of an unknown status to either cause
a fatal error here or prevent new flows from launching.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]