HIVE-10671: yarn-cluster mode offers a degraded performance from yarn-client [Spark Branch] (Rui via Xuefu, reviewed by Chengxiang)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/3f27c6ed Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/3f27c6ed Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/3f27c6ed Branch: refs/heads/hbase-metastore Commit: 3f27c6ed47bbfd8257c70bca0983a7f2e3e74f4a Parents: 2df14f9 Author: xzhang <xzhang@xzdt> Authored: Thu May 14 14:20:11 2015 -0700 Committer: Xuefu Zhang <xzh...@cloudera.com> Committed: Mon Jun 1 14:03:54 2015 -0700 ---------------------------------------------------------------------- .../hive/ql/exec/spark/RemoteHiveSparkClient.java | 10 +++++----- .../hadoop/hive/ql/exec/spark/SparkUtilities.java | 11 +++++++---- .../java/org/apache/hive/spark/client/JobContext.java | 2 +- .../org/apache/hive/spark/client/JobContextImpl.java | 7 ++++--- .../apache/hive/spark/client/SparkClientUtilities.java | 13 ++++++++----- 5 files changed, 25 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/3f27c6ed/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java index 059016d..bae30f3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java @@ -28,6 +28,7 @@ import java.net.URISyntaxException; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -167,10 +168,10 @@ public class RemoteHiveSparkClient implements HiveSparkClient { try { URI fileUri = SparkUtilities.getURI(addedFile); if (fileUri != null && !localFiles.contains(fileUri)) { + localFiles.add(fileUri); if (SparkUtilities.needUploadToHDFS(fileUri, sparkConf)) { fileUri = SparkUtilities.uploadToHDFS(fileUri, hiveConf); } - localFiles.add(fileUri); remoteClient.addFile(fileUri); } } catch (URISyntaxException e) { @@ -184,10 +185,10 @@ public class RemoteHiveSparkClient implements HiveSparkClient { try { URI jarUri = SparkUtilities.getURI(addedJar); if (jarUri != null && !localJars.contains(jarUri)) { + localJars.add(jarUri); if (SparkUtilities.needUploadToHDFS(jarUri, sparkConf)) { jarUri = SparkUtilities.uploadToHDFS(jarUri, hiveConf); } - localJars.add(jarUri); remoteClient.addJar(jarUri); } } catch (URISyntaxException e) { @@ -224,10 +225,9 @@ public class RemoteHiveSparkClient implements HiveSparkClient { // Add jar to current thread class loader dynamically, and add jar paths to JobConf as Spark // may need to load classes from this jar in other threads. - List<String> addedJars = jc.getAddedJars(); + Set<String> addedJars = jc.getAddedJars(); if (addedJars != null && !addedJars.isEmpty()) { - SparkClientUtilities.addToClassPath(addedJars.toArray(new String[addedJars.size()]), - localJobConf, jc.getLocalTmpDir()); + SparkClientUtilities.addToClassPath(addedJars, localJobConf, jc.getLocalTmpDir()); localJobConf.set(Utilities.HIVE_ADDED_JARS, StringUtils.join(addedJars, ";")); } http://git-wip-us.apache.org/repos/asf/hive/blob/3f27c6ed/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java index 72ab913..c012af8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java @@ -21,6 +21,7 @@ import java.io.File; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; +import java.util.UUID; import org.apache.commons.io.FilenameUtils; import org.apache.hadoop.fs.FileSystem; @@ -76,11 +77,13 @@ public class SparkUtilities { * @throws IOException */ public static URI uploadToHDFS(URI source, HiveConf conf) throws IOException { - Path tmpDir = SessionState.getHDFSSessionPath(conf); + Path localFile = new Path(source.getPath()); + // give the uploaded file a UUID + Path remoteFile = new Path(SessionState.getHDFSSessionPath(conf), + UUID.randomUUID() + "-" + getFileName(source)); FileSystem fileSystem = FileSystem.get(conf); - fileSystem.copyFromLocalFile(new Path(source.getPath()), tmpDir); - String filePath = tmpDir + File.separator + getFileName(source); - Path fullPath = fileSystem.getFileStatus(new Path(filePath)).getPath(); + fileSystem.copyFromLocalFile(localFile, remoteFile); + Path fullPath = fileSystem.getFileStatus(remoteFile).getPath(); return fullPath.toUri(); } http://git-wip-us.apache.org/repos/asf/hive/blob/3f27c6ed/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java ---------------------------------------------------------------------- diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java b/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java index 36e252c..af6332e 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java @@ -57,7 +57,7 @@ public interface JobContext { /** * Return all added jar path which added through AddJarJob. */ - List<String> getAddedJars(); + Set<String> getAddedJars(); /** * Returns a local tmp dir specific to the context http://git-wip-us.apache.org/repos/asf/hive/blob/3f27c6ed/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java ---------------------------------------------------------------------- diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java b/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java index 164d90a..beed8a3 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java @@ -18,6 +18,7 @@ package org.apache.hive.spark.client; import java.io.File; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; @@ -34,14 +35,14 @@ class JobContextImpl implements JobContext { private final JavaSparkContext sc; private final ThreadLocal<MonitorCallback> monitorCb; private final Map<String, List<JavaFutureAction<?>>> monitoredJobs; - private final List<String> addedJars; + private final Set<String> addedJars; private final File localTmpDir; public JobContextImpl(JavaSparkContext sc, File localTmpDir) { this.sc = sc; this.monitorCb = new ThreadLocal<MonitorCallback>(); monitoredJobs = new ConcurrentHashMap<String, List<JavaFutureAction<?>>>(); - addedJars = new CopyOnWriteArrayList<String>(); + addedJars = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>()); this.localTmpDir = localTmpDir; } @@ -64,7 +65,7 @@ class JobContextImpl implements JobContext { } @Override - public List<String> getAddedJars() { + public Set<String> getAddedJars() { return addedJars; } http://git-wip-us.apache.org/repos/asf/hive/blob/3f27c6ed/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java ---------------------------------------------------------------------- diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java index 879f8a4..b079ee2 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java @@ -24,6 +24,7 @@ import java.io.File; import java.net.URL; import java.net.URLClassLoader; import java.util.List; +import java.util.Set; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; @@ -38,9 +39,9 @@ public class SparkClientUtilities { /** * Add new elements to the classpath. * - * @param newPaths Array of classpath elements + * @param newPaths Set of classpath elements */ - public static void addToClassPath(String[] newPaths, Configuration conf, File localTmpDir) + public static void addToClassPath(Set<String> newPaths, Configuration conf, File localTmpDir) throws Exception { ClassLoader cloader = Thread.currentThread().getContextClassLoader(); URLClassLoader loader = (URLClassLoader) cloader; @@ -74,9 +75,11 @@ public class SparkClientUtilities { Path remoteFile = new Path(path); Path localFile = new Path(localTmpDir.getAbsolutePath() + File.separator + remoteFile.getName()); - LOG.info("Copying " + remoteFile + " to " + localFile); - FileSystem fs = remoteFile.getFileSystem(conf); - fs.copyToLocalFile(remoteFile, localFile); + if (!new File(localFile.toString()).exists()) { + LOG.info("Copying " + remoteFile + " to " + localFile); + FileSystem remoteFS = remoteFile.getFileSystem(conf); + remoteFS.copyToLocalFile(remoteFile, localFile); + } return urlFromPathString(localFile.toString(), conf, localTmpDir); } else { url = new File(path).toURL();