OOZIE-2802 Spark action failure on Spark 2.1.0 due to duplicate sharelibs (gezapeti via rkanter)
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/e8bd9fc9
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/e8bd9fc9
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/e8bd9fc9

Branch: refs/heads/oya
Commit: e8bd9fc9271d07aaa074df3fd45e3b8d51974ce1
Parents: 64a5821
Author: Robert Kanter <rkan...@apache.org>
Authored: Thu Mar 2 16:57:50 2017 -0800
Committer: Robert Kanter <rkan...@apache.org>
Committed: Thu Mar 2 16:57:50 2017 -0800

----------------------------------------------------------------------
 release-log.txt                                 |  1 +
 .../apache/oozie/action/hadoop/SparkMain.java   | 61 ++++++++++----
 .../TestDuplicateFilteringInSparkMain.java      | 87 ++++++++++++++++++++
 3 files changed, 133 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/e8bd9fc9/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index fdf6f2b..6937e24 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.4.0 release (trunk - unreleased)
 
+OOZIE-2802 Spark action failure on Spark 2.1.0 due to duplicate sharelibs (gezapeti via rkanter)
 OOZIE-2803 Mask passwords when printing out configs/args in MapReduceMain and SparkMain (pbacsko via rkanter)
 OOZIE-2799 Setting log location for spark sql on hive (satishsaley)
 OOZIE-2792 Hive2 action is not parsing Spark application ID from log file properly when Hive is on Spark (zhengxb2005 via rkanter)


http://git-wip-us.apache.org/repos/asf/oozie/blob/e8bd9fc9/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java
----------------------------------------------------------------------
diff --git a/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java b/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java
index 88ac64e..c24d95c 100644
--- a/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java
+++ b/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java
@@ -27,8 +27,9 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.URL;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -79,6 +80,8 @@ public class SparkMain extends LauncherMain {
     private static final String SPARK_YARN_JAR = "spark.yarn.jar";
     private static final String SPARK_YARN_JARS = "spark.yarn.jars";
     public static final String HIVE_SITE_CONF = "hive-site.xml";
+    public static final String FILES_OPTION = "--files";
+    public static final String ARCHIVES_OPTION = "--archives";
 
     public static void main(String[] args) throws Exception {
         run(SparkMain.class, args);
@@ -135,6 +138,7 @@ public class SparkMain extends LauncherMain {
         boolean addedLog4jExecutorSettings = false;
         StringBuilder driverClassPath = new StringBuilder();
         StringBuilder executorClassPath = new StringBuilder();
+        String userFiles = null, userArchives = null;
         String sparkOpts = actionConf.get(SparkActionExecutor.SPARK_OPTS);
         if (StringUtils.isNotEmpty(sparkOpts)) {
             List<String> sparkOptions = splitSparkOpts(sparkOpts);
@@ -177,6 +181,16 @@ public class SparkMain extends LauncherMain {
                         addedLog4jDriverSettings = true;
                     }
                 }
+                if(opt.startsWith(FILES_OPTION)) {
+                    userFiles = sparkOptions.get(i + 1);
+                    i++;
+                    addToSparkArgs = false;
+                }
+                if(opt.startsWith(ARCHIVES_OPTION)) {
+                    userArchives = sparkOptions.get(i + 1);
+                    i++;
+                    addToSparkArgs = false;
+                }
                 if(addToSparkArgs) {
                     sparkArgs.add(opt);
                 }
@@ -225,19 +239,24 @@ public class SparkMain extends LauncherMain {
         }
 
         if ((yarnClusterMode || yarnClientMode)) {
-            LinkedList<URI> fixedUris = fixFsDefaultUris(DistributedCache.getCacheFiles(actionConf));
-            JarFilter jarfilter = new JarFilter(fixedUris, jarPath);
+            Map<String, URI> fixedFileUrisMap = fixFsDefaultUrisAndFilterDuplicates(DistributedCache.getCacheFiles(actionConf));
+            fixedFileUrisMap.put(SPARK_LOG4J_PROPS, new Path(SPARK_LOG4J_PROPS).toUri());
+            fixedFileUrisMap.put(HIVE_SITE_CONF, new Path(HIVE_SITE_CONF).toUri());
+            addUserDefined(userFiles, fixedFileUrisMap);
+            Collection<URI> fixedFileUris = fixedFileUrisMap.values();
+            JarFilter jarfilter = new JarFilter(fixedFileUris, jarPath);
             jarfilter.filter();
             jarPath = jarfilter.getApplicationJar();
-            fixedUris.add(new Path(SPARK_LOG4J_PROPS).toUri());
-            fixedUris.add(new Path(HIVE_SITE_CONF).toUri());
-            String cachedFiles = StringUtils.join(fixedUris, ",");
+
+            String cachedFiles = StringUtils.join(fixedFileUris, ",");
             if (cachedFiles != null && !cachedFiles.isEmpty()) {
                 sparkArgs.add("--files");
                 sparkArgs.add(cachedFiles);
             }
-            fixedUris = fixFsDefaultUris(DistributedCache.getCacheArchives(actionConf));
-            String cachedArchives = StringUtils.join(fixedUris, ",");
+            Map<String, URI> fixedArchiveUrisMap = fixFsDefaultUrisAndFilterDuplicates(DistributedCache.
+                    getCacheArchives(actionConf));
+            addUserDefined(userArchives, fixedArchiveUrisMap);
+            String cachedArchives = StringUtils.join(fixedArchiveUrisMap.values(), ",");
             if (cachedArchives != null && !cachedArchives.isEmpty()) {
                 sparkArgs.add("--archives");
                 sparkArgs.add(cachedArchives);
@@ -277,6 +296,15 @@ public class SparkMain extends LauncherMain {
         }
     }
 
+    private void addUserDefined(String userList, Map<String, URI> urisMap) {
+        if(userList != null) {
+            for (String file : userList.split(",")) {
+                Path p = new Path(file);
+                urisMap.put(p.getName(), p.toUri());
+            }
+        }
+    }
+
     private void prepareHadoopConfig(Configuration actionConf) throws IOException {
         // Copying oozie.action.conf.xml into hadoop configuration *-site files.
         if (actionConf.getBoolean(CONF_OOZIE_SPARK_SETUP_HADOOP_CONF_DIR, false)) {
@@ -433,23 +461,24 @@ public class SparkMain extends LauncherMain {
 
     /**
      * Convert URIs into the default format which Spark expects
-     *
+     * Also filters out duplicate entries
      * @param files
      * @return
      * @throws IOException
     * @throws URISyntaxException
      */
-    private LinkedList<URI> fixFsDefaultUris(URI[] files) throws IOException, URISyntaxException {
+    static Map<String, URI> fixFsDefaultUrisAndFilterDuplicates(URI[] files) throws IOException, URISyntaxException {
+        Map<String, URI> map= new HashMap<>();
         if (files == null) {
-            return null;
+            return map;
         }
-        LinkedList<URI> listUris = new LinkedList<URI>();
         FileSystem fs = FileSystem.get(new Configuration(true));
         for (int i = 0; i < files.length; i++) {
             URI fileUri = files[i];
-            listUris.add(getFixedUri(fs, fileUri));
+            Path p = new Path(fileUri);
+            map.put(p.getName(), getFixedUri(fs, fileUri));
         }
-        return listUris;
+        return map;
     }
 
     /**
@@ -573,7 +602,7 @@ public class SparkMain extends LauncherMain {
         private String sparkVersion = "1.X.X";
         private String sparkYarnJar;
         private String applicationJar;
-        private LinkedList<URI> listUris = null;
+        private Collection<URI> listUris = null;
 
         /**
          * @param listUris List of URIs to be filtered
@@ -581,7 +610,7 @@ public class SparkMain extends LauncherMain {
          * @throws IOException
          * @throws URISyntaxException
          */
-        public JarFilter(LinkedList<URI> listUris, String jarPath) throws URISyntaxException, IOException {
+        public JarFilter(Collection<URI> listUris, String jarPath) throws URISyntaxException, IOException {
             this.listUris = listUris;
             applicationJar = jarPath;
             Path p = new Path(jarPath);


http://git-wip-us.apache.org/repos/asf/oozie/blob/e8bd9fc9/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestDuplicateFilteringInSparkMain.java
----------------------------------------------------------------------
diff --git a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestDuplicateFilteringInSparkMain.java b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestDuplicateFilteringInSparkMain.java
new file mode 100644
index 0000000..9a231b1
--- /dev/null
+++ b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestDuplicateFilteringInSparkMain.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.action.hadoop;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.junit.Assert.*;
+import static org.hamcrest.CoreMatchers.*;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+@RunWith(Parameterized.class)
+public class TestDuplicateFilteringInSparkMain {
+
+    private static final String PREFIX = "hdfs://namenode.address:8020/folder/";
+
+    static List<URI> getURIs(String... uriStrings) throws URISyntaxException {
+        URI[] uris = new URI[uriStrings.length];
+        for (int i = 0; i != uriStrings.length; ++i) {
+            uris[i] = new URI(PREFIX + uriStrings[i]);
+        }
+        return Arrays.asList(uris);
+    }
+
+    static Object[] testCase(List<URI> inputs, List<URI> expectedOutputs) {
+        return new Object[] {inputs, expectedOutputs};
+    }
+
+    @Parameterized.Parameters
+    public static List<Object[]> params() throws Exception {
+        return Arrays.asList(
+                testCase(getURIs("file.io"),
+                        getURIs("file.io")),
+
+                testCase(getURIs("file.io", "file.io", "file.io"),
+                        getURIs("file.io")),
+
+                testCase(getURIs("file.io", "file3.io", "file.io"),
+                        getURIs("file.io", "file3.io")),
+
+                testCase(getURIs("file.io", "file3.io", "file2.io"),
+                        getURIs("file.io", "file2.io", "file3.io"))
+        );
+    }
+
+    private List<URI> input;
+
+    private List<URI> expectedOutput;
+
+    public TestDuplicateFilteringInSparkMain(List<URI> input, List<URI> result) {
+        this.input = input;
+        this.expectedOutput = result;
+    }
+
+    @Test
+    public void test() throws Exception{
+        Map<String, URI> uriMap = SparkMain.fixFsDefaultUrisAndFilterDuplicates(input.toArray(new URI[input.size()]));
+        assertThat("Duplicate filtering failed for >>" + input + "<<", uriMap.size(), is(expectedOutput.size()));
+        List<URI> outputList = Arrays.asList(uriMap.values().toArray(new URI[0]));
+        Collections.sort(outputList);
+        assertThat("Files are different in result ", outputList, is(expectedOutput));
+    }
+
+}
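For readers tracing the fix: the heart of the change is swapping the LinkedList<URI> of cache entries for a Map keyed on the file name, so two sharelib URIs whose paths differ but whose file names collide (the situation that makes spark-submit on Spark 2.1.0 abort with an "added multiple times to the distributed cache" style error) collapse into a single entry. Below is a minimal standalone sketch of that keying logic; it is an illustration, not Oozie code — plain java.net.URI stands in for Hadoop's Path, and the getFixedUri() scheme/authority fix-up is omitted.

    import java.net.URI;
    import java.util.HashMap;
    import java.util.Map;

    class DedupSketch {

        // Mirrors the idea of fixFsDefaultUrisAndFilterDuplicates(): key each
        // URI on its file name so a later entry with the same name replaces
        // an earlier one instead of being listed twice.
        static Map<String, URI> filterDuplicates(URI... uris) {
            Map<String, URI> byName = new HashMap<>();
            for (URI uri : uris) {
                String path = uri.getPath();
                // Stand-in for new Path(uri).getName(): text after the last '/'
                String name = path.substring(path.lastIndexOf('/') + 1);
                byName.put(name, uri);
            }
            return byName;
        }

        public static void main(String[] args) {
            // The same jar name arriving from two sharelib directories...
            Map<String, URI> result = filterDuplicates(
                    URI.create("hdfs://nn:8020/share/lib/spark/oozie-sharelib-spark.jar"),
                    URI.create("hdfs://nn:8020/share/lib/spark2/oozie-sharelib-spark.jar"));
            // ...survives as exactly one --files entry.
            System.out.println(result.size()); // prints 1
        }
    }

Two consequences of this design are visible in the patch: HashMap iteration order is unspecified, which is why the new test sorts uriMap.values() before comparing, and the last put() for a given name wins, so the user-supplied --files/--archives values (merged in last by addUserDefined()) override sharelib entries of the same name.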
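The other half of the patch is the consume-and-remember handling of --files/--archives inside the spark-opts loop: the option and its value are pulled out of the pass-through argument list so they can be folded into the deduplicated map instead of reaching spark-submit as a second, conflicting copy. A self-contained sketch of that pattern follows (the option list here is invented for illustration):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    class SparkOptsSketch {
        public static void main(String[] args) {
            List<String> sparkOptions = Arrays.asList(
                    "--executor-memory", "2g",
                    "--files", "hdfs:///tmp/a.txt,hdfs:///tmp/b.txt");
            List<String> sparkArgs = new ArrayList<>();
            String userFiles = null;
            for (int i = 0; i < sparkOptions.size(); i++) {
                String opt = sparkOptions.get(i);
                boolean addToSparkArgs = true;
                if (opt.startsWith("--files")) {
                    userFiles = sparkOptions.get(i + 1); // remember the value...
                    i++;                                 // ...and skip past it
                    addToSparkArgs = false;              // don't forward "--files" itself
                }
                if (addToSparkArgs) {
                    sparkArgs.add(opt);
                }
            }
            // Forwarded: [--executor-memory, 2g]; userFiles is merged into the
            // deduplicated map later, alongside the sharelib entries.
            System.out.println(sparkArgs + " / " + userFiles);
        }
    }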