phet commented on code in PR #4076:
URL: https://github.com/apache/gobblin/pull/4076#discussion_r1848535757


##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java:
##########
@@ -394,8 +394,8 @@ private Map<String, AddSpecResponse> 
updateOrAddSpecHelper(Spec spec, boolean tr
       if (!response.getValue().getFailures().isEmpty()) {
         for (Map.Entry<SpecCatalogListener, CallbackResult<AddSpecResponse>> 
entry : response.getValue().getFailures().entrySet()) {
           Throwable error = entry.getValue().getError();
-          if (error instanceof TooSoonToRerunSameFlowException) {
-            throw (TooSoonToRerunSameFlowException) error;
+          if (error instanceof RuntimeException && error.getCause() instanceof 
TooSoonToRerunSameFlowException) {
+            throw (TooSoonToRerunSameFlowException) error.getCause();

Review Comment:
   the cast isn't necessary (the reason I suggested wrapping `TooSoonToR...` 
was to enable uniform, type-agnostic code here.)
   
   for a method with the signature `throws Throwable`, aren't these two 
equivalent?
   ```
   throw error.getCause();
   throw (T) error.getCause();
   ```



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -362,6 +362,16 @@ else if (leaseValidityStatus == 2) {
     }
   }
 
+  /*
+    Determines if a lease can be acquired for the given flow. A lease is 
acquirable if
+    no existing lease record exists in arbiter table or the record is older 
then epsilon time
+   */

Review Comment:
   probably no need for this comment here in the impl, but if you want one, 
bring it into line w/ the orig from the interface



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -24,6 +24,7 @@
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.gobblin.runtime.api.TooSoonToRerunSameFlowException;

Review Comment:
   this import belongs a few lines down w/ other apache gobblin pkgs



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -133,6 +137,29 @@ public AddSpecResponse onAddSpec(Spec addedSpec) {
     return new AddSpecResponse<>(null);
   }
 
+  /*
+    enforces that a similar adhoc flow is not launching,
+    else throw TooSoonToRerunSameFlowException

Review Comment:
   nit: `{@link TooSoonToRerunSameFlowException}`



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java:
##########
@@ -324,10 +323,10 @@ public void createFlowSpec() throws Throwable {
 
   /*
      If another flow has already acquired lease for this flowspec details 
within
-     epsilon time, then we do not execute this flow, hence do not process and 
store the spec
-     and throw LeaseUnavailableException
+     lease consolidation time, then we do not execute this flow, hence do not 
process and store the spec
+     and throw RuntimeException
    */
-  @Test(expectedExceptions = TooSoonToRerunSameFlowException.class)
+  @Test(expectedExceptions = RuntimeException.class)

Review Comment:
   after signing off, I realized my literal advice would compromise clarity, 
foul up tests, etc. (in this very way)
   
   sorry for that half-baked advice... try this instead:
   ```
   public static class TooSoonToRerunSameFlowException extends RuntimeException 
{
     @Getter private final FlowSpec flowSpec;
   
     /**
      * Account for unwrapping within @{link 
FlowCatalog#updateOrAddSpecHelper}`s `CallbackResult` error handling for 
`SpecCatalogListener`s
      * @return `TooSoonToRerunSameFlowException` wrapped in another 
`TooSoonToRerunSameFlowException
      */
     public static TooSoonToRerunSameFlowException wrappedOnce(FlowSpec 
flowSpec) {
       return new TooSoonToRerunSameFlowException(flowSpec, new 
TooSoonToRerunSameFlowException(flowSpec));
     }
   
     public TooSoonToRerunSameFlowException(FlowSpec flowSpec) {
       this(flowSpec, null);
     }
   
     /** restricted-access ctor: use {@link #wrappedOnce(String)} instead */
     private TooSoonToRerunSameFlowException(FlowSpec flowSpec, Throwable 
cause) {
       super("Lease already occupied by another recent execution of this flow: 
" + flowSpec, cause);
       this.flowSpec = flowSpec;
     }
   }
   ```
   
   then replace:
   ```
   throw new RuntimeException(new TooSoonToRerunSameFlowException(flowSpec));
   ```
   with
   ```
   throw TooSoonToRerunSameFlowException.wrappedOnce(flowSpec);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to