amend OOZIE-2787 Oozie distributes application jar twice making the spark job 
fail (satishsaley)

(cherry picked from commit 0da31f47ea975387c5d05763c632a5b8915bd9cc)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/fb44f427
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/fb44f427
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/fb44f427

Branch: refs/heads/branch-4.3
Commit: fb44f427ed6ac9ebd1b771c1fe2dec06c7370b4b
Parents: 3709ea2
Author: Satish Subhashrao Saley <sa...@yahoo-inc.com>
Authored: Thu Feb 9 23:00:35 2017 -0800
Committer: satishsaley <satishsa...@apache.org>
Committed: Fri Dec 8 16:34:55 2017 -0800

----------------------------------------------------------------------
 .../site/twiki/DG_SparkActionExtension.twiki    |  36 ++++++
 .../apache/oozie/action/hadoop/SparkMain.java   |  24 +++-
 .../oozie/action/hadoop/TestJarFilter.java      | 109 +++++++++++++++++++
 3 files changed, 165 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/fb44f427/docs/src/site/twiki/DG_SparkActionExtension.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/DG_SparkActionExtension.twiki 
b/docs/src/site/twiki/DG_SparkActionExtension.twiki
index 74875bb..94175cd 100644
--- a/docs/src/site/twiki/DG_SparkActionExtension.twiki
+++ b/docs/src/site/twiki/DG_SparkActionExtension.twiki
@@ -200,6 +200,42 @@ The =jar= element indicates python file. Refer to the file 
by it's localized nam
 in PySpark. The py file should be in the lib/ folder next to the workflow.xml 
or added using the =file= element so that
 it's localized to the working directory with just its name.
 
+---+++ Using Symlink in <jar>
+
+A symlink must be specified using the =[[WorkflowFunctionalSpec#a3.2.2.1_Adding_Files_and_Archives_for_the_Job][file]]= element. Then you can use
+the symlink name in the =jar= element.
+
+*Example:*
+
+Specifying a relative path for the symlink:
+
+Make sure that the file is within the application directory (=oozie.wf.application.path=).
+<verbatim>
+        <spark xmlns="uri:oozie:spark-action:0.2">
+        ...
+            <jar>py-spark-example-symlink.py</jar>
+            ...
+            ...
+            <file>py-spark.py#py-spark-example-symlink.py</file>
+        ...
+        </spark>
+</verbatim>
+
+Specifying a full path for the symlink:
+<verbatim>
+        <spark xmlns="uri:oozie:spark-action:0.2">
+        ...
+            <jar>spark-example-symlink.jar</jar>
+            ...
+            ...
+            <file>hdfs://localhost:8020/user/testjars/all-oozie-examples.jar#spark-example-symlink.jar</file>
+        ...
+        </spark>
+</verbatim>
+
+
+
 ---++ Appendix, Spark XML-Schema
 
 ---+++ AE.A Appendix A, Spark XML-Schema

http://git-wip-us.apache.org/repos/asf/oozie/blob/fb44f427/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java
----------------------------------------------------------------------
diff --git 
a/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java 
b/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java
index d37053d..12eb61a 100644
--- a/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java
+++ b/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java
@@ -520,7 +520,7 @@ public class SparkMain extends LauncherMain {
     /**
      * This class is used for filtering out unwanted jars.
      */
-    private static class JarFilter {
+    static class JarFilter {
         private String sparkVersion = "1.X.X";
         private String sparkYarnJar;
         private String applicationJar;
@@ -547,9 +547,10 @@ public class SparkMain extends LauncherMain {
          *
          * @throws OozieActionConfiguratorException
          */
-        private void filter() throws OozieActionConfiguratorException {
+        public void filter() throws OozieActionConfiguratorException {
             Iterator<URI> iterator = listUris.iterator();
             File matchedFile = null;
+            Path applJarPath = new Path(applicationJar);
             while (iterator.hasNext()) {
                 URI uri = iterator.next();
                 Path p = new Path(uri);
@@ -575,13 +576,28 @@ public class SparkMain extends LauncherMain {
                 // Here we skip the application jar, because
                 // (if uris are same,) it will get distributed multiple times
                 // - one time with --files and another time as application jar.
-                if (p.getName().equals(applicationJar) || 
uri.toString().equals(applicationJar)) {
-                    applicationJar = uri.toString();
+                if (isApplicationJar(p.getName(), uri, applJarPath)) {
+                    String fragment = uri.getFragment();
+                    applicationJar = fragment != null && fragment.length() > 0 
? fragment : uri.toString();
                     iterator.remove();
                 }
             }
         }
 
+        /**
+         * Checks whether a listed file is the application jar (applicationJar may already be rewritten by filter()).
+         *
+         * @param fileName name of the file, i.e. the last component of its path as passed by filter()
+         * @param fileUri URI of the file; its fragment, when present, is the symlink name
+         * @param applJarPath Path of the application jar, built once in filter() before any rewrite
+         * @return true if the file's name, full URI or symlink (fragment) matches the application jar
+         */
+        private boolean isApplicationJar(String fileName, URI fileUri, Path 
applJarPath) {
+            return (fileName.equals(applicationJar) || 
fileUri.toString().equals(applicationJar)
+                    || applJarPath.getName().equals(fileName)
+                    || applicationJar.equals(fileUri.getFragment()));
+        }
+
         public String getApplicationJar() {
             return applicationJar;
         }

http://git-wip-us.apache.org/repos/asf/oozie/blob/fb44f427/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestJarFilter.java
----------------------------------------------------------------------
diff --git 
a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestJarFilter.java
 
b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestJarFilter.java
new file mode 100644
index 0000000..2d4c83c
--- /dev/null
+++ 
b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestJarFilter.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.action.hadoop;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.LinkedList;
+import java.util.jar.Attributes;
+import java.util.jar.JarOutputStream;
+import java.util.jar.Manifest;
+
+import org.apache.oozie.action.hadoop.SparkMain.JarFilter;
+import org.junit.Test;
+
+/**
+ * Tests {@link JarFilter}: detection of the Spark YARN jar and the Spark version, and removal of the
+ * application jar from the URIs to distribute, for plain, fragmented and symlinked application jars.
+ */
+public class TestJarFilter {
+
+    @Test
+    public void testJarFilter() throws URISyntaxException, IOException, OozieActionConfiguratorException {
+        LinkedList<URI> listUris = new LinkedList<URI>();
+        String sparkVersion = "2.1.0";
+        String sparkYarnJar = "spark-yarn-" + sparkVersion + ".jar";
+        String applicationJarName = "oozie-examples.jar";
+        String renamedApplicationJar = "renamed-oozie-examples.jar";
+        URI sparkYarnJarUri = new URI("hdfs://localhost:8020/user/sparkjars/" + sparkYarnJar);
+        URI applicationJarUri = new URI("hdfs://localhost:8020/user/sparkdata/" + applicationJarName);
+        // The getSparkVersion() assertions rely on this local jar's manifest; it is written to the
+        // working directory, so it is always removed in the finally block below.
+        createSparkYarnJar(sparkYarnJar, sparkVersion);
+        try {
+            populateUris(listUris, sparkYarnJarUri, applicationJarUri);
+
+            // check application jar, spark yarn jar and spark version
+            JarFilter jarFilter = new JarFilter(listUris, applicationJarUri.getPath());
+            jarFilter.filter();
+            assertEquals(applicationJarUri.toString(), jarFilter.getApplicationJar());
+            assertEquals(sparkYarnJarUri.toString(), jarFilter.getSparkYarnJar());
+            assertEquals(sparkVersion, jarFilter.getSparkVersion());
+            checkFilteredUris(listUris, sparkYarnJarUri.toString(), applicationJarUri.toString());
+            listUris.clear();
+
+            // check application jar with fragmented URI
+            applicationJarUri = new URI("hdfs", "localhost", "/user/sparkdata/" + applicationJarName,
+                    renamedApplicationJar);
+            populateUris(listUris, sparkYarnJarUri, applicationJarUri);
+            jarFilter = new JarFilter(listUris, applicationJarUri.getPath());
+            jarFilter.filter();
+            assertEquals(renamedApplicationJar, jarFilter.getApplicationJar());
+            assertEquals(sparkYarnJarUri.toString(), jarFilter.getSparkYarnJar());
+            assertEquals(sparkVersion, jarFilter.getSparkVersion());
+            checkFilteredUris(listUris, sparkYarnJarUri.toString(), renamedApplicationJar);
+            listUris.clear();
+
+            // application jar is present in <file> with symlink
+            // and user mentioned the symlink name in <jar>
+            populateUris(listUris, sparkYarnJarUri, applicationJarUri);
+            jarFilter = new JarFilter(listUris, renamedApplicationJar);
+            jarFilter.filter();
+            assertEquals(renamedApplicationJar, jarFilter.getApplicationJar());
+            assertEquals(sparkYarnJarUri.toString(), jarFilter.getSparkYarnJar());
+            assertEquals(sparkVersion, jarFilter.getSparkVersion());
+            checkFilteredUris(listUris, sparkYarnJarUri.toString(), renamedApplicationJar);
+        } finally {
+            // Best-effort cleanup; the delete result is ignored so it cannot mask the test outcome.
+            new File(sparkYarnJar).delete();
+        }
+    }
+
+    /**
+     * Asserts that none of the remaining URIs contains the Spark YARN jar or the application jar.
+     */
+    private void checkFilteredUris(LinkedList<URI> listUris, String sparkYarnJar, String applicationJar) {
+        for (URI uri : listUris) {
+            assertFalse(uri.toString().contains(sparkYarnJar));
+            assertFalse(uri.toString().contains(applicationJar));
+        }
+    }
+
+    /**
+     * Writes an empty jar named {@code sparkYarnJar} to the working directory whose manifest has
+     * {@code sparkVersion} as its Specification-Version.
+     */
+    private void createSparkYarnJar(String sparkYarnJar, String sparkVersion)
+            throws FileNotFoundException, IOException {
+        Manifest manifest = new Manifest();
+        manifest.getMainAttributes().put(Attributes.Name.MANIFEST_VERSION, "1.0");
+        manifest.getMainAttributes().put(Attributes.Name.SPECIFICATION_VERSION, sparkVersion);
+        FileOutputStream out = new FileOutputStream(sparkYarnJar);
+        try {
+            JarOutputStream target = new JarOutputStream(out, manifest);
+            target.close();
+        } finally {
+            // Closes the file even if writing the manifest fails; closing twice has no effect.
+            out.close();
+        }
+    }
+
+    /**
+     * Adds three unrelated data/dependency URIs followed by the Spark YARN jar and application jar URIs.
+     */
+    private void populateUris(LinkedList<URI> listUris, URI sparkYarnJarUri, URI applicationJarUri)
+            throws URISyntaxException {
+        String fileNamePattern = "hdfs://localhost:8020/user/sparkdata/%s%s%s";
+        listUris.add(new URI(String.format(fileNamePattern, "_SUCCESS", "#", "ouputflag.txt")));
+        listUris.add(new URI(String.format(fileNamePattern, "dependency.jar", "", "")));
+        listUris.add(new URI(String.format(fileNamePattern, "helper.jar", "", "")));
+        listUris.add(sparkYarnJarUri);
+        listUris.add(applicationJarUri);
+    }
+}

Reply via email to