Repository: hive Updated Branches: refs/heads/spark fd1192914 -> b02cd4abc
HIVE-12229: Custom script in query cannot be executed in yarn-cluster mode [Spark Branch] (Rui reviewed by Xuefu) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/b02cd4ab Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/b02cd4ab Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/b02cd4ab Branch: refs/heads/spark Commit: b02cd4abce10003dc90646b710875fba00b9b5b0 Parents: fd11929 Author: Rui Li <rui...@intel.com> Authored: Thu Nov 5 16:48:25 2015 +0800 Committer: Rui Li <rui...@intel.com> Committed: Thu Nov 5 16:51:22 2015 +0800 ---------------------------------------------------------------------- .../hadoop/hive/ql/exec/ScriptOperator.java | 15 ++++++++++++ .../ql/exec/spark/RemoteHiveSparkClient.java | 4 ++-- .../hive/ql/exec/spark/SparkUtilities.java | 10 ++++---- .../apache/hive/spark/client/JobContext.java | 4 ++-- .../hive/spark/client/JobContextImpl.java | 8 +++---- .../hive/spark/client/SparkClientImpl.java | 2 +- .../hive/spark/client/SparkClientUtilities.java | 24 +++++++++++++------- 7 files changed, 43 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/b02cd4ab/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java index 5df9ea2..63837ce 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java @@ -36,6 +36,8 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.util.Shell; import org.apache.hadoop.util.StringUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.SparkEnv; import org.apache.spark.SparkFiles; 
import java.io.BufferedInputStream; @@ -329,6 +331,7 @@ public class ScriptOperator extends Operator<ScriptDesc> implements // initialize the user's process only when you receive the first row if (firstRow) { firstRow = false; + SparkConf sparkConf = null; try { String[] cmdArgs = splitArgs(conf.getScriptCmd()); @@ -341,6 +344,7 @@ public class ScriptOperator extends Operator<ScriptDesc> implements // In spark local mode, we need to search added files in root directory. if (HiveConf.getVar(hconf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) { + sparkConf = SparkEnv.get().conf(); finder.prependPathComponent(SparkFiles.getRootDirectory()); } File f = finder.getAbsolutePath(prog); @@ -371,6 +375,17 @@ public class ScriptOperator extends Operator<ScriptDesc> implements String idEnvVarVal = getOperatorId(); env.put(safeEnvVarName(idEnvVarName), idEnvVarVal); + // For spark, in non-local mode, any added dependencies are stored at + // SparkFiles::getRootDirectory, which is the executor's working directory. + // In local mode, we need to manually point the process's working directory to it, + // in order to make the dependencies accessible. 
+ if (sparkConf != null) { + String master = sparkConf.get("spark.master"); + if (master.equals("local") || master.startsWith("local[")) { + pb.directory(new File(SparkFiles.getRootDirectory())); + } + } + scriptPid = pb.start(); // Runtime.getRuntime().exec(wrappedCmdArgs); DataOutputStream scriptOut = new DataOutputStream( http://git-wip-us.apache.org/repos/asf/hive/blob/b02cd4ab/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java index 2e8d1d3..cf81424 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java @@ -295,11 +295,11 @@ public class RemoteHiveSparkClient implements HiveSparkClient { // Add jar to current thread class loader dynamically, and add jar paths to JobConf as Spark // may need to load classes from this jar in other threads. 
- Set<String> addedJars = jc.getAddedJars(); + Map<String, Long> addedJars = jc.getAddedJars(); if (addedJars != null && !addedJars.isEmpty()) { SparkClientUtilities.addToClassPath(addedJars, localJobConf, jc.getLocalTmpDir()); KryoSerializer.setClassLoader(Thread.currentThread().getContextClassLoader()); - localJobConf.set(Utilities.HIVE_ADDED_JARS, StringUtils.join(addedJars, ";")); + localJobConf.set(Utilities.HIVE_ADDED_JARS, StringUtils.join(addedJars.keySet(), ";")); } Path localScratchDir = KryoSerializer.deserialize(scratchDirBytes, Path.class); http://git-wip-us.apache.org/repos/asf/hive/blob/b02cd4ab/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java index cf2c3bc..0268469 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java @@ -22,8 +22,6 @@ import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import java.util.Collection; -import java.util.UUID; -import java.util.Collection; import com.google.common.base.Preconditions; import org.apache.commons.io.FilenameUtils; @@ -91,11 +89,11 @@ public class SparkUtilities { */ public static URI uploadToHDFS(URI source, HiveConf conf) throws IOException { Path localFile = new Path(source.getPath()); - // give the uploaded file a UUID - Path remoteFile = new Path(SessionState.getHDFSSessionPath(conf), - UUID.randomUUID() + "-" + getFileName(source)); + Path remoteFile = new Path(SessionState.getHDFSSessionPath(conf), getFileName(source)); FileSystem fileSystem = FileSystem.get(conf); - fileSystem.copyFromLocalFile(localFile, remoteFile); + // Overwrite if the remote file already exists. 
Whether the file can be added + // on executor is up to spark, i.e. spark.files.overwrite + fileSystem.copyFromLocalFile(false, true, localFile, remoteFile); Path fullPath = fileSystem.getFileStatus(remoteFile).getPath(); return fullPath.toUri(); } http://git-wip-us.apache.org/repos/asf/hive/blob/b02cd4ab/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java ---------------------------------------------------------------------- diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java b/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java index af6332e..c9c975b 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java @@ -55,9 +55,9 @@ public interface JobContext { Map<String, List<JavaFutureAction<?>>> getMonitoredJobs(); /** - * Return all added jar path which added through AddJarJob. + * Return all added jar paths and their timestamps that were added through AddJarJob. 
*/ - Set<String> getAddedJars(); + Map<String, Long> getAddedJars(); /** * Returns a local tmp dir specific to the context http://git-wip-us.apache.org/repos/asf/hive/blob/b02cd4ab/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java ---------------------------------------------------------------------- diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java b/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java index beed8a3..b73bcd7 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java @@ -18,12 +18,10 @@ package org.apache.hive.spark.client; import java.io.File; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CopyOnWriteArrayList; import org.apache.hive.spark.counter.SparkCounters; @@ -35,14 +33,14 @@ class JobContextImpl implements JobContext { private final JavaSparkContext sc; private final ThreadLocal<MonitorCallback> monitorCb; private final Map<String, List<JavaFutureAction<?>>> monitoredJobs; - private final Set<String> addedJars; + private final Map<String, Long> addedJars; private final File localTmpDir; public JobContextImpl(JavaSparkContext sc, File localTmpDir) { this.sc = sc; this.monitorCb = new ThreadLocal<MonitorCallback>(); monitoredJobs = new ConcurrentHashMap<String, List<JavaFutureAction<?>>>(); - addedJars = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>()); + addedJars = new ConcurrentHashMap<>(); this.localTmpDir = localTmpDir; } @@ -65,7 +63,7 @@ class JobContextImpl implements JobContext { } @Override - public Set<String> getAddedJars() { + public Map<String, Long> getAddedJars() { return addedJars; } 
http://git-wip-us.apache.org/repos/asf/hive/blob/b02cd4ab/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java ---------------------------------------------------------------------- diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java index ceebbb3..3d682a0 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java @@ -617,7 +617,7 @@ class SparkClientImpl implements SparkClient { jc.sc().addJar(path); // Following remote job may refer to classes in this jar, and the remote job would be executed // in a different thread, so we add this jar path to JobContext for further usage. - jc.getAddedJars().add(path); + jc.getAddedJars().put(path, System.currentTimeMillis()); return null; } http://git-wip-us.apache.org/repos/asf/hive/blob/b02cd4ab/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java ---------------------------------------------------------------------- diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java index 589436d..bbbd97b 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java @@ -24,7 +24,8 @@ import java.io.File; import java.net.URL; import java.net.URLClassLoader; import java.util.List; -import java.util.Set; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; @@ -35,20 +36,21 @@ import org.apache.hadoop.fs.Path; public class SparkClientUtilities { protected static final transient Log LOG = 
LogFactory.getLog(SparkClientUtilities.class); + private static final Map<String, Long> downloadedFiles = new ConcurrentHashMap<>(); /** * Add new elements to the classpath. * - * @param newPaths Set of classpath elements + * @param newPaths Map of classpath elements and corresponding timestamp */ - public static void addToClassPath(Set<String> newPaths, Configuration conf, File localTmpDir) + public static void addToClassPath(Map<String, Long> newPaths, Configuration conf, File localTmpDir) throws Exception { URLClassLoader loader = (URLClassLoader) Thread.currentThread().getContextClassLoader(); List<URL> curPath = Lists.newArrayList(loader.getURLs()); boolean newPathAdded = false; - for (String newPath : newPaths) { - URL newUrl = urlFromPathString(newPath, conf, localTmpDir); + for (Map.Entry<String, Long> entry : newPaths.entrySet()) { + URL newUrl = urlFromPathString(entry.getKey(), entry.getValue(), conf, localTmpDir); if (newUrl != null && !curPath.contains(newUrl)) { curPath.add(newUrl); LOG.info("Added jar[" + newUrl + "] to classpath."); @@ -69,7 +71,8 @@ public class SparkClientUtilities { * @param path path string * @return */ - private static URL urlFromPathString(String path, Configuration conf, File localTmpDir) { + private static URL urlFromPathString(String path, Long timeStamp, + Configuration conf, File localTmpDir) { URL url = null; try { if (StringUtils.indexOf(path, "file:/") == 0) { @@ -78,12 +81,17 @@ public class SparkClientUtilities { Path remoteFile = new Path(path); Path localFile = new Path(localTmpDir.getAbsolutePath() + File.separator + remoteFile.getName()); - if (!new File(localFile.toString()).exists()) { + Long currentTS = downloadedFiles.get(path); + if (currentTS == null) { + currentTS = -1L; + } + if (!new File(localFile.toString()).exists() || currentTS < timeStamp) { LOG.info("Copying " + remoteFile + " to " + localFile); FileSystem remoteFS = remoteFile.getFileSystem(conf); remoteFS.copyToLocalFile(remoteFile, localFile); + 
downloadedFiles.put(path, timeStamp); } - return urlFromPathString(localFile.toString(), conf, localTmpDir); + return urlFromPathString(localFile.toString(), timeStamp, conf, localTmpDir); } else { url = new File(path).toURL(); }