This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 57310a695ad49aa7e4c5b9fd65d79b484674f27c
Author: Luke Cwik <lc...@google.com>
AuthorDate: Wed Dec 20 11:26:09 2017 -0800

    Post code review amendments and refactor
---
 .../org/apache/beam/runners/flink/FlinkRunner.java | 36 +---------
 .../beam/runners/dataflow/DataflowRunner.java      | 41 ++----------
 .../beam/runners/dataflow/DataflowRunnerTest.java  | 36 ----------
 .../beam/runners/spark/SparkContextOptions.java    |  2 +-
 .../beam/runners/spark/SparkPipelineOptions.java   | 21 +++---
 .../org/apache/beam/runners/spark/SparkRunner.java | 13 ++++
 .../spark/translation/SparkContextFactory.java     |  4 +-
 .../org/apache/beam/sdk/util/PipelineUtils.java    | 59 +++++++++++++++++
 .../apache/beam/sdk/util/PipelineUtilsTest.java    | 76 ++++++++++++++++++++++
 9 files changed, 170 insertions(+), 118 deletions(-)

diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
index ca12615..f2be9a7 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
@@ -17,14 +17,11 @@
  */
 package org.apache.beam.runners.flink;
 
+import static 
org.apache.beam.sdk.util.PipelineUtils.detectClassPathResourcesToStage;
+
 import com.google.common.base.Joiner;
-import java.io.File;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.net.URLClassLoader;
 import java.util.ArrayList;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.SortedSet;
@@ -150,36 +147,7 @@ public class FlinkRunner extends 
PipelineRunner<PipelineResult> {
     return "FlinkRunner#" + hashCode();
   }
 
-  /**
-   * Attempts to detect all the resources the class loader has access to. This 
does not recurse
-   * to class loader parents stopping it from pulling in resources from the 
system class loader.
-   *
-   * @param classLoader The URLClassLoader to use to detect resources to stage.
-   * @return A list of absolute paths to the resources the class loader uses.
-   * @throws IllegalArgumentException If either the class loader is not a 
URLClassLoader or one
-   *   of the resources the class loader exposes is not a file resource.
-   */
-  protected static List<String> detectClassPathResourcesToStage(
-      ClassLoader classLoader) {
-    if (!(classLoader instanceof URLClassLoader)) {
-      String message = String.format("Unable to use ClassLoader to detect 
classpath elements. "
-          + "Current ClassLoader is %s, only URLClassLoaders are supported.", 
classLoader);
-      LOG.error(message);
-      throw new IllegalArgumentException(message);
-    }
 
-    List<String> files = new ArrayList<>();
-    for (URL url : ((URLClassLoader) classLoader).getURLs()) {
-      try {
-        files.add(new File(url.toURI()).getAbsolutePath());
-      } catch (IllegalArgumentException | URISyntaxException e) {
-        String message = String.format("Unable to convert url (%s) to file.", 
url);
-        LOG.error(message);
-        throw new IllegalArgumentException(message, e);
-      }
-    }
-    return files;
-  }
 
   /** A set of {@link View}s with non-deterministic key coders. */
   Set<PTransform<?, ?>> ptransformViewsWithNonDeterministicKeyCoders;
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 942d36b..729ec9c 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -22,6 +22,7 @@ import static 
com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.base.Strings.isNullOrEmpty;
 import static org.apache.beam.sdk.util.CoderUtils.encodeToByteArray;
+import static 
org.apache.beam.sdk.util.PipelineUtils.detectClassPathResourcesToStage;
 import static org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray;
 import static org.apache.beam.sdk.util.StringUtils.byteArrayToJsonString;
 
@@ -41,12 +42,8 @@ import com.google.common.base.Utf8;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
-import java.io.File;
 import java.io.IOException;
 import java.io.PrintWriter;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.net.URLClassLoader;
 import java.nio.channels.Channels;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -256,9 +253,9 @@ public class DataflowRunner extends 
PipelineRunner<DataflowPipelineJob> {
         throw new IllegalArgumentException("No files to stage has been 
found.");
       } else {
         LOG.info("PipelineOptions.filesToStage was not specified. "
-                        + "Defaulting to files from the classpath: will stage 
{} files. "
-                        + "Enable logging at DEBUG level to see which files 
will be staged.",
-                dataflowOptions.getFilesToStage().size());
+                + "Defaulting to files from the classpath: will stage {} 
files. "
+                + "Enable logging at DEBUG level to see which files will be 
staged.",
+            dataflowOptions.getFilesToStage().size());
         LOG.debug("Classpath elements: {}", dataflowOptions.getFilesToStage());
       }
     }
@@ -1566,36 +1563,6 @@ public class DataflowRunner extends 
PipelineRunner<DataflowPipelineJob> {
   }
 
   /**
-   * Attempts to detect all the resources the class loader has access to. This 
does not recurse
-   * to class loader parents stopping it from pulling in resources from the 
system class loader.
-   *
-   * @param classLoader The URLClassLoader to use to detect resources to stage.
-   * @throws IllegalArgumentException  If either the class loader is not a 
URLClassLoader or one
-   * of the resources the class loader exposes is not a file resource.
-   * @return A list of absolute paths to the resources the class loader uses.
-   */
-  protected static List<String> detectClassPathResourcesToStage(ClassLoader 
classLoader) {
-    if (!(classLoader instanceof URLClassLoader)) {
-      String message = String.format("Unable to use ClassLoader to detect 
classpath elements. "
-          + "Current ClassLoader is %s, only URLClassLoaders are supported.", 
classLoader);
-      LOG.error(message);
-      throw new IllegalArgumentException(message);
-    }
-
-    List<String> files = new ArrayList<>();
-    for (URL url : ((URLClassLoader) classLoader).getURLs()) {
-      try {
-        files.add(new File(url.toURI()).getAbsolutePath());
-      } catch (IllegalArgumentException | URISyntaxException e) {
-        String message = String.format("Unable to convert url (%s) to file.", 
url);
-        LOG.error(message);
-        throw new IllegalArgumentException(message, e);
-      }
-    }
-    return files;
-  }
-
-  /**
    * Finds the id for the running job of the given name.
    */
   private String getJobIdFromName(String jobName) {
diff --git 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index edf513b..90748af 100644
--- 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -53,8 +53,6 @@ import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.Serializable;
-import java.net.URL;
-import java.net.URLClassLoader;
 import java.nio.channels.FileChannel;
 import java.nio.channels.SeekableByteChannel;
 import java.nio.file.Files;
@@ -628,40 +626,6 @@ public class DataflowRunnerTest implements Serializable {
   }
 
   @Test
-  public void detectClassPathResourceWithFileResources() throws Exception {
-    File file = tmpFolder.newFile("file");
-    File file2 = tmpFolder.newFile("file2");
-    URLClassLoader classLoader = new URLClassLoader(new URL[] {
-        file.toURI().toURL(),
-        file2.toURI().toURL()
-    });
-
-    assertEquals(ImmutableList.of(file.getAbsolutePath(), 
file2.getAbsolutePath()),
-        DataflowRunner.detectClassPathResourcesToStage(classLoader));
-  }
-
-  @Test
-  public void detectClassPathResourcesWithUnsupportedClassLoader() {
-    ClassLoader mockClassLoader = Mockito.mock(ClassLoader.class);
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("Unable to use ClassLoader to detect classpath 
elements.");
-
-    DataflowRunner.detectClassPathResourcesToStage(mockClassLoader);
-  }
-
-  @Test
-  public void detectClassPathResourceWithNonFileResources() throws Exception {
-    String url = "http://www.google.com/all-the-secrets.jar";;
-    URLClassLoader classLoader = new URLClassLoader(new URL[] {
-        new URL(url)
-    });
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("Unable to convert url (" + url + ") to file.");
-
-    DataflowRunner.detectClassPathResourcesToStage(classLoader);
-  }
-
-  @Test
   public void testGcsStagingLocationInitialization() throws Exception {
     // Set temp location (required), and check that staging location is set.
     DataflowPipelineOptions options = 
PipelineOptionsFactory.as(DataflowPipelineOptions.class);
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkContextOptions.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkContextOptions.java
index 98f7492..0a7995f 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkContextOptions.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkContextOptions.java
@@ -54,7 +54,7 @@ public interface SparkContextOptions extends 
SparkPipelineOptions {
   List<JavaStreamingListener> getListeners();
   void setListeners(List<JavaStreamingListener> listeners);
 
-  /** Returns an empty list, top avoid handling null. */
+  /** Returns an empty list, to avoid handling null. */
   class EmptyListenersList implements 
DefaultValueFactory<List<JavaStreamingListener>> {
     @Override
     public List<JavaStreamingListener> create(PipelineOptions options) {
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
index 2053b44..146f25b 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
@@ -20,7 +20,6 @@ package org.apache.beam.runners.spark;
 
 import java.util.ArrayList;
 import java.util.List;
-
 import org.apache.beam.sdk.options.ApplicationNameOptions;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.DefaultValueFactory;
@@ -28,8 +27,6 @@ import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.StreamingOptions;
 
-
-
 /**
  * Spark runner {@link PipelineOptions} handles Spark execution-related 
configurations,
  * such as the master address, batch-interval, and other user-related knobs.
@@ -104,12 +101,20 @@ public interface SparkPipelineOptions
   boolean getUsesProvidedSparkContext();
   void setUsesProvidedSparkContext(boolean value);
 
-  @Description("Jars for spark context")
-  @Default.InstanceFactory(SparkContextOptions.EmptyPathList.class)
-  List<String> getJarsForSparkContext();
-  void setJarsForSparkContext(List<String> jars);
+  /**
+   * List of local files to make available to workers.
+   *
+   * <p>Jars are placed on the worker's classpath.
+   *
+   * <p>The default value is the list of jars from the main program's 
classpath.
+   */
+  @Description("Jar-Files to send to all workers and put on the classpath. "
+      + "The default value is all files from the classpath.")
+  @Default.InstanceFactory(EmptyPathList.class)
+  List<String> getFilesToStage();
+  void setFilesToStage(List<String> value);
 
-  /** Returns an empty path list, top avoid handling null. */
+  /** Returns an empty path list, to avoid handling null. */
   class EmptyPathList implements DefaultValueFactory<List<String>> {
     @Override
     public List<String> create(PipelineOptions options) {
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
index 4a409cb..ccf8283 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
@@ -18,6 +18,8 @@
 
 package org.apache.beam.runners.spark;
 
+import static 
org.apache.beam.sdk.util.PipelineUtils.detectClassPathResourcesToStage;
+
 import com.google.common.collect.Iterables;
 import java.util.Arrays;
 import java.util.Collection;
@@ -121,6 +123,17 @@ public final class SparkRunner extends 
PipelineRunner<SparkPipelineResult> {
   public static SparkRunner fromOptions(PipelineOptions options) {
     SparkPipelineOptions sparkOptions =
         PipelineOptionsValidator.validate(SparkPipelineOptions.class, options);
+
+    if (sparkOptions.getFilesToStage().isEmpty()) {
+      sparkOptions.setFilesToStage(detectClassPathResourcesToStage(
+          SparkRunner.class.getClassLoader()));
+      LOG.info("PipelineOptions.filesToStage was not specified. "
+              + "Defaulting to files from the classpath: will stage {} files. "
+              + "Enable logging at DEBUG level to see which files will be 
staged.",
+          sparkOptions.getFilesToStage().size());
+      LOG.debug("Classpath elements: {}", sparkOptions.getFilesToStage());
+    }
+
     return new SparkRunner(sparkOptions);
   }
 
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
index 68c4093..d0b467a 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
@@ -93,8 +93,8 @@ public final class SparkContextFactory {
         conf.setMaster(contextOptions.getSparkMaster());
       }
 
-      if (contextOptions.getJarsForSparkContext().size() > 0) {
-        conf.setJars(contextOptions.getJarsForSparkContext().toArray(new 
String[0]));
+      if (contextOptions.getFilesToStage().size() > 0) {
+        conf.setJars(contextOptions.getFilesToStage().toArray(new String[0]));
       }
 
       conf.setAppName(contextOptions.getAppName());
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PipelineUtils.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PipelineUtils.java
new file mode 100644
index 0000000..1614dca
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PipelineUtils.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import java.io.File;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Utilities for working with Pipelines.
+ */
+public class PipelineUtils {
+
+  /**
+   * Attempts to detect all the resources the class loader has access to. This 
does not recurse
+   * to class loader parents stopping it from pulling in resources from the 
system class loader.
+   *
+   * @param classLoader The URLClassLoader to use to detect resources to stage.
+   * @throws IllegalArgumentException  If either the class loader is not a 
URLClassLoader or one
+   * of the resources the class loader exposes is not a file resource.
+   * @return A list of absolute paths to the resources the class loader uses.
+   */
+  public static List<String> detectClassPathResourcesToStage(ClassLoader 
classLoader) {
+    if (!(classLoader instanceof URLClassLoader)) {
+      String message = String.format("Unable to use ClassLoader to detect 
classpath elements. "
+          + "Current ClassLoader is %s, only URLClassLoaders are supported.", 
classLoader);
+      throw new IllegalArgumentException(message);
+    }
+
+    List<String> files = new ArrayList<>();
+    for (URL url : ((URLClassLoader) classLoader).getURLs()) {
+      try {
+        files.add(new File(url.toURI()).getAbsolutePath());
+      } catch (IllegalArgumentException | URISyntaxException e) {
+        String message = String.format("Unable to convert url (%s) to file.", 
url);
+        throw new IllegalArgumentException(message, e);
+      }
+    }
+    return files;
+  }
+}
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PipelineUtilsTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PipelineUtilsTest.java
new file mode 100644
index 0000000..4d03615
--- /dev/null
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PipelineUtilsTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.common.collect.ImmutableList;
+import java.io.File;
+import java.net.URL;
+import java.net.URLClassLoader;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mockito;
+
+/**
+ * Tests for PipelineUtils.
+ */
+@RunWith(JUnit4.class)
+public class PipelineUtilsTest {
+
+  @Rule public transient TemporaryFolder tmpFolder = new TemporaryFolder();
+  @Rule public transient ExpectedException thrown = ExpectedException.none();
+
+  @Test
+  public void detectClassPathResourceWithFileResources() throws Exception {
+    File file = tmpFolder.newFile("file");
+    File file2 = tmpFolder.newFile("file2");
+    URLClassLoader classLoader = new URLClassLoader(new URL[] {
+        file.toURI().toURL(),
+        file2.toURI().toURL()
+    });
+
+    assertEquals(ImmutableList.of(file.getAbsolutePath(), 
file2.getAbsolutePath()),
+        PipelineUtils.detectClassPathResourcesToStage(classLoader));
+  }
+
+  @Test
+  public void detectClassPathResourcesWithUnsupportedClassLoader() {
+    ClassLoader mockClassLoader = Mockito.mock(ClassLoader.class);
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Unable to use ClassLoader to detect classpath 
elements.");
+
+    PipelineUtils.detectClassPathResourcesToStage(mockClassLoader);
+  }
+
+  @Test
+  public void detectClassPathResourceWithNonFileResources() throws Exception {
+    String url = "http://www.google.com/all-the-secrets.jar";;
+    URLClassLoader classLoader = new URLClassLoader(new URL[] {
+        new URL(url)
+    });
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Unable to convert url (" + url + ") to file.");
+
+    PipelineUtils.detectClassPathResourcesToStage(classLoader);
+  }
+}

-- 
To stop receiving notification emails like this one, please contact
"commits@beam.apache.org" <commits@beam.apache.org>.

Reply via email to