OOZIE-2787 Oozie distributes application jar twice making the spark job fail (satishsaley)
(cherry picked from commit d249fbf43cd75b97f594330884189e8d8df55ddd)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/aae4948f
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/aae4948f
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/aae4948f

Branch: refs/heads/branch-4.3
Commit: aae4948f115bafc48db7d5c47bd8e223cd337781
Parents: f5b835b
Author: Satish Subhashrao Saley <sa...@yahoo-inc.com>
Authored: Mon Feb 6 09:40:44 2017 -0800
Committer: satishsaley <satishsa...@apache.org>
Committed: Fri Dec 8 16:34:55 2017 -0800

----------------------------------------------------------------------
 pom.xml                                        |   2 +-
 release-log.txt                                |   1 +
 .../apache/oozie/action/hadoop/SparkMain.java  | 184 ++++++++++++-------
 3 files changed, 122 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/aae4948f/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index bfd0af7..36631a1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1946,7 +1946,7 @@
                 <activeByDefault>false</activeByDefault>
             </activation>
             <properties>
-                <spark.version>2.0.0</spark.version>
+                <spark.version>2.1.0</spark.version>
                 <spark.streaming.kafka.version>1.6.2</spark.streaming.kafka.version>
                 <spark.bagel.version>1.6.2</spark.bagel.version>
             </properties>

http://git-wip-us.apache.org/repos/asf/oozie/blob/aae4948f/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 3ee166a..9e74443 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.3.1 release

+OOZIE-2787 Oozie distributes application jar twice making the spark job fail (satishsaley)
 OOZIE-2777 Config-default.xml longer than 64k results in java.io.UTFDataFormatException (gezapeti via harsh)
 OOZIE-2771 Allow retrieving keystore and truststore passwords from Hadoop Credential Provider (asasvari via abhishekbafna)
 OOZIE-2748 NPE in LauncherMapper.printArgs() (pbacsko via rkanter)

http://git-wip-us.apache.org/repos/asf/oozie/blob/aae4948f/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java
----------------------------------------------------------------------
diff --git a/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java b/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java
index 0da74d4..8f2f438 100644
--- a/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java
+++ b/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java
@@ -72,8 +72,6 @@ public class SparkMain extends LauncherMain {
     private static final Pattern SPARK_VERSION_1 = Pattern.compile("^1.*");
     private static final String SPARK_YARN_JAR = "spark.yarn.jar";
     private static final String SPARK_YARN_JARS = "spark.yarn.jars";
-    private String sparkYarnJar = null;
-    private String sparkVersion = "1.X.X";
     public static void main(String[] args) throws Exception {
         run(SparkMain.class, args);
     }
@@ -216,19 +214,22 @@ public class SparkMain extends LauncherMain {
         }

         if ((yarnClusterMode || yarnClientMode)) {
-            LinkedList<URI> fixedUris = fixFsDefaultUris(DistributedCache.getCacheFiles(actionConf), jarPath);
-            String cachedFiles = filterSparkYarnJar(fixedUris);
+            LinkedList<URI> fixedUris = fixFsDefaultUris(DistributedCache.getCacheFiles(actionConf));
+            JarFilter jarfilter = new JarFilter(fixedUris, jarPath);
+            jarfilter.filter();
+            jarPath = jarfilter.getApplicationJar();
+            String cachedFiles = StringUtils.join(fixedUris, ",");
             if (cachedFiles != null && !cachedFiles.isEmpty()) {
                 sparkArgs.add("--files");
                 sparkArgs.add(cachedFiles);
             }
-            fixedUris = fixFsDefaultUris(DistributedCache.getCacheArchives(actionConf), jarPath);
+            fixedUris = fixFsDefaultUris(DistributedCache.getCacheArchives(actionConf));
             String cachedArchives = StringUtils.join(fixedUris, ",");
             if (cachedArchives != null && !cachedArchives.isEmpty()) {
                 sparkArgs.add("--archives");
                 sparkArgs.add(cachedArchives);
             }
-            setSparkYarnJarsConf(sparkArgs);
+            setSparkYarnJarsConf(sparkArgs, jarfilter.getSparkYarnJar(), jarfilter.getSparkVersion());
         }

         if (!sparkArgs.contains(VERBOSE_OPTION)) {
@@ -319,7 +320,7 @@ public class SparkMain extends LauncherMain {
      * @param fileNamePattern the pattern to look for
      * @return the file if there is one else it returns null
      */
-    private File getMatchingFile(Pattern fileNamePattern) throws OozieActionConfiguratorException {
+    private static File getMatchingFile(Pattern fileNamePattern) throws OozieActionConfiguratorException {
         File localDir = new File(".");
         for(String fileName : localDir.list()){
             if(fileNamePattern.matcher(fileName).find()){
@@ -424,7 +425,7 @@ public class SparkMain extends LauncherMain {
      * @throws IOException
      * @throws URISyntaxException
      */
-    private LinkedList<URI> fixFsDefaultUris(URI[] files, String jarPath) throws IOException, URISyntaxException {
+    private LinkedList<URI> fixFsDefaultUris(URI[] files) throws IOException, URISyntaxException {
         if (files == null) {
             return null;
         }
@@ -432,70 +433,19 @@ public class SparkMain extends LauncherMain {
         FileSystem fs = FileSystem.get(new Configuration(true));
         for (int i = 0; i < files.length; i++) {
             URI fileUri = files[i];
-            // Spark compares URIs based on scheme, host and port.
-            // Here we convert URIs into the default format so that Spark
-            // won't think those belong to different file system.
-            // This will avoid an extra copy of files which already exists on
-            // same hdfs.
-            if (!fileUri.toString().equals(jarPath) && fs.getUri().getScheme().equals(fileUri.getScheme())
-                    && (fs.getUri().getHost().equals(fileUri.getHost()) || fileUri.getHost() == null)
-                    && (fs.getUri().getPort() == -1 || fileUri.getPort() == -1
-                            || fs.getUri().getPort() == fileUri.getPort())) {
-                URI uri = new URI(fs.getUri().getScheme(), fileUri.getUserInfo(), fs.getUri().getHost(),
-                        fs.getUri().getPort(), fileUri.getPath(), fileUri.getQuery(), fileUri.getFragment());
-                // Here we skip the application jar, because
-                // (if uris are same,) it will get distributed multiple times
-                // - one time with --files and another time as application jar.
-                if (!uri.toString().equals(jarPath)) {
-                    listUris.add(uri);
-                }
-            }
+            listUris.add(getFixedUri(fs, fileUri));
         }
         return listUris;
     }

     /**
-     * Filters out the Spark yarn jar and records its version
-     *
-     * @param listUris string containing uris separated by comma
-     * @return
-     * @throws OozieActionConfiguratorException
-     */
-    private String filterSparkYarnJar(LinkedList<URI> listUris) throws OozieActionConfiguratorException {
-        Iterator<URI> iterator = listUris.iterator();
-        File matchedFile = null;
-        while (iterator.hasNext()) {
-            URI uri = iterator.next();
-            Path p = new Path(uri);
-            if (SPARK_YARN_JAR_PATTERN.matcher(p.getName()).find()) {
-                matchedFile = getMatchingFile(SPARK_YARN_JAR_PATTERN);
-            }
-            else if (SPARK_ASSEMBLY_JAR_PATTERN.matcher(p.getName()).find()) {
-                matchedFile = getMatchingFile(SPARK_ASSEMBLY_JAR_PATTERN);
-            }
-            if (matchedFile != null) {
-                sparkYarnJar = uri.toString();
-                try {
-                    sparkVersion = getJarVersion(matchedFile);
-                    System.out.println("Spark Version " + sparkVersion);
-                }
-                catch (IOException io) {
-                    System.out.println(
-                            "Unable to open " + matchedFile.getPath() + ". Default Spark Version " + sparkVersion);
-                }
-                iterator.remove();
-                break;
-            }
-        }
-        return StringUtils.join(listUris, ",");
-    }
-
-    /**
      * Sets spark.yarn.jars for Spark 2.X. Sets spark.yarn.jar for Spark 1.X.
      *
      * @param sparkArgs
+     * @param sparkYarnJar
+     * @param sparkVersion
      */
-    private void setSparkYarnJarsConf(List<String> sparkArgs) {
+    private void setSparkYarnJarsConf(List<String> sparkArgs, String sparkYarnJar, String sparkVersion) {
         if (SPARK_VERSION_1.matcher(sparkVersion).find()) {
             // In Spark 1.X.X, set spark.yarn.jar to avoid
             // multiple distribution
@@ -509,7 +459,7 @@ public class SparkMain extends LauncherMain {
         }
     }

-    private String getJarVersion(File jarFile) throws IOException {
+    private static String getJarVersion(File jarFile) throws IOException {
         @SuppressWarnings("resource")
         Manifest manifest = new JarFile(jarFile).getManifest();
         return manifest.getMainAttributes().getValue("Specification-Version");
@@ -521,4 +471,110 @@ public class SparkMain extends LauncherMain {
         }
         to.append(what);
     }
+
+    private static URI getFixedUri(URI fileUri) throws URISyntaxException, IOException {
+        FileSystem fs = FileSystem.get(new Configuration(true));
+        return getFixedUri(fs, fileUri);
+    }
+
+    /**
+     * Spark compares URIs based on scheme, host and port. Here we convert URIs
+     * into the default format so that Spark won't think those belong to
+     * different file system. This will avoid an extra copy of files which
+     * already exists on same hdfs.
+     *
+     * @param fs
+     * @param fileUri
+     * @return fixed uri
+     * @throws URISyntaxException
+     */
+    private static URI getFixedUri(FileSystem fs, URI fileUri) throws URISyntaxException {
+        if (fs.getUri().getScheme().equals(fileUri.getScheme())
+                && (fs.getUri().getHost().equals(fileUri.getHost()) || fileUri.getHost() == null)
+                && (fs.getUri().getPort() == -1 || fileUri.getPort() == -1
+                        || fs.getUri().getPort() == fileUri.getPort())) {
+            return new URI(fs.getUri().getScheme(), fileUri.getUserInfo(), fs.getUri().getHost(), fs.getUri().getPort(),
+                    fileUri.getPath(), fileUri.getQuery(), fileUri.getFragment());
+        }
+        return fileUri;
+    }
+
+    /**
+     * This class is used for filtering out unwanted jars.
+     */
+    private static class JarFilter {
+        private String sparkVersion = "1.X.X";
+        private String sparkYarnJar;
+        private String applicationJar;
+        private LinkedList<URI> listUris = null;
+
+        /**
+         * @param listUris List of URIs to be filtered
+         * @param jarPath Application jar
+         * @throws IOException
+         * @throws URISyntaxException
+         */
+        public JarFilter(LinkedList<URI> listUris, String jarPath) throws URISyntaxException, IOException {
+            this.listUris = listUris;
+            applicationJar = jarPath;
+            Path p = new Path(jarPath);
+            if (p.isAbsolute()) {
+                applicationJar = getFixedUri(p.toUri()).toString();
+            }
+        }
+
+        /**
+         * Filters out the Spark yarn jar and application jar. Also records
+         * spark yarn jar's version.
+         *
+         * @throws OozieActionConfiguratorException
+         */
+        private void filter() throws OozieActionConfiguratorException {
+            Iterator<URI> iterator = listUris.iterator();
+            File matchedFile = null;
+            while (iterator.hasNext()) {
+                URI uri = iterator.next();
+                Path p = new Path(uri);
+                if (SPARK_YARN_JAR_PATTERN.matcher(p.getName()).find()) {
+                    matchedFile = getMatchingFile(SPARK_YARN_JAR_PATTERN);
+                }
+                else if (SPARK_ASSEMBLY_JAR_PATTERN.matcher(p.getName()).find()) {
+                    matchedFile = getMatchingFile(SPARK_ASSEMBLY_JAR_PATTERN);
+                }
+                if (matchedFile != null) {
+                    sparkYarnJar = uri.toString();
+                    try {
+                        sparkVersion = getJarVersion(matchedFile);
+                        System.out.println("Spark Version " + sparkVersion);
+                    }
+                    catch (IOException io) {
+                        System.out.println(
+                                "Unable to open " + matchedFile.getPath() + ". Default Spark Version " + sparkVersion);
+                    }
+                    iterator.remove();
+                    matchedFile = null;
+                }
+                // Here we skip the application jar, because
+                // (if uris are same,) it will get distributed multiple times
+                // - one time with --files and another time as application jar.
+                if (p.getName().equals(applicationJar) || uri.toString().equals(applicationJar)) {
+                    applicationJar = uri.toString();
+                    iterator.remove();
+                }
+            }
+        }
+
+        public String getApplicationJar() {
+            return applicationJar;
+        }
+
+        public String getSparkYarnJar() {
+            return sparkYarnJar;
+        }
+
+        public String getSparkVersion() {
+            return sparkVersion;
+        }
+
+    }
 }
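
----------------------------------------------------------------------
The URI normalization done by the new getFixedUri() can be reproduced with
plain java.net.URI. The following is a minimal standalone sketch, not part
of the patch: the namenode authority hdfs://nn.example.com:8020 and the
paths are invented placeholders, whereas the real method takes the default
filesystem URI from FileSystem.get(new Configuration(true)).

import java.net.URI;
import java.net.URISyntaxException;

public class FixedUriSketch {

    // Rewrites fileUri onto the default filesystem's scheme/host/port when
    // they are compatible, so Spark sees one filesystem instead of two and
    // skips re-uploading files that already exist on that HDFS.
    static URI getFixedUri(URI fsUri, URI fileUri) throws URISyntaxException {
        if (fsUri.getScheme().equals(fileUri.getScheme())
                && (fsUri.getHost().equals(fileUri.getHost()) || fileUri.getHost() == null)
                && (fsUri.getPort() == -1 || fileUri.getPort() == -1
                        || fsUri.getPort() == fileUri.getPort())) {
            return new URI(fsUri.getScheme(), fileUri.getUserInfo(), fsUri.getHost(),
                    fsUri.getPort(), fileUri.getPath(), fileUri.getQuery(), fileUri.getFragment());
        }
        return fileUri;
    }

    public static void main(String[] args) throws URISyntaxException {
        URI fs = new URI("hdfs://nn.example.com:8020");
        // A host-less URI gets the default filesystem's authority filled in:
        System.out.println(getFixedUri(fs, new URI("hdfs:///user/test/app.jar")));
        // prints hdfs://nn.example.com:8020/user/test/app.jar
    }
}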
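
The core of the fix is that JarFilter.filter() drops the application jar
from the distributed-cache list before that list is joined into --files;
previously the jar could be shipped twice, once via --files and once as the
spark-submit application jar, which made the job fail. A stripped-down,
Hadoop-free illustration of that removal (all names and paths invented):

import java.net.URI;
import java.net.URISyntaxException;
import java.util.Iterator;
import java.util.LinkedList;

public class JarFilterSketch {
    public static void main(String[] args) throws URISyntaxException {
        LinkedList<URI> cacheFiles = new LinkedList<>();
        cacheFiles.add(new URI("hdfs://nn.example.com:8020/user/test/app.jar"));
        cacheFiles.add(new URI("hdfs://nn.example.com:8020/user/test/lib/util.jar"));
        String applicationJar = "hdfs://nn.example.com:8020/user/test/app.jar";

        // Remove the application jar from what becomes --files;
        // spark-submit already distributes it as the application jar.
        for (Iterator<URI> it = cacheFiles.iterator(); it.hasNext();) {
            if (it.next().toString().equals(applicationJar)) {
                it.remove();
            }
        }
        System.out.println(cacheFiles); // [hdfs://nn.example.com:8020/user/test/lib/util.jar]
    }
}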
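
setSparkYarnJarsConf() now takes the sharelib Spark jar and its version from
the JarFilter instead of instance fields. Its body is unchanged by this patch
and only partially visible in the hunk above, so the sketch below merely
illustrates the version switch described in its javadoc; the "--conf
key=value" argument format here is an assumption, not the verified
implementation.

import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

public class YarnJarConfSketch {
    private static final Pattern SPARK_VERSION_1 = Pattern.compile("^1.*");

    // Spark 1.X expects the single assembly jar in spark.yarn.jar; Spark 2.X
    // replaced it with spark.yarn.jars. Pointing Spark at the jar already in
    // the sharelib avoids another distribution of it.
    static void setSparkYarnJarsConf(List<String> sparkArgs, String sparkYarnJar, String sparkVersion) {
        String property = SPARK_VERSION_1.matcher(sparkVersion).find()
                ? "spark.yarn.jar" : "spark.yarn.jars";
        sparkArgs.add("--conf");                       // assumed argument format
        sparkArgs.add(property + "=" + sparkYarnJar);  // assumed argument format
    }

    public static void main(String[] args) {
        List<String> sparkArgs = new ArrayList<>();
        setSparkYarnJarsConf(sparkArgs,
                "hdfs:///share/lib/spark/spark-yarn_2.10-1.6.2.jar", "1.6.2");
        System.out.println(sparkArgs);
        // prints [--conf, spark.yarn.jar=hdfs:///share/lib/spark/spark-yarn_2.10-1.6.2.jar]
    }
}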