Repository: hive Updated Branches: refs/heads/spark 13eb40954 -> 84363196b
HIVE-11473: Upgrade Spark dependency to 1.5 [Spark Branch] (Rui reviewed by Xuefu) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/84363196 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/84363196 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/84363196 Branch: refs/heads/spark Commit: 84363196bda0f6f2c03884cf633913c79dec30a3 Parents: 13eb409 Author: Rui Li <rui...@intel.com> Authored: Thu Oct 22 13:58:46 2015 +0800 Committer: Rui Li <rui...@intel.com> Committed: Thu Oct 22 13:59:30 2015 +0800 ---------------------------------------------------------------------- pom.xml | 12 +-- .../spark/status/impl/JobMetricsListener.java | 88 +------------------- .../apache/hive/spark/client/RemoteDriver.java | 60 +------------ 3 files changed, 5 insertions(+), 155 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/84363196/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index b55e86a..0cd4238 100644 --- a/pom.xml +++ b/pom.xml @@ -161,7 +161,7 @@ <ST4.version>4.0.4</ST4.version> <tez.version>0.5.2</tez.version> <super-csv.version>2.2.0</super-csv.version> - <spark.version>1.4.0</spark.version> + <spark.version>1.5.0</spark.version> <scala.binary.version>2.10</scala.binary.version> <scala.version>2.10.4</scala.version> <tempus-fugit.version>1.1</tempus-fugit.version> @@ -222,16 +222,6 @@ <enabled>false</enabled> </snapshots> </repository> - <repository> - <id>spark-1.3</id> - <url>https://s3-us-west-1.amazonaws.com/hive-spark/maven2/spark_2.10-1.3-rc1/</url> - <releases> - <enabled>true</enabled> - </releases> - <snapshots> - <enabled>false</enabled> - </snapshots> - </repository> </repositories> <!-- Hadoop dependency management is done at the bottom under profiles --> http://git-wip-us.apache.org/repos/asf/hive/blob/84363196/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java index 51772cd..52f4b9c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java @@ -23,29 +23,15 @@ import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.spark.JavaSparkListener; import org.apache.spark.executor.TaskMetrics; -import org.apache.spark.scheduler.SparkListener; -import org.apache.spark.scheduler.SparkListenerApplicationEnd; -import org.apache.spark.scheduler.SparkListenerApplicationStart; -import org.apache.spark.scheduler.SparkListenerBlockManagerAdded; -import org.apache.spark.scheduler.SparkListenerBlockManagerRemoved; -import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate; -import org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate; -import org.apache.spark.scheduler.SparkListenerJobEnd; import org.apache.spark.scheduler.SparkListenerJobStart; -import org.apache.spark.scheduler.SparkListenerStageCompleted; -import org.apache.spark.scheduler.SparkListenerStageSubmitted; import org.apache.spark.scheduler.SparkListenerTaskEnd; -import org.apache.spark.scheduler.SparkListenerTaskGettingResult; -import org.apache.spark.scheduler.SparkListenerTaskStart; -import org.apache.spark.scheduler.SparkListenerUnpersistRDD; -import org.apache.spark.scheduler.SparkListenerExecutorRemoved; -import org.apache.spark.scheduler.SparkListenerExecutorAdded; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -public class JobMetricsListener implements SparkListener { +public class JobMetricsListener extends JavaSparkListener { private static final Log LOG = LogFactory.getLog(JobMetricsListener.class); @@ -54,36 +40,6 @@ public class JobMetricsListener implements SparkListener { private final Map<Integer, Map<String, List<TaskMetrics>>> allJobMetrics = Maps.newHashMap(); @Override - public void onExecutorRemoved(SparkListenerExecutorRemoved removed) { - - } - - @Override - public void onExecutorAdded(SparkListenerExecutorAdded added) { - - } - - @Override - public void onStageCompleted(SparkListenerStageCompleted stageCompleted) { - - } - - @Override - public void onStageSubmitted(SparkListenerStageSubmitted stageSubmitted) { - - } - - @Override - public void onTaskStart(SparkListenerTaskStart taskStart) { - - } - - @Override - public void onTaskGettingResult(SparkListenerTaskGettingResult taskGettingResult) { - - } - - @Override public synchronized void onTaskEnd(SparkListenerTaskEnd taskEnd) { int stageId = taskEnd.stageId(); int stageAttemptId = taskEnd.stageAttemptId(); @@ -119,46 +75,6 @@ public class JobMetricsListener implements SparkListener { jobIdToStageId.put(jobId, intStageIds); } - @Override - public synchronized void onJobEnd(SparkListenerJobEnd jobEnd) { - - } - - @Override - public void onEnvironmentUpdate(SparkListenerEnvironmentUpdate environmentUpdate) { - - } - - @Override - public void onBlockManagerAdded(SparkListenerBlockManagerAdded blockManagerAdded) { - - } - - @Override - public void onBlockManagerRemoved(SparkListenerBlockManagerRemoved blockManagerRemoved) { - - } - - @Override - public void onUnpersistRDD(SparkListenerUnpersistRDD unpersistRDD) { - - } - - @Override - public void onApplicationStart(SparkListenerApplicationStart applicationStart) { - - } - - @Override - public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) { - - } - - @Override - public void onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate executorMetricsUpdate) { - - } - public synchronized Map<String, List<TaskMetrics>> getJobMetric(int jobId) { return allJobMetrics.get(jobId); } http://git-wip-us.apache.org/repos/asf/hive/blob/84363196/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java ---------------------------------------------------------------------- diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java b/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java index b77c9e8..f5b1e48 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java @@ -43,26 +43,13 @@ import org.apache.hive.spark.client.metrics.Metrics; import org.apache.hive.spark.client.rpc.Rpc; import org.apache.hive.spark.client.rpc.RpcConfiguration; import org.apache.hive.spark.counter.SparkCounters; +import org.apache.spark.JavaSparkListener; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaFutureAction; import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.scheduler.SparkListener; -import org.apache.spark.scheduler.SparkListenerApplicationEnd; -import org.apache.spark.scheduler.SparkListenerApplicationStart; -import org.apache.spark.scheduler.SparkListenerBlockManagerAdded; -import org.apache.spark.scheduler.SparkListenerBlockManagerRemoved; -import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate; -import org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate; import org.apache.spark.scheduler.SparkListenerJobEnd; import org.apache.spark.scheduler.SparkListenerJobStart; -import org.apache.spark.scheduler.SparkListenerStageCompleted; -import org.apache.spark.scheduler.SparkListenerStageSubmitted; import org.apache.spark.scheduler.SparkListenerTaskEnd; -import org.apache.spark.scheduler.SparkListenerTaskGettingResult; -import org.apache.spark.scheduler.SparkListenerTaskStart; -import org.apache.spark.scheduler.SparkListenerUnpersistRDD; -import org.apache.spark.scheduler.SparkListenerExecutorRemoved; -import org.apache.spark.scheduler.SparkListenerExecutorAdded; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -438,21 +425,11 @@ public class RemoteDriver { } - private class ClientListener implements SparkListener { + private class ClientListener extends JavaSparkListener { private final Map<Integer, Integer> stageToJobId = Maps.newHashMap(); @Override - public void onExecutorRemoved(SparkListenerExecutorRemoved removed) { - - } - - @Override - public void onExecutorAdded(SparkListenerExecutorAdded added) { - - } - - @Override public void onJobStart(SparkListenerJobStart jobStart) { synchronized (stageToJobId) { for (int i = 0; i < jobStart.stageIds().length(); i++) { @@ -500,39 +477,6 @@ public class RemoteDriver { } } - @Override - public void onStageCompleted(SparkListenerStageCompleted stageCompleted) { } - - @Override - public void onStageSubmitted(SparkListenerStageSubmitted stageSubmitted) { } - - @Override - public void onTaskStart(SparkListenerTaskStart taskStart) { } - - @Override - public void onTaskGettingResult(SparkListenerTaskGettingResult taskGettingResult) { } - - @Override - public void onEnvironmentUpdate(SparkListenerEnvironmentUpdate environmentUpdate) { } - - @Override - public void onBlockManagerAdded(SparkListenerBlockManagerAdded blockManagerAdded) { } - - @Override - public void onBlockManagerRemoved(SparkListenerBlockManagerRemoved blockManagerRemoved) { } - - @Override - public void onUnpersistRDD(SparkListenerUnpersistRDD unpersistRDD) { } - - @Override - public void onApplicationStart(SparkListenerApplicationStart applicationStart) { } - - @Override - public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) { } - - @Override - public void onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate executorMetricsUpdate) { } - /** * Returns the client job ID for the given Spark job ID. *