[ https://issues.apache.org/jira/browse/GOBBLIN-2173?focusedWorklogId=944297&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-944297 ]
ASF GitHub Bot logged work on GOBBLIN-2173:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 19/Nov/24 09:55
Start Date: 19/Nov/24 09:55
Worklog Time Spent: 10m
Work Description: vsinghal85 commented on code in PR #4076:
URL: https://github.com/apache/gobblin/pull/4076#discussion_r1848021734
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -138,26 +138,23 @@ public AddSpecResponse onAddSpec(Spec addedSpec) {
}
/*
- validates if lease can be acquired on the provided flowSpec,
- else throw LeaseUnavailableException
+ enforces that a similar flow is not launching,
+ else throw TooSoonToRerunSameFlowException
*/
- private void validateAdhocFlowLeasability(FlowSpec flowSpec) {
+ private void enforceSimilarAdhocFlowExistence(FlowSpec flowSpec) {
if (!flowSpec.isScheduled()) {
Config flowConfig = flowSpec.getConfig();
String flowGroup = flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
- DagActionStore.DagAction dagAction = DagActionStore.DagAction.forFlow(flowGroup, flowName,
- FlowUtils.getOrCreateFlowExecutionId(flowSpec), DagActionStore.DagActionType.LAUNCH);
- DagActionStore.LeaseParams leaseParams = new DagActionStore.LeaseParams(dagAction, System.currentTimeMillis());
- _log.info("validation of lease acquirability of adhoc flow with lease params: " + leaseParams);
+ _log.info("checking existing adhoc flow existence for " + flowGroup + "." + flowName);
Review Comment:
Apologies for the typo; updated to "existing adhoc flow entry".
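For readers following the Orchestrator change, the control flow of the new check can be sketched roughly as below. This is a simplified, hypothetical stand-in, not Gobblin code: `AdhocFlowGuardSketch`, `LaunchStateStore`, `SimpleFlowSpec`, `TooSoonToRerunException` and `rejectIfSimilarAdhocFlowLaunching` are illustrative names that only mirror the shape of `enforceSimilarAdhocFlowExistence`, `existsCurrentlyLaunchingSimilarFlow` and `TooSoonToRerunSameFlowException` from the diff.

```java
import java.io.IOException;

/**
 * Hypothetical sketch of the adhoc-flow guard in Orchestrator. None of these
 * types are Gobblin classes; they only mirror the shape of the check in the diff.
 */
final class AdhocFlowGuardSketch {

  /** Stand-in for TooSoonToRerunSameFlowException (made unchecked here for brevity). */
  static final class TooSoonToRerunException extends RuntimeException {
    TooSoonToRerunException(String message) {
      super(message);
    }
  }

  /** Stand-in for the DagManagementStateStore query introduced in the PR. */
  interface LaunchStateStore {
    boolean existsCurrentlyLaunchingSimilarFlow(String flowGroup, String flowName, long flowExecutionId)
        throws IOException;
  }

  /** Minimal flow descriptor holding only the fields the guard reads. */
  static final class SimpleFlowSpec {
    final String flowGroup;
    final String flowName;
    final long flowExecutionId;
    final boolean scheduled;

    SimpleFlowSpec(String flowGroup, String flowName, long flowExecutionId, boolean scheduled) {
      this.flowGroup = flowGroup;
      this.flowName = flowName;
      this.flowExecutionId = flowExecutionId;
      this.scheduled = scheduled;
    }
  }

  private AdhocFlowGuardSketch() {}

  /** Throws if a similar adhoc flow is still launching; scheduled flows pass through untouched. */
  static void rejectIfSimilarAdhocFlowLaunching(LaunchStateStore store, SimpleFlowSpec spec) throws IOException {
    if (spec.scheduled) {
      return; // the check in the diff only applies to adhoc (non-scheduled) flows
    }
    if (store.existsCurrentlyLaunchingSimilarFlow(spec.flowGroup, spec.flowName, spec.flowExecutionId)) {
      throw new TooSoonToRerunException(
          "Adhoc flow " + spec.flowGroup + "." + spec.flowName + " is already launching");
    }
  }
}
```

Compared with the removed lease-based validation, this shape only asks the store a yes/no question and never builds `LeaseParams`, which is the simplification the diff appears to make.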
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java:
##########
@@ -105,10 +105,13 @@ public interface DagManagementStateStore {
/**
* Returns true if lease can be acquired on entity provided in leaseParams.
- * @param leaseParams uniquely identifies the flow, the present action upon it, the time the action was triggered,
- * and if the dag action event we're checking on is a reminder event
+ * Check if an action exists in dagAction store by flow group, flow name, flow execution id, and job name.
+ * @param flowGroup flow group for the dag action
+ * @param flowName flow name for the dag action
+ * @param flowExecutionId flow execution for the dag action
+ * @throws IOException
*/
- boolean isLeaseAcquirable(DagActionStore.LeaseParams leaseParams) throws IOException;
+ boolean existsCurrentlyLaunchingSimilarFlow(String flowGroup, String flowName, long flowExecutionId) throws IOException;
Review Comment:
Sure, updated.
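To make the new `existsCurrentlyLaunchingSimilarFlow` contract concrete, here is a minimal in-memory sketch. It assumes, following the updated javadoc, that the lookup keys are flow group, flow name, flow execution id and a flow-level job name, and that only LAUNCH actions count; the real store may define "similar" differently. `InMemoryDagActionStoreSketch`, `ActionType`, `FLOW_LEVEL_JOB_NAME`, `addLaunch` and `deleteLaunch` are hypothetical names, not Gobblin APIs.

```java
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical in-memory stand-in for a dag action store, used only to illustrate
 * a lookup like existsCurrentlyLaunchingSimilarFlow. Not Gobblin's implementation.
 */
final class InMemoryDagActionStoreSketch {

  /** Simplified stand-in for DagActionStore.DagActionType. */
  enum ActionType { LAUNCH, KILL }

  /** Hypothetical placeholder job name used for flow-level (not job-level) actions. */
  static final String FLOW_LEVEL_JOB_NAME = "";

  /** Composite key over the lookup fields named in the javadoc, plus the action type. */
  private static final class ActionKey {
    private final String flowGroup;
    private final String flowName;
    private final long flowExecutionId;
    private final String jobName;
    private final ActionType type;

    ActionKey(String flowGroup, String flowName, long flowExecutionId, String jobName, ActionType type) {
      this.flowGroup = flowGroup;
      this.flowName = flowName;
      this.flowExecutionId = flowExecutionId;
      this.jobName = jobName;
      this.type = type;
    }

    @Override
    public boolean equals(Object o) {
      if (this == o) return true;
      if (!(o instanceof ActionKey)) return false;
      ActionKey other = (ActionKey) o;
      return flowExecutionId == other.flowExecutionId
          && flowGroup.equals(other.flowGroup)
          && flowName.equals(other.flowName)
          && jobName.equals(other.jobName)
          && type == other.type;
    }

    @Override
    public int hashCode() {
      return Objects.hash(flowGroup, flowName, flowExecutionId, jobName, type);
    }
  }

  // Thread-safe set, since several request threads may record and check actions concurrently.
  private final Set<ActionKey> actions = ConcurrentHashMap.newKeySet();

  /** Records a flow-level LAUNCH action. */
  void addLaunch(String flowGroup, String flowName, long flowExecutionId) {
    actions.add(launchKey(flowGroup, flowName, flowExecutionId));
  }

  /** Returns true if a matching LAUNCH action was present and has been removed. */
  boolean deleteLaunch(String flowGroup, String flowName, long flowExecutionId) {
    return actions.remove(launchKey(flowGroup, flowName, flowExecutionId));
  }

  /** Assumed semantics: a flow-level LAUNCH entry exists for this group, name and execution id. */
  boolean existsCurrentlyLaunchingSimilarFlow(String flowGroup, String flowName, long flowExecutionId) {
    return actions.contains(launchKey(flowGroup, flowName, flowExecutionId));
  }

  private static ActionKey launchKey(String flowGroup, String flowName, long flowExecutionId) {
    return new ActionKey(flowGroup, flowName, flowExecutionId, FLOW_LEVEL_JOB_NAME, ActionType.LAUNCH);
  }
}
```

The sketch also shows why deletion matters: a LAUNCH entry that is never removed keeps this check returning true.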
Issue Time Tracking
-------------------
Worklog Id: (was: 944297)
Time Spent: 3h 50m (was: 3h 40m)
> Adhoc flows are not being deleted from GaaS FlowSpec store
> ----------------------------------------------------------
>
> Key: GOBBLIN-2173
> URL: https://issues.apache.org/jira/browse/GOBBLIN-2173
> Project: Apache Gobblin
> Issue Type: Bug
> Components: gobblin-service
> Reporter: Abhishek Jain
> Assignee: Abhishek Tiwari
> Priority: Critical
> Time Spent: 3h 50m
> Remaining Estimate: 0h
>
> In GaaS, we store adhoc flows temporarily in our flowspec DB so that they
> persist across service restarts and failovers. However, once these flows are
> kicked off/forwarded to the DagProcEngine, they are expected to be removed
> from our flowspec DB.
> This is currently not happening consistently; there seem to be some edge
> case(s) where they remain persisted in the DB. This can be fatal for users such
> as DIL that consistently run adhoc flows with the same flowgroup/flowname,
> because their flows will get stuck. We need to find which edge cases are not
> handling the flow spec deletion properly.
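One way to reason about the missing-deletion edge cases is to make removal unconditional once the handoff has been attempted. The sketch below assumes a simple in-memory map as the spec store and a `Consumer` as the handoff to the engine; `AdhocSpecLifecycleSketch`, `flowSpecStore` and `submitAdhoc` are hypothetical names, not GaaS APIs. Deleting in a `finally` block is one possible policy: it trades restart durability on the failure path for never leaving a stale entry behind, and the real service may choose differently.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/** Hypothetical sketch of an adhoc spec lifecycle; not GaaS code. */
final class AdhocSpecLifecycleSketch {

  /** In-memory stand-in for the FlowSpec store, keyed by "flowGroup.flowName". */
  final Map<String, String> flowSpecStore = new ConcurrentHashMap<>();

  /**
   * Persists the adhoc spec so it would survive a restart, hands it off to the engine,
   * then removes it. The finally block runs on success and when the handoff throws
   * (e.g. the flow is rejected), so no path leaves the spec behind.
   */
  void submitAdhoc(String flowGroup, String flowName, String specBody, Consumer<String> forwardToEngine) {
    String key = flowGroup + "." + flowName;
    flowSpecStore.put(key, specBody);
    try {
      forwardToEngine.accept(key);
    } finally {
      flowSpecStore.remove(key);
    }
  }
}
```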
--
This message was sent by Atlassian Jira
(v8.20.10#820010)