[
https://issues.apache.org/jira/browse/GOBBLIN-2173?focusedWorklogId=944391&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-944391
]
ASF GitHub Bot logged work on GOBBLIN-2173:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 19/Nov/24 15:48
Start Date: 19/Nov/24 15:48
Worklog Time Spent: 10m
Work Description: phet commented on code in PR #4076:
URL: https://github.com/apache/gobblin/pull/4076#discussion_r1848535757
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java:
##########
@@ -394,8 +394,8 @@ private Map<String, AddSpecResponse> updateOrAddSpecHelper(Spec spec, boolean tr
if (!response.getValue().getFailures().isEmpty()) {
for (Map.Entry<SpecCatalogListener, CallbackResult<AddSpecResponse>> entry : response.getValue().getFailures().entrySet()) {
Throwable error = entry.getValue().getError();
- if (error instanceof TooSoonToRerunSameFlowException) {
- throw (TooSoonToRerunSameFlowException) error;
+ if (error instanceof RuntimeException && error.getCause() instanceof TooSoonToRerunSameFlowException) {
+ throw (TooSoonToRerunSameFlowException) error.getCause();
Review Comment:
the cast isn't necessary (the reason I suggested wrapping `TooSoonToR...`
was to enable uniform, type-agnostic code here.)
for a method with the signature `throws Throwable`, aren't these two
equivalent?
```
throw error.getCause();
throw (T) error.getCause();
```
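To illustrate the point above: in a method declared `throws Throwable`, `throw error.getCause();` already preserves the cause's runtime type, so once the `instanceof` guard has passed, the cast changes nothing at runtime. The standalone sketch below is not Gobblin code; `UnwrapDemo`, `rethrowCause`, `thrownTypeOf`, and `TooSoonException` (a stand-in for `TooSoonToRerunSameFlowException`) are hypothetical names.

```java
public class UnwrapDemo {
  // Hypothetical stand-in for TooSoonToRerunSameFlowException (not the Gobblin class).
  public static class TooSoonException extends RuntimeException {
    public TooSoonException(String msg) {
      super(msg);
    }
  }

  // Declared `throws Throwable`, so the cause can be rethrown as-is:
  // its runtime type is preserved without an instanceof check or a cast.
  public static void rethrowCause(Throwable error) throws Throwable {
    Throwable cause = error.getCause();
    throw (cause != null) ? cause : error;
  }

  // Reports which exception type rethrowCause actually surfaces.
  public static String thrownTypeOf(Throwable error) {
    try {
      rethrowCause(error);
      return "none";
    } catch (Throwable t) {
      return t.getClass().getSimpleName();
    }
  }
}
```

Wrapping a `TooSoonException` in a plain `RuntimeException` and passing it to `thrownTypeOf` yields `"TooSoonException"`, i.e. the caller sees the specific type even though the rethrow site never names it.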
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -362,6 +362,16 @@ else if (leaseValidityStatus == 2) {
}
}
+ /*
+ Determines if a lease can be acquired for the given flow. A lease is acquirable if
+ no existing lease record exists in arbiter table or the record is older then epsilon time
+ */
Review Comment:
probably no need for this comment here in the impl, but if you want one,
bring it into line w/ the orig from the interface
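For reference, the acquirability rule in the quoted comment boils down to a small predicate. This is a hypothetical sketch, not the `MysqlMultiActiveLeaseArbiter` logic (which works against arbiter table rows in MySQL); the names `LeaseCheckSketch` and `isLeaseAcquirable`, the millisecond parameters, and reading "older than epsilon" as a strict comparison are all assumptions.

```java
import java.util.Optional;

public class LeaseCheckSketch {
  // Hypothetical predicate mirroring the quoted comment (not the arbiter's real code).
  // existingRecordMillis: timestamp of the lease record for this flow, if one exists.
  public static boolean isLeaseAcquirable(Optional<Long> existingRecordMillis, long nowMillis, long epsilonMillis) {
    // No record in the arbiter table: nothing blocks acquisition.
    if (!existingRecordMillis.isPresent()) {
      return true;
    }
    // A record exists: acquirable only once it is strictly older than epsilon (assumed boundary).
    return nowMillis - existingRecordMillis.get() > epsilonMillis;
  }
}
```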
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -24,6 +24,7 @@
import java.util.Properties;
import java.util.concurrent.TimeUnit;
+import org.apache.gobblin.runtime.api.TooSoonToRerunSameFlowException;
Review Comment:
this import belongs a few lines down w/ other apache gobblin pkgs
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -133,6 +137,29 @@ public AddSpecResponse onAddSpec(Spec addedSpec) {
return new AddSpecResponse<>(null);
}
+ /*
+ enforces that a similar adhoc flow is not launching,
+ else throw TooSoonToRerunSameFlowException
Review Comment:
nit: `{@link TooSoonToRerunSameFlowException}`
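The enforcement described in the quoted comment (reject the launch when a similar adhoc flow was launched recently) can be sketched as a single-process guard keyed by flowGroup/flowName. This is an assumption-laden illustration, not the `Orchestrator` code: the real check goes through the lease arbiter, `AdhocLaunchGuard`, `enforceNotRecentlyLaunched`, and `windowMillis` are hypothetical, `IllegalStateException` stands in for `TooSoonToRerunSameFlowException`, and the map is not thread-safe.

```java
import java.util.HashMap;
import java.util.Map;

public class AdhocLaunchGuard {
  // Hypothetical in-memory stand-in for the lease arbiter: flow key -> last accepted launch time (ms).
  private final Map<String, Long> lastLaunchMillis = new HashMap<>();
  private final long windowMillis;

  public AdhocLaunchGuard(long windowMillis) {
    this.windowMillis = windowMillis;
  }

  // Rejects a launch of the same flowGroup/flowName pair within windowMillis of the last accepted one.
  // IllegalStateException stands in for TooSoonToRerunSameFlowException; rejected attempts are not recorded.
  public void enforceNotRecentlyLaunched(String flowGroup, String flowName, long nowMillis) {
    String key = flowGroup + "/" + flowName;
    Long last = lastLaunchMillis.get(key);
    if (last != null && nowMillis - last < windowMillis) {
      throw new IllegalStateException("Too soon to rerun same flow: " + key);
    }
    lastLaunchMillis.put(key, nowMillis);
  }
}
```

Only the pair that launched recently is blocked; a different flowName in the same group, or the same flow after the window, goes through.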
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java:
##########
@@ -324,10 +323,10 @@ public void createFlowSpec() throws Throwable {
/*
If another flow has already acquired lease for this flowspec details within
- epsilon time, then we do not execute this flow, hence do not process and store the spec
- and throw LeaseUnavailableException
+ lease consolidation time, then we do not execute this flow, hence do not process and store the spec
+ and throw RuntimeException
*/
- @Test(expectedExceptions = TooSoonToRerunSameFlowException.class)
+ @Test(expectedExceptions = RuntimeException.class)
Review Comment:
after signing off, I realized my literal advice would compromise clarity,
foul up tests, etc. (in this very way)
sorry for that half-baked advice... try this instead:
```
public static class TooSoonToRerunSameFlowException extends RuntimeException {
  // needs `lombok.Getter` and `org.apache.gobblin.runtime.api.FlowSpec` imported in the enclosing file
  @Getter private final FlowSpec flowSpec;

  /**
   * Account for unwrapping within {@link FlowCatalog#updateOrAddSpecHelper}'s {@code CallbackResult} error handling
   * for {@code SpecCatalogListener}s
   * @return {@code TooSoonToRerunSameFlowException} wrapped in another {@code TooSoonToRerunSameFlowException}
   */
  public static TooSoonToRerunSameFlowException wrappedOnce(FlowSpec flowSpec) {
    return new TooSoonToRerunSameFlowException(flowSpec, new TooSoonToRerunSameFlowException(flowSpec));
  }

  public TooSoonToRerunSameFlowException(FlowSpec flowSpec) {
    this(flowSpec, null);
  }

  /** restricted-access ctor: use {@link #wrappedOnce(FlowSpec)} instead */
  private TooSoonToRerunSameFlowException(FlowSpec flowSpec, Throwable cause) {
    super("Lease already occupied by another recent execution of this flow: " + flowSpec, cause);
    this.flowSpec = flowSpec;
  }
}
```
then replace:
```
throw new RuntimeException(new TooSoonToRerunSameFlowException(flowSpec));
```
with
```
throw TooSoonToRerunSameFlowException.wrappedOnce(flowSpec);
```
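A standalone sketch of how the suggested `wrappedOnce` factory round-trips through a uniform unwrap like `throw error.getCause()` in `FlowCatalog`, so the type that surfaces is still `TooSoonToRerunSameFlowException` and the test can keep `expectedExceptions = TooSoonToRerunSameFlowException.class`. To stay self-contained it assumes `String` in place of `FlowSpec` and a plain getter in place of Lombok's `@Getter`; `WrappedOnceDemo`, `listenerRejects`, and `catalogRethrows` are hypothetical stand-ins for the `Orchestrator` listener and the `CallbackResult` handling.

```java
public class WrappedOnceDemo {
  // Simplified copy of the suggested class: String replaces FlowSpec, a plain getter replaces @Getter.
  public static class TooSoonToRerunSameFlowException extends RuntimeException {
    private final String flowSpec;

    public static TooSoonToRerunSameFlowException wrappedOnce(String flowSpec) {
      return new TooSoonToRerunSameFlowException(flowSpec, new TooSoonToRerunSameFlowException(flowSpec));
    }

    public TooSoonToRerunSameFlowException(String flowSpec) {
      this(flowSpec, null);
    }

    private TooSoonToRerunSameFlowException(String flowSpec, Throwable cause) {
      super("Lease already occupied by another recent execution of this flow: " + flowSpec, cause);
      this.flowSpec = flowSpec;
    }

    public String getFlowSpec() {
      return flowSpec;
    }
  }

  // Listener side (hypothetical): reject the spec by throwing the wrapped form.
  public static void listenerRejects(String flowSpec) {
    throw TooSoonToRerunSameFlowException.wrappedOnce(flowSpec);
  }

  // Catalog side (hypothetical): capture the listener failure, then unwrap with no instanceof or cast.
  public static Throwable catalogRethrows(String flowSpec) {
    Throwable error = null;
    try {
      listenerRejects(flowSpec);
    } catch (RuntimeException e) {
      error = e; // plays the role of CallbackResult#getError()
    }
    try {
      throw error.getCause();
    } catch (Throwable t) {
      return t;
    }
  }
}
```

The outer exception exists only to be peeled off; what callers (and the test) see is the inner instance, which has no cause and still carries the flow spec.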
Issue Time Tracking
-------------------
Worklog Id: (was: 944391)
Time Spent: 5h (was: 4h 50m)
> Adhoc flows are not being deleted from GaaS FlowSpec store
> ----------------------------------------------------------
>
> Key: GOBBLIN-2173
> URL: https://issues.apache.org/jira/browse/GOBBLIN-2173
> Project: Apache Gobblin
> Issue Type: Bug
> Components: gobblin-service
> Reporter: Abhishek Jain
> Assignee: Abhishek Tiwari
> Priority: Critical
> Time Spent: 5h
> Remaining Estimate: 0h
>
> In GaaS, we store adhoc flows temporarily in our FlowSpec DB in order to
> persist them across service restart/failover scenarios. However, once these
> flows are kicked off/forwarded to the DagProcEngine, they are expected to be
> removed from our FlowSpec DB.
> This is currently not happening consistently; there seem to be some edge
> case(s) where they remain persisted in the DB. This can be fatal for users such
> as DIL that consistently run adhoc flows with the same flowgroup/flowname,
> because their flows will get stuck. We need to find which edge cases
> are not handling the FlowSpec deletion properly.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)