phet commented on code in PR #4076:
URL: https://github.com/apache/gobblin/pull/4076#discussion_r1848535757
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java:
##########
@@ -394,8 +394,8 @@ private Map<String, AddSpecResponse>
updateOrAddSpecHelper(Spec spec, boolean tr
if (!response.getValue().getFailures().isEmpty()) {
for (Map.Entry<SpecCatalogListener, CallbackResult<AddSpecResponse>>
entry : response.getValue().getFailures().entrySet()) {
Throwable error = entry.getValue().getError();
- if (error instanceof TooSoonToRerunSameFlowException) {
- throw (TooSoonToRerunSameFlowException) error;
+ if (error instanceof RuntimeException && error.getCause() instanceof
TooSoonToRerunSameFlowException) {
+ throw (TooSoonToRerunSameFlowException) error.getCause();
Review Comment:
the cast isn't necessary. (the reason I suggested wrapping `TooSoonToRerunSameFlowException`
was to enable uniform, type-agnostic code here.)
for a method declared `throws Throwable`, aren't these two
equivalent?
```
throw error.getCause();
throw (T) error.getCause();
```
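A minimal sketch of why the cast adds nothing at runtime, assuming the enclosing method is declared `throws Throwable`. The class `RethrowCauseDemo`, the stand-in exception `TooSoonException`, and the helper names are hypothetical and not part of the PR:

```java
public class RethrowCauseDemo {
    /** Hypothetical stand-in for the PR's TooSoonToRerunSameFlowException. */
    static class TooSoonException extends RuntimeException {
        TooSoonException(String msg) { super(msg); }
    }

    /** Rethrows the cause with no cast; this compiles because the method declares `throws Throwable`. */
    static void rethrowCause(Throwable error) throws Throwable {
        if (error.getCause() != null) {
            throw error.getCause();
        }
        throw error;
    }

    /** Returns the simple class name of whatever rethrowCause throws. */
    static String thrownTypeName(Throwable error) {
        try {
            rethrowCause(error);
            return "nothing";
        } catch (Throwable t) {
            // The runtime type of the thrown object is unchanged by the missing cast.
            return t.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        Throwable wrapped = new RuntimeException(new TooSoonException("too soon"));
        System.out.println(thrownTypeName(wrapped));
    }
}
```

The cast only changes the static type the compiler sees. The object that propagates is the same either way, so a caller's `catch` or a test's `expectedExceptions` still matches on the cause's actual class.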
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -362,6 +362,16 @@ else if (leaseValidityStatus == 2) {
}
}
+ /*
+ Determines if a lease can be acquired for the given flow. A lease is
acquirable if
+ no existing lease record exists in arbiter table or the record is older
then epsilon time
+ */
Review Comment:
probably no need for this comment here in the impl, but if you want one,
bring it into line w/ the orig from the interface
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -24,6 +24,7 @@
import java.util.Properties;
import java.util.concurrent.TimeUnit;
+import org.apache.gobblin.runtime.api.TooSoonToRerunSameFlowException;
Review Comment:
this import belongs a few lines down w/ other apache gobblin pkgs
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -133,6 +137,29 @@ public AddSpecResponse onAddSpec(Spec addedSpec) {
return new AddSpecResponse<>(null);
}
+ /*
+ enforces that a similar adhoc flow is not launching,
+ else throw TooSoonToRerunSameFlowException
Review Comment:
nit: `{@link TooSoonToRerunSameFlowException}`
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java:
##########
@@ -324,10 +323,10 @@ public void createFlowSpec() throws Throwable {
/*
If another flow has already acquired lease for this flowspec details
within
- epsilon time, then we do not execute this flow, hence do not process and
store the spec
- and throw LeaseUnavailableException
+ lease consolidation time, then we do not execute this flow, hence do not
process and store the spec
+ and throw RuntimeException
*/
- @Test(expectedExceptions = TooSoonToRerunSameFlowException.class)
+ @Test(expectedExceptions = RuntimeException.class)
Review Comment:
after signing off, I realized my literal advice would compromise clarity,
foul up tests, etc. (in this very way)
sorry for that half-baked advice... try this instead:
```
public static class TooSoonToRerunSameFlowException extends RuntimeException {
  @Getter private final FlowSpec flowSpec;

  /**
   * Account for unwrapping within {@link FlowCatalog#updateOrAddSpecHelper}'s `CallbackResult` error handling for `SpecCatalogListener`s
   * @return `TooSoonToRerunSameFlowException` wrapped in another `TooSoonToRerunSameFlowException`
   */
  public static TooSoonToRerunSameFlowException wrappedOnce(FlowSpec flowSpec) {
    return new TooSoonToRerunSameFlowException(flowSpec, new TooSoonToRerunSameFlowException(flowSpec));
  }

  public TooSoonToRerunSameFlowException(FlowSpec flowSpec) {
    this(flowSpec, null);
  }

  /** restricted-access ctor: use {@link #wrappedOnce(FlowSpec)} instead */
  private TooSoonToRerunSameFlowException(FlowSpec flowSpec, Throwable cause) {
    super("Lease already occupied by another recent execution of this flow: " + flowSpec, cause);
    this.flowSpec = flowSpec;
  }
}
```
then replace:
```
throw new RuntimeException(new TooSoonToRerunSameFlowException(flowSpec));
```
with
```
throw TooSoonToRerunSameFlowException.wrappedOnce(flowSpec);
```
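To illustrate the intent, here is a self-contained sketch of the "wrapped once" idea: after a caller strips one layer via `getCause()`, the exception it rethrows is still a `TooSoonToRerunSameFlowException`. It assumes a plain `String` flow name in place of `FlowSpec`, and `WrappedOnceDemo` and `unwrapOnce` are hypothetical names standing in for the unwrapping done in `FlowCatalog`:

```java
public class WrappedOnceDemo {
    /** Simplified stand-in: a String flow name replaces the PR's FlowSpec (assumption). */
    static class TooSoonToRerunSameFlowException extends RuntimeException {
        private final String flowName;

        /** Returns the exception wrapped in another instance of the same type. */
        static TooSoonToRerunSameFlowException wrappedOnce(String flowName) {
            return new TooSoonToRerunSameFlowException(flowName, new TooSoonToRerunSameFlowException(flowName));
        }

        TooSoonToRerunSameFlowException(String flowName) {
            this(flowName, null);
        }

        private TooSoonToRerunSameFlowException(String flowName, Throwable cause) {
            super("Lease already occupied by another recent execution of this flow: " + flowName, cause);
            this.flowName = flowName;
        }

        String getFlowName() { return flowName; }
    }

    /** Hypothetical caller-side step: strip one wrapper layer without knowing the type. */
    static Throwable unwrapOnce(Throwable error) {
        return error.getCause() != null ? error.getCause() : error;
    }

    public static void main(String[] args) {
        Throwable thrown = TooSoonToRerunSameFlowException.wrappedOnce("myFlow");
        Throwable unwrapped = unwrapOnce(thrown);
        System.out.println(unwrapped instanceof TooSoonToRerunSameFlowException);
        System.out.println(unwrapped.getCause() == null);
    }
}
```

Both the outer and the inner exception have the same type, so a test can keep `expectedExceptions = TooSoonToRerunSameFlowException.class` whether or not the unwrapping step ran.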