OOZIE-2802 Spark action failure on Spark 2.1.0 due to duplicate sharelibs (gezapeti via rkanter)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/e8bd9fc9
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/e8bd9fc9
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/e8bd9fc9

Branch: refs/heads/oya
Commit: e8bd9fc9271d07aaa074df3fd45e3b8d51974ce1
Parents: 64a5821
Author: Robert Kanter <rkan...@apache.org>
Authored: Thu Mar 2 16:57:50 2017 -0800
Committer: Robert Kanter <rkan...@apache.org>
Committed: Thu Mar 2 16:57:50 2017 -0800

----------------------------------------------------------------------
 release-log.txt                                 |  1 +
 .../apache/oozie/action/hadoop/SparkMain.java   | 61 ++++++++++----
 .../TestDuplicateFilteringInSparkMain.java      | 87 ++++++++++++++++++++
 3 files changed, 133 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
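
Background on the change: with Spark 2.1.0, a Spark action fails when the same
file name shows up more than once in the --files or --archives list, which
happens when the sharelib and the job's distributed cache both contribute a
jar. The patch switches from collecting cache URIs in a LinkedList to keying
them by file name in a Map, so a later URI with the same name replaces the
earlier one instead of duplicating it. A minimal sketch of the idea, assuming
Hadoop's Path is on the classpath (the helper name dedupeByName is
illustrative, not part of the patch):

    import java.net.URI;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.hadoop.fs.Path;

    static Map<String, URI> dedupeByName(URI[] uris) {
        // Key each URI by its final path component, e.g. "spark-yarn.jar";
        // two URIs with the same file name collapse into a single entry.
        Map<String, URI> byName = new HashMap<>();
        if (uris != null) {
            for (URI uri : uris) {
                byName.put(new Path(uri).getName(), uri);
            }
        }
        return byName;
    }

Joining byName.values() with "," then yields a --files value in which each
file name appears at most once.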


http://git-wip-us.apache.org/repos/asf/oozie/blob/e8bd9fc9/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index fdf6f2b..6937e24 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.4.0 release (trunk - unreleased)
 
+OOZIE-2802 Spark action failure on Spark 2.1.0 due to duplicate sharelibs (gezapeti via rkanter)
 OOZIE-2803 Mask passwords when printing out configs/args in MapReduceMain and SparkMain (pbacsko via rkanter)
 OOZIE-2799 Setting log location for spark sql on hive (satishsaley)
 OOZIE-2792 Hive2 action is not parsing Spark application ID from log file properly when Hive is on Spark (zhengxb2005 via rkanter)

http://git-wip-us.apache.org/repos/asf/oozie/blob/e8bd9fc9/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java
----------------------------------------------------------------------
diff --git a/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java b/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java
index 88ac64e..c24d95c 100644
--- a/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java
+++ b/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java
@@ -27,8 +27,9 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.URL;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -79,6 +80,8 @@ public class SparkMain extends LauncherMain {
     private static final String SPARK_YARN_JAR = "spark.yarn.jar";
     private static final String SPARK_YARN_JARS = "spark.yarn.jars";
     public static final String HIVE_SITE_CONF = "hive-site.xml";
+    public static final String FILES_OPTION = "--files";
+    public static final String ARCHIVES_OPTION = "--archives";
 
     public static void main(String[] args) throws Exception {
         run(SparkMain.class, args);
@@ -135,6 +138,7 @@ public class SparkMain extends LauncherMain {
         boolean addedLog4jExecutorSettings = false;
         StringBuilder driverClassPath = new StringBuilder();
         StringBuilder executorClassPath = new StringBuilder();
+        String userFiles = null, userArchives = null;
         String sparkOpts = actionConf.get(SparkActionExecutor.SPARK_OPTS);
         if (StringUtils.isNotEmpty(sparkOpts)) {
             List<String> sparkOptions = splitSparkOpts(sparkOpts);
@@ -177,6 +181,16 @@ public class SparkMain extends LauncherMain {
                         addedLog4jDriverSettings = true;
                     }
                 }
+                if(opt.startsWith(FILES_OPTION)) {
+                    userFiles = sparkOptions.get(i + 1);
+                    i++;
+                    addToSparkArgs = false;
+                }
+                if(opt.startsWith(ARCHIVES_OPTION)) {
+                    userArchives = sparkOptions.get(i + 1);
+                    i++;
+                    addToSparkArgs = false;
+                }
                 if(addToSparkArgs) {
                     sparkArgs.add(opt);
                 }
@@ -225,19 +239,24 @@ public class SparkMain extends LauncherMain {
         }
 
         if ((yarnClusterMode || yarnClientMode)) {
-            LinkedList<URI> fixedUris = fixFsDefaultUris(DistributedCache.getCacheFiles(actionConf));
-            JarFilter jarfilter = new JarFilter(fixedUris, jarPath);
+            Map<String, URI> fixedFileUrisMap = fixFsDefaultUrisAndFilterDuplicates(DistributedCache.getCacheFiles(actionConf));
+            fixedFileUrisMap.put(SPARK_LOG4J_PROPS, new Path(SPARK_LOG4J_PROPS).toUri());
+            fixedFileUrisMap.put(HIVE_SITE_CONF, new Path(HIVE_SITE_CONF).toUri());
+            addUserDefined(userFiles, fixedFileUrisMap);
+            Collection<URI> fixedFileUris = fixedFileUrisMap.values();
+            JarFilter jarfilter = new JarFilter(fixedFileUris, jarPath);
             jarfilter.filter();
             jarPath = jarfilter.getApplicationJar();
-            fixedUris.add(new Path(SPARK_LOG4J_PROPS).toUri());
-            fixedUris.add(new Path(HIVE_SITE_CONF).toUri());
-            String cachedFiles = StringUtils.join(fixedUris, ",");
+
+            String cachedFiles = StringUtils.join(fixedFileUris, ",");
             if (cachedFiles != null && !cachedFiles.isEmpty()) {
                 sparkArgs.add("--files");
                 sparkArgs.add(cachedFiles);
             }
-            fixedUris = fixFsDefaultUris(DistributedCache.getCacheArchives(actionConf));
-            String cachedArchives = StringUtils.join(fixedUris, ",");
+            Map<String, URI> fixedArchiveUrisMap = fixFsDefaultUrisAndFilterDuplicates(DistributedCache.
+                    getCacheArchives(actionConf));
+            addUserDefined(userArchives, fixedArchiveUrisMap);
+            String cachedArchives = StringUtils.join(fixedArchiveUrisMap.values(), ",");
             if (cachedArchives != null && !cachedArchives.isEmpty()) {
                 sparkArgs.add("--archives");
                 sparkArgs.add(cachedArchives);
@@ -277,6 +296,15 @@ public class SparkMain extends LauncherMain {
         }
     }
 
+    private void addUserDefined(String userList, Map<String, URI> urisMap) {
+        if(userList != null) {
+            for (String file : userList.split(",")) {
+                Path p = new Path(file);
+                urisMap.put(p.getName(), p.toUri());
+            }
+        }
+    }
+
     private void prepareHadoopConfig(Configuration actionConf) throws IOException {
         // Copying oozie.action.conf.xml into hadoop configuration *-site files.
         if (actionConf.getBoolean(CONF_OOZIE_SPARK_SETUP_HADOOP_CONF_DIR, false)) {
@@ -433,23 +461,24 @@ public class SparkMain extends LauncherMain {
 
     /**
      * Convert URIs into the default format which Spark expects
-     *
+     * Also filters out duplicate entries
      * @param files
      * @return
      * @throws IOException
      * @throws URISyntaxException
      */
-    private LinkedList<URI> fixFsDefaultUris(URI[] files) throws IOException, URISyntaxException {
+    static Map<String, URI> fixFsDefaultUrisAndFilterDuplicates(URI[] files) throws IOException, URISyntaxException {
+        Map<String, URI> map= new HashMap<>();
         if (files == null) {
-            return null;
+            return map;
         }
-        LinkedList<URI> listUris = new LinkedList<URI>();
         FileSystem fs = FileSystem.get(new Configuration(true));
         for (int i = 0; i < files.length; i++) {
             URI fileUri = files[i];
-            listUris.add(getFixedUri(fs, fileUri));
+            Path p = new Path(fileUri);
+            map.put(p.getName(), getFixedUri(fs, fileUri));
         }
-        return listUris;
+        return map;
     }
 
     /**
@@ -573,7 +602,7 @@ public class SparkMain extends LauncherMain {
         private String sparkVersion = "1.X.X";
         private String sparkYarnJar;
         private String applicationJar;
-        private LinkedList<URI> listUris = null;
+        private Collection<URI> listUris = null;
 
         /**
          * @param listUris List of URIs to be filtered
@@ -581,7 +610,7 @@ public class SparkMain extends LauncherMain {
          * @throws IOException
          * @throws URISyntaxException
          */
-        public JarFilter(LinkedList<URI> listUris, String jarPath) throws URISyntaxException, IOException {
+        public JarFilter(Collection<URI> listUris, String jarPath) throws URISyntaxException, IOException {
             this.listUris = listUris;
             applicationJar = jarPath;
             Path p = new Path(jarPath);

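Taken together, the yarn-mode branch above now funnels every file through one
name-keyed map: first the fixed distributed-cache URIs, then the log4j and
hive-site entries, then anything the user passed via --files, so the last
writer wins and spark-submit never sees the same file name twice. In rough
outline (a paraphrase of the patched flow, not the literal code):

    Map<String, URI> files = fixFsDefaultUrisAndFilterDuplicates(
            DistributedCache.getCacheFiles(actionConf));
    files.put(SPARK_LOG4J_PROPS, new Path(SPARK_LOG4J_PROPS).toUri());
    files.put(HIVE_SITE_CONF, new Path(HIVE_SITE_CONF).toUri());
    addUserDefined(userFiles, files);   // user-supplied entries override
    sparkArgs.add("--files");
    sparkArgs.add(StringUtils.join(files.values(), ","));

One consequence of keying on Path.getName() is that two genuinely different
files sharing a base name (say, dirA/config.xml and dirB/config.xml) also
collapse into a single entry, with the later one surviving.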
http://git-wip-us.apache.org/repos/asf/oozie/blob/e8bd9fc9/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestDuplicateFilteringInSparkMain.java
----------------------------------------------------------------------
diff --git a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestDuplicateFilteringInSparkMain.java b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestDuplicateFilteringInSparkMain.java
new file mode 100644
index 0000000..9a231b1
--- /dev/null
+++ b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestDuplicateFilteringInSparkMain.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.action.hadoop;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.junit.Assert.*;
+import static org.hamcrest.CoreMatchers.*;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+@RunWith(Parameterized.class)
+public class TestDuplicateFilteringInSparkMain {
+
+    private static final String PREFIX = "hdfs://namenode.address:8020/folder/";
+
+    static List<URI> getURIs(String... uriStrings) throws URISyntaxException {
+        URI[] uris = new URI[uriStrings.length];
+        for (int i = 0; i != uriStrings.length; ++i) {
+            uris[i] = new URI(PREFIX + uriStrings[i]);
+        }
+        return Arrays.asList(uris);
+    }
+
+    static Object[] testCase(List<URI> inputs, List<URI> expectedOutputs) {
+        return new Object[] {inputs, expectedOutputs};
+    }
+
+    @Parameterized.Parameters
+    public static List<Object[]> params() throws Exception {
+        return Arrays.asList(
+                testCase(getURIs("file.io"),
+                        getURIs("file.io")),
+
+                testCase(getURIs("file.io", "file.io", "file.io"),
+                        getURIs("file.io")),
+
+                testCase(getURIs("file.io", "file3.io", "file.io"),
+                        getURIs("file.io", "file3.io")),
+
+                testCase(getURIs("file.io", "file3.io", "file2.io"),
+                        getURIs("file.io", "file2.io", "file3.io"))
+                );
+    }
+
+    private List<URI> input;
+
+    private List<URI> expectedOutput;
+
+    public TestDuplicateFilteringInSparkMain(List<URI> input, List<URI> result) {
+        this.input = input;
+        this.expectedOutput = result;
+    }
+
+    @Test
+    public void test() throws Exception{
+        Map<String, URI> uriMap = SparkMain.fixFsDefaultUrisAndFilterDuplicates(input.toArray(new URI[input.size()]));
+        assertThat("Duplicate filtering failed for >>" + input + "<<", 
uriMap.size(), is(expectedOutput.size()));
+        List<URI> outputList = Arrays.asList(uriMap.values().toArray(new URI[0]));
+        Collections.sort(outputList);
+        assertThat("Files are different in result ", outputList, 
is(expectedOutput));
+    }
+
+}
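
One detail the assertions rely on: HashMap makes no ordering guarantee, so
uriMap.values() can come back in any order. The test therefore sorts the
actual output before comparing (java.net.URI implements Comparable, so
Collections.sort applies directly), which is why the expected list in the
last case is written file.io, file2.io, file3.io even though the inputs
arrive as file.io, file3.io, file2.io. Any case added to params() should
keep its expected list in sorted order as well, e.g. (a hypothetical extra
case, not part of the patch):

    testCase(getURIs("b.jar", "a.jar", "b.jar"),
            getURIs("a.jar", "b.jar"))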
