Will-Lo commented on code in PR #3549:
URL: https://github.com/apache/gobblin/pull/3549#discussion_r970233775


##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java:
##########
@@ -364,6 +364,64 @@ public Spec getSpecWrapper(URI uri) {
    * @return a map of listeners and their {@link AddSpecResponse}s
    */
   public Map<String, AddSpecResponse> put(Spec spec, boolean triggerListener) 
throws Throwable {
+    Map<String, AddSpecResponse> responseMap = updateHelper(spec, 
triggerListener);
+    AddSpecResponse<String> compileResponse = 
responseMap.get(ServiceConfigKeys.COMPILATION_RESPONSE);
+    // Check that the flow configuration is valid and matches to a 
corresponding edge
+    if (isCompileSuccessful(compileResponse.getValue())) {
+      FlowSpec flowSpec = (FlowSpec) spec;
+      Object syncObject = getSyncObject(flowSpec.getUri().toString());
+      synchronized (syncObject) {
+        try {
+          if (!flowSpec.isExplain()) {
+            long startTime = System.currentTimeMillis();
+            specStore.addSpec(spec);
+            metrics.updatePutSpecTime(startTime);
+          }
+          responseMap.put(ServiceConfigKeys.COMPILATION_SUCCESSFUL, new 
AddSpecResponse<>("true"));
+        } catch (IOException e) {
+          throw new RuntimeException("Cannot add Spec to Spec store: " + 
flowSpec, e);
+        } finally {
+          syncObject.notifyAll();
+          this.specSyncObjects.remove(flowSpec.getUri().toString());
+        }
+      }
+    } else {
+      responseMap.put(ServiceConfigKeys.COMPILATION_SUCCESSFUL, new 
AddSpecResponse<>("false"));
+    }
+
+    return responseMap;
+  }

Review Comment:
   This seems very similar to `update`, except that it calls 
   `specStore.addSpec(spec)` instead of `specStore.updateSpec(spec, version)`. 
   Could we parameterize this or move the logic around? For example, add a 
   helper such as 
   `validateFlowCompilation(FlowSpec spec, Callable<Void> specStoreCallable)` 
   and pass the spec store call in as a parameter. It's only used twice, so the 
   current code still works, but this would cut down on the repetition.
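
   A rough sketch of that parameterization, to make the suggestion concrete. 
   Everything here is a simplified stand-in rather than Gobblin code: 
   `FlowCatalogSketch`, the `store` map, the `compiled` flag and the 
   `"compilationSuccessful"` key are hypothetical, and the sync object, 
   `isExplain()` check and metrics from the diff are left out for brevity.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;

// Hypothetical, simplified stand-in for FlowCatalog; not the real Gobblin API.
class FlowCatalogSketch {
  // Stand-in for the spec store: maps a spec URI to the version last written.
  final Map<String, Long> store = new HashMap<>();

  // put() and update() differ only in the store call they hand to the helper.
  Map<String, String> put(String specUri, boolean compiled) {
    return validateFlowCompilation(specUri, compiled, () -> {
      store.put(specUri, 0L);
      return null;
    });
  }

  Map<String, String> update(String specUri, boolean compiled, long version) {
    return validateFlowCompilation(specUri, compiled, () -> {
      store.put(specUri, version);
      return null;
    });
  }

  // Shared logic: run the supplied store call only if compilation succeeded.
  private Map<String, String> validateFlowCompilation(
      String specUri, boolean compiled, Callable<Void> specStoreCallable) {
    Map<String, String> responseMap = new HashMap<>();
    if (!compiled) {
      responseMap.put("compilationSuccessful", "false");
      return responseMap;
    }
    try {
      specStoreCallable.call();
      responseMap.put("compilationSuccessful", "true");
    } catch (Exception e) {
      // Callable.call() declares Exception, so the catch is broader than in the diff.
      throw new RuntimeException("Cannot write Spec to Spec store: " + specUri, e);
    }
    return responseMap;
  }
}
```

   One trade-off with `Callable<Void>` is the `return null;` in each lambda 
   and the broad `catch (Exception e)`; a small custom functional interface 
   that throws `IOException` would avoid both.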



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java:
##########
@@ -364,6 +364,64 @@ public Spec getSpecWrapper(URI uri) {
    * @return a map of listeners and their {@link AddSpecResponse}s
    */
   public Map<String, AddSpecResponse> put(Spec spec, boolean triggerListener) 
throws Throwable {
+    Map<String, AddSpecResponse> responseMap = updateHelper(spec, 
triggerListener);
+    AddSpecResponse<String> compileResponse = 
responseMap.get(ServiceConfigKeys.COMPILATION_RESPONSE);
+    // Check that the flow configuration is valid and matches to a 
corresponding edge
+    if (isCompileSuccessful(compileResponse.getValue())) {
+      FlowSpec flowSpec = (FlowSpec) spec;
+      Object syncObject = getSyncObject(flowSpec.getUri().toString());
+      synchronized (syncObject) {
+        try {
+          if (!flowSpec.isExplain()) {
+            long startTime = System.currentTimeMillis();
+            specStore.addSpec(spec);
+            metrics.updatePutSpecTime(startTime);
+          }
+          responseMap.put(ServiceConfigKeys.COMPILATION_SUCCESSFUL, new 
AddSpecResponse<>("true"));
+        } catch (IOException e) {
+          throw new RuntimeException("Cannot add Spec to Spec store: " + 
flowSpec, e);
+        } finally {
+          syncObject.notifyAll();
+          this.specSyncObjects.remove(flowSpec.getUri().toString());
+        }
+      }
+    } else {
+      responseMap.put(ServiceConfigKeys.COMPILATION_SUCCESSFUL, new 
AddSpecResponse<>("false"));
+    }
+
+    return responseMap;
+  }
+
+  public Map<String, AddSpecResponse> update(Spec spec, boolean 
triggerListener, long version) throws Throwable {
+    Map<String, AddSpecResponse> responseMap = updateHelper(spec, 
triggerListener);
+    AddSpecResponse<String> compileResponse = 
responseMap.get(ServiceConfigKeys.COMPILATION_RESPONSE);
+    // Check that the flow configuration is valid and matches to a 
corresponding edge
+    if (isCompileSuccessful(compileResponse.getValue())) {
+      FlowSpec flowSpec = (FlowSpec) spec;
+      Object syncObject = getSyncObject(flowSpec.getUri().toString());
+      synchronized (syncObject) {
+        try {
+          if (!flowSpec.isExplain()) {
+            long startTime = System.currentTimeMillis();
+            specStore.updateSpec(spec, version);
+            metrics.updatePutSpecTime(startTime);
+          }
+          responseMap.put(ServiceConfigKeys.COMPILATION_SUCCESSFUL, new 
AddSpecResponse<>("true"));
+        } catch (IOException e) {
+          throw new RuntimeException("Cannot add Spec to Spec store: " + 
flowSpec, e);
+        } finally {
+          syncObject.notifyAll();
+          this.specSyncObjects.remove(flowSpec.getUri().toString());
+        }
+      }
+    } else {
+      responseMap.put(ServiceConfigKeys.COMPILATION_SUCCESSFUL, new 
AddSpecResponse<>("false"));
+    }
+
+    return responseMap;
+  }
+
+  private Map<String, AddSpecResponse> updateHelper(Spec spec, boolean 
triggerListener) throws Throwable {

Review Comment:
   Maybe use a more direct name for what this is doing, e.g. 
   `getListenerResponses`, and add a javadoc on which listeners it's currently 
   listening on.



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecStore.java:
##########
@@ -78,6 +78,16 @@ public interface SpecStore {
    */
   Spec updateSpec(Spec spec) throws IOException, SpecNotFoundException;
 
+  /***
+   * Update {@link Spec} in the {@link SpecStore} when current version is 
smaller than {@link version}.
+   * @param spec {@link Spec} to be updated.
+   * @param version largest version that current spec should be
+   * @throws IOException Exception in updating the {@link Spec}.
+   * @return Updated {@link Spec}.
+   * @throws SpecNotFoundException If {@link Spec} being updated is not 
present in store.
+   */
+  default Spec updateSpec(Spec spec, long version) throws IOException, 
SpecNotFoundException {return updateSpec(spec);};

Review Comment:
   I'm confused about the purpose of the version parameter here. What's the 
intention of it if we set it as the max integer? 
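
   To illustrate the question, here is a minimal sketch under one reading of 
   the javadoc (apply the update only while the stored version is smaller 
   than `version`). The `Store` interface, both classes, the `String` specs 
   and the starting version of 5 are hypothetical simplifications, checked 
   exceptions are dropped, and `Long.MAX_VALUE` is assumed to be the "max" 
   value since the parameter is a `long`.

```java
// Hypothetical, simplified stand-ins; not the real SpecStore API.
class VersionedUpdateSketch {
  interface Store {
    String updateSpec(String spec);

    // Mirrors the default in the diff: the version argument is ignored.
    default String updateSpec(String spec, long version) {
      return updateSpec(spec);
    }
  }

  // Relies on the default, so every update is applied regardless of version.
  static class PlainStore implements Store {
    String stored = "v0";

    @Override
    public String updateSpec(String spec) {
      stored = spec;
      return stored;
    }
  }

  // Assumed override: apply the update only while currentVersion < version.
  static class VersionCheckingStore extends PlainStore {
    long currentVersion = 5L;

    @Override
    public String updateSpec(String spec, long version) {
      if (currentVersion < version) {
        stored = spec;
      }
      return stored;
    }
  }
}
```

   Under this reading, a caller that passes `Long.MAX_VALUE` always clears the 
   check for any realistic stored version, which makes it equivalent to the 
   unversioned `updateSpec(spec)`.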



