[
https://issues.apache.org/jira/browse/GOBBLIN-2173?focusedWorklogId=943962&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-943962
]
ASF GitHub Bot logged work on GOBBLIN-2173:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 15/Nov/24 17:13
Start Date: 15/Nov/24 17:13
Worklog Time Spent: 10m
Work Description: khandelwal-prateek commented on code in PR #4076:
URL: https://github.com/apache/gobblin/pull/4076#discussion_r1844070218
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java:
##########
@@ -61,6 +61,16 @@ public interface MultiActiveLeaseArbiter {
LeaseAttemptStatus tryAcquireLease(DagActionStore.LeaseParams leaseParams,
boolean adoptConsensusFlowExecutionId)
throws IOException;
+ /**
+ * This method checks if lease can be acquired on provided flow in lease params
+ * returns true if entry for the same flow does not exists within epsilon time
+ * in leaseArbiterStore
+ * @param leaseParams uniquely identifies the flow, the present action upon it, the time the action
+ * was triggered, and if the dag action event we're checking on is a reminder event
+ */
Review Comment:
Please add `@return` to the javadoc
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java:
##########
@@ -61,6 +61,16 @@ public interface MultiActiveLeaseArbiter {
LeaseAttemptStatus tryAcquireLease(DagActionStore.LeaseParams leaseParams,
boolean adoptConsensusFlowExecutionId)
throws IOException;
+ /**
+ * This method checks if lease can be acquired on provided flow in lease params
+ * returns true if entry for the same flow does not exists within epsilon time
+ * in leaseArbiterStore
+ * @param leaseParams uniquely identifies the flow, the present action upon it, the time the action
+ * was triggered, and if the dag action event we're checking on is a reminder event
+ */
+ boolean canAcquireLeaseOnEntity(DagActionStore.LeaseParams leaseParams)
Review Comment:
Consider renaming this to `isLeaseAcquirable` for conciseness and for consistency with other method names
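For illustration, the interface method with both suggestions applied (the `@return` tag and the `isLeaseAcquirable` name) might look like the sketch below. `LeaseParams` is a hypothetical, simplified stand-in for `DagActionStore.LeaseParams`, `LeaseArbiterSketch` is a hypothetical interface name, and the `@throws IOException` clause assumes the interface keeps the `IOException` that the MySQL override declares.

```java
import java.io.IOException;

/**
 * Hypothetical, simplified stand-in for DagActionStore.LeaseParams (not the Gobblin class).
 * Per the PR's javadoc, the real type also identifies the dag action and whether the event
 * is a reminder; those fields are omitted here.
 */
final class LeaseParams {
  final String flowGroup;
  final String flowName;
  final long eventTimeMillis;

  LeaseParams(String flowGroup, String flowName, long eventTimeMillis) {
    this.flowGroup = flowGroup;
    this.flowName = flowName;
    this.eventTimeMillis = eventTimeMillis;
  }
}

/** Hypothetical interface name; only the method shape and its javadoc are the point here. */
interface LeaseArbiterSketch {
  /**
   * Checks whether a lease can be acquired on the flow identified by {@code leaseParams}.
   *
   * @param leaseParams uniquely identifies the flow, the present action upon it, the time the
   *     action was triggered, and whether the dag action event being checked is a reminder event
   * @return {@code true} if the lease arbiter store has no entry for the same flow within
   *     epsilon of the event time, {@code false} otherwise
   * @throws IOException if the lease arbiter store cannot be queried
   */
  boolean isLeaseAcquirable(LeaseParams leaseParams) throws IOException;
}
```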
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -133,6 +139,25 @@ public AddSpecResponse onAddSpec(Spec addedSpec) {
return new AddSpecResponse<>(null);
}
+ private void validateAdhocFlowLeasability(FlowSpec flowSpec) {
+ if (!flowSpec.isScheduled()) {
+ Config flowConfig = flowSpec.getConfig();
+ String flowGroup = flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+ String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+ DagActionStore.DagAction dagAction = DagActionStore.DagAction.forFlow(flowGroup, flowName,
+ FlowUtils.getOrCreateFlowExecutionId(flowSpec), DagActionStore.DagActionType.LAUNCH);
+ DagActionStore.LeaseParams leaseParams = new DagActionStore.LeaseParams(dagAction, System.currentTimeMillis());
+ try {
+ if (!dagManagementStateStore.canAcquireLeaseOnEntity(leaseParams)) {
+ throw new LeaseUnavailableException("Lease already occupied by another execution of this flow");
Review Comment:
Add an info log here with `flowGroup` and `flowName`; it would be useful for debugging
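A minimal sketch of where that log could go, assuming `java.util.logging` in place of the service's own logger. `LeasabilityCheck`, `AdhocFlowValidator`, and this `LeaseUnavailableException` (unchecked in the sketch) are hypothetical stand-ins for the PR's types, and the check is simplified from `canAcquireLeaseOnEntity(LeaseParams)` to plain arguments. It also carries a javadoc of the kind requested for `validateAdhocFlowLeasability`.

```java
import java.util.logging.Logger;

/** Hypothetical stand-in for the PR's LeaseUnavailableException (unchecked in this sketch). */
class LeaseUnavailableException extends RuntimeException {
  LeaseUnavailableException(String message) {
    super(message);
  }
}

/** Hypothetical, simplified form of dagManagementStateStore.canAcquireLeaseOnEntity(leaseParams). */
interface LeasabilityCheck {
  boolean canAcquire(String flowGroup, String flowName, long eventTimeMillis);
}

class AdhocFlowValidator {
  private static final Logger LOG = Logger.getLogger(AdhocFlowValidator.class.getName());
  private final LeasabilityCheck leasabilityCheck;

  AdhocFlowValidator(LeasabilityCheck leasabilityCheck) {
    this.leasabilityCheck = leasabilityCheck;
  }

  /**
   * Rejects an adhoc (unscheduled) flow if another execution of the same flow already holds the
   * lease. Scheduled flows are not checked.
   *
   * @throws LeaseUnavailableException if the lease cannot be acquired for an adhoc flow
   */
  void validateAdhocFlowLeasability(String flowGroup, String flowName, boolean isScheduled) {
    if (isScheduled) {
      return;
    }
    if (!leasabilityCheck.canAcquire(flowGroup, flowName, System.currentTimeMillis())) {
      // Log the flow's identity before rejecting it so the failure is easy to trace.
      LOG.info(String.format("Lease unavailable for adhoc flow: flowGroup=%s, flowName=%s",
          flowGroup, flowName));
      throw new LeaseUnavailableException("Lease already occupied by another execution of this flow");
    }
  }
}
```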
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -362,6 +362,12 @@ else if (leaseValidityStatus == 2) {
}
}
+ @Override
+ public boolean canAcquireLeaseOnEntity(DagActionStore.LeaseParams leaseParams) throws IOException {
Review Comment:
Please add a javadoc, something like:
`Determines if a lease can be acquired for the given flow. A lease is acquirable if ...`
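One possible wording for that javadoc, shown against a hypothetical in-memory model rather than the MySQL implementation. The rule modeled is the one stated in the interface javadoc (no entry for the same flow within epsilon); the class name, the inclusive epsilon boundary, and keeping only the latest event time per flow are assumptions of this sketch.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical in-memory stand-in for the lease arbiter store. The real
 * MysqlMultiActiveLeaseArbiter reads from MySQL; this only models the stated rule.
 */
class InMemoryLeaseStoreSketch {
  private final long epsilonMillis;
  // (flowGroup, flowName) -> event time of the most recently recorded lease for that flow
  private final Map<List<String>, Long> lastEventTimeByFlow = new HashMap<>();

  InMemoryLeaseStoreSketch(long epsilonMillis) {
    this.epsilonMillis = epsilonMillis;
  }

  void recordLease(String flowGroup, String flowName, long eventTimeMillis) {
    lastEventTimeByFlow.put(Arrays.asList(flowGroup, flowName), eventTimeMillis);
  }

  /**
   * Determines if a lease can be acquired for the given flow. A lease is acquirable if the store
   * has no entry for the same flow whose event time is within {@code epsilonMillis} (inclusive)
   * of {@code eventTimeMillis}.
   *
   * @return {@code true} if the lease is acquirable, {@code false} otherwise
   */
  boolean isLeaseAcquirable(String flowGroup, String flowName, long eventTimeMillis) {
    Long existing = lastEventTimeByFlow.get(Arrays.asList(flowGroup, flowName));
    return existing == null || Math.abs(eventTimeMillis - existing) > epsilonMillis;
  }
}
```

For example, with a 1000 ms epsilon, a lease recorded at 5000 ms blocks a check at 5500 ms but not one at 6001 ms.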
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java:
##########
@@ -311,6 +320,39 @@ public void createFlowSpec() throws Throwable {
"SpecProducer should contain 0 Spec after addition");
}
+ /*
+ If another flow has already acquired lease for this flowspec details within
+ epsilon time, then we do not execute this flow, hence do not process and store the spec
+ and throw LeaseUnavailableException
+ */
+ @Test(expectedExceptions = LeaseUnavailableException.class)
+ public void testOnAddSpec_withFlowSpec_leaseUnavailable() throws IOException {
+ ConfigBuilder configBuilder = ConfigBuilder.create()
+ .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "testGroup")
+ .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "testName");
+ Config config = configBuilder.build();
+ FlowSpec flowSpec = FlowSpec.builder().withConfig(config).build();
+ Mockito.when(dagManagementStateStore.canAcquireLeaseOnEntity(Mockito.any(DagActionStore.LeaseParams.class))).thenReturn(false);
+ dagMgrNotFlowLaunchHandlerBasedOrchestrator.onAddSpec(flowSpec);
+ }
+
+ @Test
+ public void testOnAddSpec_withFlowSpec_Available() throws IOException {
+ ConfigBuilder configBuilder = ConfigBuilder.create()
+ .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "testGroup")
+ .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "testName")
+ .addPrimitive(ConfigurationKeys.JOB_SCHEDULE_KEY, "0 1/0 * ? * *")
+ .addPrimitive("gobblin.flow.sourceIdentifier", "source")
+ .addPrimitive("gobblin.flow.destinationIdentifier", "destination");
+ Config config = configBuilder.build();
+ FlowSpec flowSpec = FlowSpec.builder().withConfig(config).build();
Review Comment:
Can we use `this.flowSpec` here, since it already has the schedule?
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java:
##########
@@ -311,6 +320,39 @@ public void createFlowSpec() throws Throwable {
"SpecProducer should contain 0 Spec after addition");
}
+ /*
+ If another flow has already acquired lease for this flowspec details within
+ epsilon time, then we do not execute this flow, hence do not process and store the spec
+ and throw LeaseUnavailableException
+ */
+ @Test(expectedExceptions = LeaseUnavailableException.class)
+ public void testOnAddSpec_withFlowSpec_leaseUnavailable() throws IOException {
Review Comment:
Let's also test the scenario where `canAcquireLeaseOnEntity` returns `true` for an adhoc flow, since the other test covers a scheduled flow
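A self-contained sketch of the missing adhoc, lease-available case. `OrchestratorSketch` is a hypothetical stand-in for `Orchestrator.onAddSpec`, with spec storage reduced to a list; in the real `OrchestratorTest` the equivalent setup would stub `dagManagementStateStore.canAcquireLeaseOnEntity(...)` with `thenReturn(true)` for an unscheduled `FlowSpec` and check that `onAddSpec` does not throw. The test method name follows camelCase.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiPredicate;

/** Hypothetical stand-in for the PR's LeaseUnavailableException (unchecked in this sketch). */
class LeaseUnavailableException extends RuntimeException {
  LeaseUnavailableException(String message) {
    super(message);
  }
}

/** Hypothetical, minimal stand-in for Orchestrator.onAddSpec; not the Gobblin implementation. */
class OrchestratorSketch {
  final List<String> storedSpecs = new ArrayList<>();
  // (flowGroup, flowName) -> whether a lease could be acquired right now
  private final BiPredicate<String, String> leaseAcquirable;

  OrchestratorSketch(BiPredicate<String, String> leaseAcquirable) {
    this.leaseAcquirable = leaseAcquirable;
  }

  void onAddSpec(String flowGroup, String flowName, boolean isScheduled) {
    // Only adhoc (unscheduled) flows are checked for leasability.
    if (!isScheduled && !leaseAcquirable.test(flowGroup, flowName)) {
      throw new LeaseUnavailableException("Lease already occupied by another execution of this flow");
    }
    storedSpecs.add(flowGroup + "/" + flowName);
  }
}

class OrchestratorSketchTest {
  /** Adhoc flow whose lease is acquirable: no exception, and the spec is processed. */
  static void testOnAddSpecAdhocFlowLeaseAvailable() {
    OrchestratorSketch orchestrator = new OrchestratorSketch((group, name) -> true);
    orchestrator.onAddSpec("testGroup", "testName", false);
    if (!orchestrator.storedSpecs.contains("testGroup/testName")) {
      throw new AssertionError("adhoc spec should be processed when the lease is acquirable");
    }
  }
}
```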
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -133,6 +139,25 @@ public AddSpecResponse onAddSpec(Spec addedSpec) {
return new AddSpecResponse<>(null);
}
+ private void validateAdhocFlowLeasability(FlowSpec flowSpec) {
Review Comment:
Add a javadoc
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java:
##########
@@ -311,6 +320,39 @@ public void createFlowSpec() throws Throwable {
"SpecProducer should contain 0 Spec after addition");
}
+ /*
+ If another flow has already acquired lease for this flowspec details within
+ epsilon time, then we do not execute this flow, hence do not process and store the spec
+ and throw LeaseUnavailableException
+ */
+ @Test(expectedExceptions = LeaseUnavailableException.class)
+ public void testOnAddSpec_withFlowSpec_leaseUnavailable() throws IOException {
+ ConfigBuilder configBuilder = ConfigBuilder.create()
+ .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "testGroup")
+ .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "testName");
+ Config config = configBuilder.build();
+ FlowSpec flowSpec = FlowSpec.builder().withConfig(config).build();
+ Mockito.when(dagManagementStateStore.canAcquireLeaseOnEntity(Mockito.any(DagActionStore.LeaseParams.class))).thenReturn(false);
+ dagMgrNotFlowLaunchHandlerBasedOrchestrator.onAddSpec(flowSpec);
+ }
+
+ @Test
+ public void testOnAddSpec_withFlowSpec_Available() throws IOException {
Review Comment:
Use camelCase for method names
Issue Time Tracking
-------------------
Worklog Id: (was: 943962)
Remaining Estimate: 0h
Time Spent: 10m
> Adhoc flows are not being deleted from GaaS FlowSpec store
> ----------------------------------------------------------
>
> Key: GOBBLIN-2173
> URL: https://issues.apache.org/jira/browse/GOBBLIN-2173
> Project: Apache Gobblin
> Issue Type: Bug
> Components: gobblin-service
> Reporter: Abhishek Jain
> Assignee: Abhishek Tiwari
> Priority: Critical
> Time Spent: 10m
> Remaining Estimate: 0h
>
> In GaaS, we store adhoc flows temporarily in our flowspec DB in order to
> persist them across service restart/failover scenarios. However, once these
> flows are kicked off or forwarded to the DagProcEngine, they are expected to
> be removed from our flowspec DB.
> This is currently not happening consistently; there seem to be some edge
> case(s) where they remain persisted in the DB. This can be fatal for users such
> as DIL that consistently run adhoc flows with the same flowgroup/flowname,
> since their flows will get stuck. We need to find which edge cases
> are not handling the flow spec deletion properly.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)