umustafi commented on code in PR #3583:
URL: https://github.com/apache/gobblin/pull/3583#discussion_r997366732


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/SharedFlowGraphHelper.java:
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flowgraph;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Map;
+
+import org.apache.commons.io.FilenameUtils;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.base.Optional;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.service.modules.template_catalog.FSFlowTemplateCatalog;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+
+
+/**
+ * Supports a configuration of a flowgraph where it can support multiple sub-flowgraphs within its directory
+ * Node definitions are shared between each subgraph, but can be overwritten within the subgraph
+ * Edge definitions are only defined in the subgraphs
+ * e.g.
+ * /gobblin-flowgraph
+ *   /subgraphA
+ *     /nodeA
+ *       /nodeB
+ *         edgeAB.properties
+ *   /subgraphB
+ *     /nodeA
+ *       /nodeB
+ *         edgeAB.properties
+ *       A.properties
+ *  /nodes
+ *    A.properties
+ *    B.properties
+ */
+@Slf4j
+public class SharedFlowGraphHelper extends BaseFlowGraphHelper {
+
+  protected String sharedNodeFolder;
+  private static String NODE_FILE_SUFFIX = ".properties";
+  private static String SHARED_NODE_FOLDER_NAME = "nodes";
+  private static int NODE_FOLDER_DEPTH = 2;
+
+  public SharedFlowGraphHelper(Optional<? extends FSFlowTemplateCatalog> flowTemplateCatalog,
+      Map<URI, TopologySpec> topologySpecMap, String baseDirectory, String flowGraphFolderName,
+      String javaPropsExtentions, String hoconFileExtensions, boolean instrumentationEnabled, Config config) {
+    super(flowTemplateCatalog, topologySpecMap, baseDirectory, flowGraphFolderName, javaPropsExtentions, hoconFileExtensions, instrumentationEnabled, config);
+    this.sharedNodeFolder = baseDirectory + File.separator + SHARED_NODE_FOLDER_NAME;
+  }
+
+  /**
+   * Looks into the sharedNodeFolder to use those configurations as fallbacks for the node to add
+   * Otherwise if the shared node does not exist, attempt to add the node in the same manner as {@link BaseFlowGraphHelper}
+   * @param graph
+   * @param path of node folder in the subgraph, so path is expected to be a directory
+   */
+  @Override
+  protected void addDataNode(FlowGraph graph, java.nio.file.Path path) {
+    try {
+      // Load node from shared folder first if it exists
+      Config sharedNodeConfig = ConfigFactory.empty();
+      String nodePropertyFile = path.getFileName().toString() + NODE_FILE_SUFFIX;
+      File sharedNodeFile = new File(this.sharedNodeFolder, nodePropertyFile);
+      if (sharedNodeFile.exists()) {
+        sharedNodeConfig = loadNodeFileWithOverrides(new Path(sharedNodeFile.getPath()));
+      }
+      File nodeFilePath = new File(path.toString(), nodePropertyFile);
+      Config nodeConfig = sharedNodeConfig;
+      if (nodeFilePath.exists()) {
+        nodeConfig = loadNodeFileWithOverrides(new Path(nodeFilePath.getPath())).withFallback(sharedNodeConfig);
+      }
+      if (nodeConfig.isEmpty()) {
+        throw new IOException(String.format("Cannot find expected property file %s in %s or %s", nodePropertyFile, sharedNodeFolder, path));
+      }
+      Class dataNodeClass = Class.forName(ConfigUtils.getString(nodeConfig, FlowGraphConfigurationKeys.DATA_NODE_CLASS,
+          FlowGraphConfigurationKeys.DEFAULT_DATA_NODE_CLASS));
+      DataNode dataNode = (DataNode) GobblinConstructorUtils.invokeLongestConstructor(dataNodeClass, nodeConfig);
+      if (!graph.addDataNode(dataNode)) {
+        log.warn("Could not add DataNode {} to FlowGraph; skipping", dataNode.getId());
+      } else {
+        log.info("Added Datanode {} to FlowGraph", dataNode.getId());
+      }
+    } catch (Exception e) {
+      if (this.flowGraphUpdateFailedMeter.isPresent()) {
+        this.flowGraphUpdateFailedMeter.get().mark();
+      }
+      log.warn("Could not add DataNode defined in {} due to exception {}", path, e);

Review Comment:
   Can you also add the DataNode name to the exception message?
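
For illustration only, a minimal sketch of what such a message could look like. It assumes the node name can be taken from the last element of the node folder path, the same way `addDataNode` derives `nodePropertyFile`. The class `DataNodeLogSketch` and the helper `describeFailure` are hypothetical and are not part of the PR.

```java
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;

public class DataNodeLogSketch {
  // Hypothetical helper: builds a failure message that names the node as well as its folder.
  static String describeFailure(Path nodeFolder, Exception e) {
    // Assumption: the node name is the last element of the node folder path.
    String nodeName = nodeFolder.getFileName().toString();
    return String.format("Could not add DataNode %s defined in %s due to exception %s",
        nodeName, nodeFolder, e);
  }

  public static void main(String[] args) {
    // Example folder layout taken from the class Javadoc in the diff.
    Path nodeFolder = Paths.get("gobblin-flowgraph", "subgraphA", "nodeA");
    System.out.println(describeFailure(nodeFolder, new IOException("missing property file")));
  }
}
```

In the actual `catch` block the same idea would probably be expressed with an extra `{}` placeholder in the existing `log.warn` call rather than a separate helper.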



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/SharedFlowGraphHelper.java:
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flowgraph;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Map;
+
+import org.apache.commons.io.FilenameUtils;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.base.Optional;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.service.modules.template_catalog.FSFlowTemplateCatalog;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+
+
+/**
+ * Supports a configuration of a flowgraph where it can support multiple sub-flowgraphs within its directory
+ * Node definitions are shared between each subgraph, but can be overwritten within the subgraph
+ * Edge definitions are only defined in the subgraphs
+ * e.g.
+ * /gobblin-flowgraph
+ *   /subgraphA
+ *     /nodeA
+ *       /nodeB
+ *         edgeAB.properties
+ *   /subgraphB
+ *     /nodeA
+ *       /nodeB
+ *         edgeAB.properties
+ *       A.properties
+ *  /nodes
+ *    A.properties
+ *    B.properties
+ */
+@Slf4j
+public class SharedFlowGraphHelper extends BaseFlowGraphHelper {
+
+  protected String sharedNodeFolder;
+  private static String NODE_FILE_SUFFIX = ".properties";
+  private static String SHARED_NODE_FOLDER_NAME = "nodes";
+  private static int NODE_FOLDER_DEPTH = 2;

Review Comment:
   The folder depth is one level above the property file depth. Can we number
the depths in the example above to clarify this? It's hard to tell without
prior experience with the flowgraph.



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphHelper.java:
##########
@@ -60,21 +60,21 @@
  */
 @Slf4j
 public class BaseFlowGraphHelper {
-  public static final int NODE_FILE_DEPTH = 3;
-  public static final int EDGE_FILE_DEPTH = 4;
+  private static final int NODE_FILE_DEPTH = 3;
+  private static final int EDGE_FILE_DEPTH = 4;

Review Comment:
   Actually, the commented example below in `SharedFlowGraphHelper` shows the
expected structure.



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphHelper.java:
##########
@@ -60,21 +60,21 @@
  */
 @Slf4j
 public class BaseFlowGraphHelper {
-  public static final int NODE_FILE_DEPTH = 3;
-  public static final int EDGE_FILE_DEPTH = 4;
+  private static final int NODE_FILE_DEPTH = 3;
+  private static final int EDGE_FILE_DEPTH = 4;

Review Comment:
   This assumption is about the structure of the flowgraph MP, which looks
something like the layout below. The same assumption also appears here:
https://jarvis.corp.linkedin.com/codesearch/result/?name=BaseFlowGraphListener.java&path=gobblin-elr%2Fgobblin-service%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fgobblin%2Fservice%2Fmodules%2Fflowgraph&reponame=linkedin%2Fgobblin-elr#59

   flowgraph mp
   -> gobblin-flowgraph-prod
        -> source node name
             -> destination node name (this depth also contains the node prop file)
                  -> source-to-destination edge properties file
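
To make the depth numbers concrete, here is a rough sketch of how the constants in the diff line up with that layout. It assumes the flowgraph root folder itself counts as depth 1, which is an inference from the constants and not something stated in the PR. `FlowGraphDepthSketch`, `depthFromRoot`, and the file names are hypothetical.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

public class FlowGraphDepthSketch {
  // Values copied from the diff; the counting convention below is an assumption.
  static final int NODE_FOLDER_DEPTH = 2;
  static final int NODE_FILE_DEPTH = 3;
  static final int EDGE_FILE_DEPTH = 4;

  // Hypothetical helper: depth of `path`, where the flowgraph root folder is depth 1.
  static int depthFromRoot(Path flowGraphRoot, Path path) {
    if (path.equals(flowGraphRoot)) {
      return 1;
    }
    // relativize() drops the root prefix; each remaining element is one level deeper.
    return flowGraphRoot.relativize(path).getNameCount() + 1;
  }

  public static void main(String[] args) {
    Path root = Paths.get("gobblin-flowgraph-prod");
    Path nodeFolder = root.resolve("nodeA");                  // source node folder
    Path nodeFile = nodeFolder.resolve("nodeA.properties");   // node prop file
    Path edgeFile = nodeFolder.resolve("nodeB").resolve("edgeAB.properties");

    System.out.println(depthFromRoot(root, nodeFolder) == NODE_FOLDER_DEPTH);
    System.out.println(depthFromRoot(root, nodeFile) == NODE_FILE_DEPTH);
    System.out.println(depthFromRoot(root, edgeFile) == EDGE_FILE_DEPTH);
  }
}
```

Under that convention the node folder sits one level above its property file, which matches the relationship between `NODE_FOLDER_DEPTH` and `NODE_FILE_DEPTH`.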



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
