Repository: hive Updated Branches: refs/heads/master a2d22b44d -> 71d6e1652
HIVE-18677: SparkClientImpl usage of SessionState.LogHelper doesn't respect isSilent value (Sahil Takiar, reviewed by Peter Vary) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/71d6e165 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/71d6e165 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/71d6e165 Branch: refs/heads/master Commit: 71d6e16522e1965a2c28d451b0811ce123022d9d Parents: a2d22b4 Author: Sahil Takiar <takiar.sa...@gmail.com> Authored: Wed Feb 14 16:01:18 2018 -0800 Committer: Sahil Takiar <stak...@cloudera.com> Committed: Wed Feb 14 16:01:18 2018 -0800 ---------------------------------------------------------------------- .../ql/exec/spark/RemoteHiveSparkClient.java | 3 +- .../hive/ql/exec/spark/SparkUtilities.java | 9 ++++++ .../spark/status/RemoteSparkJobMonitor.java | 1 + .../ql/exec/spark/status/SparkJobStatus.java | 2 ++ .../spark/status/impl/LocalSparkJobStatus.java | 16 ++++++++++ .../spark/status/impl/RemoteSparkJobStatus.java | 31 ++++++++++++++++++++ .../apache/hive/spark/client/BaseProtocol.java | 13 -------- .../apache/hive/spark/client/RemoteDriver.java | 13 -------- .../hive/spark/client/SparkClientFactory.java | 4 +-- .../hive/spark/client/SparkClientImpl.java | 10 +------ .../hive/spark/client/TestSparkClient.java | 3 +- 11 files changed, 64 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/71d6e165/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java index 93d44dc..c571d1a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java @@ -101,8 +101,7 @@ public class RemoteHiveSparkClient implements HiveSparkClient { } private void createRemoteClient() throws Exception { - remoteClient = SparkClientFactory.createClient(conf, hiveConf, sessionId, - SessionState.LogHelper.getInfoStream()); + remoteClient = SparkClientFactory.createClient(conf, hiveConf, sessionId); if (HiveConf.getBoolVar(hiveConf, ConfVars.HIVE_PREWARM_ENABLED) && (SparkClientUtilities.isYarnMaster(hiveConf.get("spark.master")) || http://git-wip-us.apache.org/repos/asf/hive/blob/71d6e165/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java index 943a4ee..913c9d7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java @@ -18,7 +18,9 @@ package org.apache.hadoop.hive.ql.exec.spark; import java.io.IOException; +import java.net.InetAddress; import java.net.URI; +import java.net.UnknownHostException; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collection; @@ -304,4 +306,11 @@ public class SparkUtilities { } return false; } + + public static String reverseDNSLookupURL(String url) throws UnknownHostException { + // Run a reverse DNS lookup on the URL + URI uri = URI.create(url); + InetAddress address = InetAddress.getByName(uri.getHost()); + return uri.getScheme() + "://" + address.getCanonicalHostName() + ":" + uri.getPort(); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/71d6e165/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java index fc4e4de..adb65a5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java @@ -93,6 +93,7 @@ public class RemoteSparkJobMonitor extends SparkJobMonitor { if (!running) { perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_SUBMIT_TO_RUNNING); printAppInfo(); + console.printInfo("Hive on Spark Session Web UI URL: " + sparkJobStatus.getWebUIURL()); // print job stages. console.printInfo("\nQuery Hive on Spark job[" + sparkJobStatus.getJobId() + "] stages: " + Arrays.toString(sparkJobStatus.getStageIds())); http://git-wip-us.apache.org/repos/asf/hive/blob/71d6e165/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java index c2c8fb4..8474afc 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java @@ -43,6 +43,8 @@ public interface SparkJobStatus { SparkStatistics getSparkStatistics(); + String getWebUIURL(); + void cleanup(); Throwable getError(); http://git-wip-us.apache.org/repos/asf/hive/blob/71d6e165/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java index 3e84175..8b031e7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java @@ -17,11 +17,13 @@ */ package org.apache.hadoop.hive.ql.exec.spark.status.impl; +import java.net.UnknownHostException; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistics; @@ -152,6 +154,20 @@ public class LocalSparkJobStatus implements SparkJobStatus { } @Override + public String getWebUIURL() { + try { + if (sparkContext.sc().uiWebUrl().isDefined()) { + return SparkUtilities.reverseDNSLookupURL(sparkContext.sc().uiWebUrl().get()); + } else { + return "UNDEFINED"; + } + } catch (Exception e) { + LOG.warn("Failed to get web UI URL.", e); + } + return "UNKNOWN"; + } + + @Override public void cleanup() { jobMetricsListener.cleanup(jobId); if (cachedRDDIds != null) { http://git-wip-us.apache.org/repos/asf/hive/blob/71d6e165/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java index ce07a9f..e950452 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.exec.spark.status.impl; +import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.ql.ErrorMsg; @@ -38,6 +39,8 @@ import org.apache.spark.SparkStageInfo; import org.apache.spark.api.java.JavaFutureAction; import java.io.Serializable; +import java.net.InetAddress; +import java.net.URI; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -138,6 +141,20 @@ public class RemoteSparkJobStatus implements SparkJobStatus { } @Override + public String getWebUIURL() { + Future<String> getWebUIURL = sparkClient.run(new GetWebUIURLJob()); + try { + return getWebUIURL.get(sparkClientTimeoutInSeconds, TimeUnit.SECONDS); + } catch (Exception e) { + LOG.warn("Failed to get web UI URL.", e); + if (Thread.interrupted()) { + error = e; + } + return "UNKNOWN"; + } + } + + @Override public void cleanup() { } @@ -287,4 +304,18 @@ public class RemoteSparkJobStatus implements SparkJobStatus { return jc.sc().sc().applicationId(); } } + + private static class GetWebUIURLJob implements Job<String> { + + public GetWebUIURLJob() { + } + + @Override + public String call(JobContext jc) throws Exception { + if (jc.sc().sc().uiWebUrl().isDefined()) { + return SparkUtilities.reverseDNSLookupURL(jc.sc().sc().uiWebUrl().get()); + } + return "UNDEFINED"; + } + } } http://git-wip-us.apache.org/repos/asf/hive/blob/71d6e165/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java ---------------------------------------------------------------------- diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java b/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java index 7290809..7ca89ed 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java @@ -165,17 +165,4 @@ abstract class BaseProtocol extends RpcDispatcher { } } - - protected static class SparkUIWebURL<T extends Serializable> implements Serializable { - - final String UIWebURL; - - SparkUIWebURL(String UIWebURL) { - this.UIWebURL = UIWebURL; - } - - SparkUIWebURL() { - this(null); - } - } } http://git-wip-us.apache.org/repos/asf/hive/blob/71d6e165/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java ---------------------------------------------------------------------- diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java b/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java index 66cf339..e584cbb 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java @@ -171,14 +171,6 @@ public class RemoteDriver { throw e; } - if (jc.sc().sc().uiWebUrl().isDefined()) { - // Run a reverse DNS lookup on the URL - URI uri = URI.create(jc.sc().sc().uiWebUrl().get()); - InetAddress address = InetAddress.getByName(uri.getHost()); - this.protocol.sendUIWebURL(uri.getScheme() + "://" + address.getCanonicalHostName() + ":" + - uri.getPort()); - } - synchronized (jcLock) { for (Iterator<JobWrapper<?>> it = jobQueue.iterator(); it.hasNext();) { it.next().submit(); @@ -280,11 +272,6 @@ public class RemoteDriver { clientRpc.call(new JobMetrics(jobId, sparkJobId, stageId, taskId, metrics)); } - void sendUIWebURL(String UIWebURL) { - LOG.debug("Send UIWebURL({}) to Client.", UIWebURL); - clientRpc.call(new SparkUIWebURL(UIWebURL)); - } - private void handle(ChannelHandlerContext ctx, CancelJob msg) { JobWrapper<?> job = activeJobs.get(msg.id); if (job == null || !cancelJob(job)) { http://git-wip-us.apache.org/repos/asf/hive/blob/71d6e165/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java ---------------------------------------------------------------------- diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java index 6dfc1a5..fd9b725 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java @@ -82,10 +82,10 @@ public final class SparkClientFactory { * @param hiveConf Configuration for Hive, contains hive.* properties. */ public static SparkClient createClient(Map<String, String> sparkConf, HiveConf hiveConf, - String sessionId, PrintStream consoleStream) + String sessionId) throws IOException, SparkException { Preconditions.checkState(server != null, "initialize() not called."); - return new SparkClientImpl(server, sparkConf, hiveConf, sessionId, consoleStream); + return new SparkClientImpl(server, sparkConf, hiveConf, sessionId); } } http://git-wip-us.apache.org/repos/asf/hive/blob/71d6e165/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java ---------------------------------------------------------------------- diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java index 78317ed..214f74b 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java @@ -89,15 +89,13 @@ class SparkClientImpl implements SparkClient { private final Map<String, JobHandleImpl<?>> jobs; private final Rpc driverRpc; private final ClientProtocol protocol; - private final PrintStream consoleStream; private volatile boolean isAlive; SparkClientImpl(RpcServer rpcServer, Map<String, String> conf, HiveConf hiveConf, - String sessionid, PrintStream consoleStream) throws IOException { + String sessionid) throws IOException { this.conf = conf; this.hiveConf = hiveConf; this.jobs = Maps.newConcurrentMap(); - this.consoleStream = consoleStream; String secret = rpcServer.createSecret(); this.driverThread = startDriver(rpcServer, sessionid, secret); @@ -623,12 +621,6 @@ class SparkClientImpl implements SparkClient { LOG.warn("Received spark job ID: {} for unknown job {}", msg.sparkJobId, msg.clientJobId); } } - - private void handle(ChannelHandlerContext ctx, SparkUIWebURL msg) { - String printMsg = "Hive on Spark Session Web UI URL: " + msg.UIWebURL; - consoleStream.println(printMsg); - LOG.info(printMsg); - } } private static class AddJarJob implements Job<Serializable> { http://git-wip-us.apache.org/repos/asf/hive/blob/71d6e165/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java ---------------------------------------------------------------------- diff --git a/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java b/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java index e66caee..fb31c93 100644 --- a/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java +++ b/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java @@ -303,8 +303,7 @@ public class TestSparkClient { SparkClient client = null; try { test.config(conf); - client = SparkClientFactory.createClient(conf, HIVECONF, UUID.randomUUID().toString(), - mock(PrintStream.class)); + client = SparkClientFactory.createClient(conf, HIVECONF, UUID.randomUUID().toString()); test.call(client); } finally { if (client != null) {