Repository: flink
Updated Branches:
  refs/heads/release-1.2 664c49df7 -> ef20aa1a1


[FLINK-6176] [scripts] [yarn] [mesos] Add JARs to CLASSPATH deterministically

Sorts the files read from Flink's lib directory and places the distribution
JAR at the end of the CLASSPATH.

This closes #3632


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ef20aa1a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ef20aa1a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ef20aa1a

Branch: refs/heads/release-1.2
Commit: ef20aa1a1540844888829bfd685e94764f3fd8ea
Parents: 664c49d
Author: Greg Hogan <c...@greghogan.com>
Authored: Sun Mar 26 15:46:00 2017 -0400
Committer: Greg Hogan <c...@greghogan.com>
Committed: Thu Apr 27 15:00:16 2017 -0400

----------------------------------------------------------------------
 flink-dist/src/main/flink-bin/bin/config.sh     | 18 ++++++++++---
 .../main/flink-bin/mesos-bin/mesos-appmaster.sh | 17 +-----------
 .../flink-bin/mesos-bin/mesos-taskmanager.sh    | 17 +-----------
 .../src/main/flink-bin/yarn-bin/yarn-session.sh | 17 +-----------
 .../yarn/AbstractYarnClusterDescriptor.java     | 28 +++++++++++---------
 5 files changed, 33 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ef20aa1a/flink-dist/src/main/flink-bin/bin/config.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/config.sh b/flink-dist/src/main/flink-bin/bin/config.sh
index 69d70ef..071bd71 100755
--- a/flink-dist/src/main/flink-bin/bin/config.sh
+++ b/flink-dist/src/main/flink-bin/bin/config.sh
@@ -18,16 +18,28 @@
 ################################################################################
 
 constructFlinkClassPath() {
+    local FLINK_DIST
+    local FLINK_CLASSPATH
 
     while read -d '' -r jarfile ; do
-        if [[ $FLINK_CLASSPATH = "" ]]; then
+        if [[ "$jarfile" =~ .*flink-dist.*.jar ]]; then
+            FLINK_DIST="$FLINK_DIST":"$jarfile"
+        elif [[ "$FLINK_CLASSPATH" == "" ]]; then
             FLINK_CLASSPATH="$jarfile";
         else
             FLINK_CLASSPATH="$FLINK_CLASSPATH":"$jarfile"
         fi
-    done < <(find "$FLINK_LIB_DIR" ! -type d -name '*.jar' -print0)
+    done < <(find "$FLINK_LIB_DIR" ! -type d -name '*.jar' -print0 | sort -z)
 
-    echo $FLINK_CLASSPATH
+    if [[ "$FLINK_DIST" == "" ]]; then
+        # write error message to stderr since stdout is stored as the classpath
+        (>&2 echo "[ERROR] Flink distribution jar not found in $FLINK_LIB_DIR.")
+
+        # exit function with empty classpath to force process failure
+        exit 1
+    fi
+
+    echo "$FLINK_CLASSPATH""$FLINK_DIST"
 }
 
 # These are used to mangle paths that are passed to java when using

http://git-wip-us.apache.org/repos/asf/flink/blob/ef20aa1a/flink-dist/src/main/flink-bin/mesos-bin/mesos-appmaster.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/mesos-bin/mesos-appmaster.sh b/flink-dist/src/main/flink-bin/mesos-bin/mesos-appmaster.sh
index 6fae6c2..67eab9d 100755
--- a/flink-dist/src/main/flink-bin/mesos-bin/mesos-appmaster.sh
+++ b/flink-dist/src/main/flink-bin/mesos-bin/mesos-appmaster.sh
@@ -23,26 +23,11 @@ bin=`cd "$bin"; pwd`
 # get Flink config
 . "$bin"/config.sh
 
-# auxilliary function to construct a lightweight classpath for the
-# Flink AppMaster
-constructAppMasterClassPath() {
-
-    while read -d '' -r jarfile ; do
-        if [[ $CC_CLASSPATH = "" ]]; then
-            CC_CLASSPATH="$jarfile";
-        else
-            CC_CLASSPATH="$CC_CLASSPATH":"$jarfile"
-        fi
-    done < <(find "$FLINK_LIB_DIR" ! -type d -name '*.jar' -print0)
-
-    echo $CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS
-}
-
 if [ "$FLINK_IDENT_STRING" = "" ]; then
     FLINK_IDENT_STRING="$USER"
 fi
 
-CC_CLASSPATH=`manglePathList $(constructAppMasterClassPath)`
+CC_CLASSPATH=`manglePathList $(constructFlinkClassPath):$INTERNAL_HADOOP_CLASSPATHS`
 
 log="${FLINK_LOG_DIR}/flink-${FLINK_IDENT_STRING}-mesos-appmaster-${HOSTNAME}.log"
 log_setting="-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback.xml"

http://git-wip-us.apache.org/repos/asf/flink/blob/ef20aa1a/flink-dist/src/main/flink-bin/mesos-bin/mesos-taskmanager.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/mesos-bin/mesos-taskmanager.sh b/flink-dist/src/main/flink-bin/mesos-bin/mesos-taskmanager.sh
index 23a301b..ab2f7b1 100755
--- a/flink-dist/src/main/flink-bin/mesos-bin/mesos-taskmanager.sh
+++ b/flink-dist/src/main/flink-bin/mesos-bin/mesos-taskmanager.sh
@@ -23,22 +23,7 @@ bin=`cd "$bin"; pwd`
 # get Flink config
 . "$bin"/config.sh
 
-# auxilliary function to construct a lightweight classpath for the
-# Flink TaskManager
-constructTaskManagerClassPath() {
-
-    while read -d '' -r jarfile ; do
-        if [[ $CC_CLASSPATH = "" ]]; then
-            CC_CLASSPATH="$jarfile";
-        else
-            CC_CLASSPATH="$CC_CLASSPATH":"$jarfile"
-        fi
-    done < <(find "$FLINK_LIB_DIR" ! -type d -name '*.jar' -print0)
-
-    echo $CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS
-}
-
-CC_CLASSPATH=`manglePathList $(constructTaskManagerClassPath)`
+CC_CLASSPATH=`manglePathList $(constructFlinkClassPath):$INTERNAL_HADOOP_CLASSPATHS`
 
 log=flink-taskmanager.log
 log_setting="-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback.xml"

http://git-wip-us.apache.org/repos/asf/flink/blob/ef20aa1a/flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh b/flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh
index 1755f32..03b1e3a 100755
--- a/flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh
+++ b/flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh
@@ -29,22 +29,7 @@ fi
 
 JVM_ARGS="$JVM_ARGS -Xmx512m"
 
-# auxilliary function to construct a lightweight classpath for the
-# Flink CLI client
-constructCLIClientClassPath() {
-
-    while read -d '' -r jarfile ; do
-        if [[ $CC_CLASSPATH = "" ]]; then
-            CC_CLASSPATH="$jarfile";
-        else
-            CC_CLASSPATH="$CC_CLASSPATH":"$jarfile"
-        fi
-    done < <(find "$FLINK_LIB_DIR" ! -type d -name '*.jar' -print0)
-
-    echo $CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS
-}
-
-CC_CLASSPATH=`manglePathList $(constructCLIClientClassPath)`
+CC_CLASSPATH=`manglePathList $(constructFlinkClassPath):$INTERNAL_HADOOP_CLASSPATHS`
 
 log=$FLINK_LOG_DIR/flink-$FLINK_IDENT_STRING-yarn-session-$HOSTNAME.log
 log_setting="-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j-yarn-session.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback-yarn.xml"

http://git-wip-us.apache.org/repos/asf/flink/blob/ef20aa1a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index 52d5402..9306090 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -22,9 +22,9 @@ import org.apache.flink.client.CliFrontend;
 import org.apache.flink.client.deployment.ClusterDescriptor;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.SecurityOptions;
-import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.hadoop.conf.Configuration;
@@ -640,12 +640,11 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
                 final Map<String, LocalResource> localResources = new HashMap<>(2 + effectiveShipFiles.size());
                 // list of remote paths (after upload)
                 final List<Path> paths = new ArrayList<>(2 + effectiveShipFiles.size());
-               // classpath assembler
-               final StringBuilder classPathBuilder = new StringBuilder();
                 // ship list that enables reuse of resources for task manager containers
                StringBuilder envShipFileList = new StringBuilder();
 
                // upload and register ship files
+               final List<String> classPaths = new ArrayList<>();
                for (File shipFile : effectiveShipFiles) {
                        LocalResource shipResources = Records.newRecord(LocalResource.class);
 
@@ -664,29 +663,32 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 
                                 Files.walkFileTree(shipPath, new SimpleFileVisitor<java.nio.file.Path>() {
                                        @Override
-                                       public FileVisitResult preVisitDirectory(java.nio.file.Path dir, BasicFileAttributes attrs)
+                                       public FileVisitResult visitFile(java.nio.file.Path file, BasicFileAttributes attrs)
                                                        throws IOException {
-                                               super.preVisitDirectory(dir, attrs);
-
-                                               java.nio.file.Path relativePath = parentPath.relativize(dir);
+                                               java.nio.file.Path relativePath = parentPath.relativize(file);
 
-                                               classPathBuilder
-                                                       .append(relativePath)
-                                                       .append(File.separator)
-                                                       .append("*")
-                                                       .append(File.pathSeparator);
+                                               classPaths.add(relativePath.toString());
 
                                                return FileVisitResult.CONTINUE;
                                        }
                                });
                        } else {
                                // add files to the classpath
-                               classPathBuilder.append(shipFile.getName()).append(File.pathSeparator);
+                               classPaths.add(shipFile.getName());
                        }
 
                        envShipFileList.append(remotePath).append(",");
                }
 
+               // normalize classpath by sorting
+               Collections.sort(classPaths);
+
+               // classpath assembler
+               StringBuilder classPathBuilder = new StringBuilder();
+               for (String classPath : classPaths) {
+                       classPathBuilder.append(classPath).append(File.pathSeparator);
+               }
+
                // Setup jar for ApplicationMaster
                LocalResource appMasterJar = Records.newRecord(LocalResource.class);
                LocalResource flinkConf = Records.newRecord(LocalResource.class);

Reply via email to