HIVE-14029: Update Spark version to 2.0.0 (Ferdinand Xu, via Li Rui, Szehon Ho and Sergio Pena)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/ac977cc8
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/ac977cc8
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/ac977cc8

Branch: refs/heads/repl2
Commit: ac977cc88757b49fbbd5c3bb236adcedcaae396c
Parents: 7d3da17
Author: Ferdinand Xu <cheng.a...@intel.com>
Authored: Wed Sep 28 01:44:32 2016 +0800
Committer: Ferdinand Xu <cheng.a...@intel.com>
Committed: Wed Sep 28 01:44:32 2016 +0800

----------------------------------------------------------------------
 pom.xml                                         |  12 ++-
 ql/pom.xml                                      |  26 +++++-
 .../exec/spark/HiveBaseFunctionResultList.java  |  96 +++++++++----------
 .../hive/ql/exec/spark/HiveMapFunction.java     |   2 +-
 .../hive/ql/exec/spark/HiveReduceFunction.java  |   2 +-
 .../hive/ql/exec/spark/SortByShuffler.java      |  84 ++++++++---------
 .../spark/status/impl/JobMetricsListener.java   |   4 +-
 .../ql/exec/spark/TestHiveKVResultCache.java    |   5 +-
 spark-client/pom.xml                            |  15 ++-
 .../hive/spark/client/MetricsCollection.java    |   8 +-
 .../apache/hive/spark/client/RemoteDriver.java  |   4 +-
 .../hive/spark/client/metrics/InputMetrics.java |   9 +-
 .../hive/spark/client/metrics/Metrics.java      |   6 +-
 .../client/metrics/ShuffleReadMetrics.java      |  18 ++--
 .../client/metrics/ShuffleWriteMetrics.java     |   4 +-
 .../spark/client/TestMetricsCollection.java     |   8 +-
 16 files changed, 153 insertions(+), 150 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/ac977cc8/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 2fb78cd..756cc34 100644
--- a/pom.xml
+++ b/pom.xml
@@ -145,7 +145,7 @@
     <ivy.version>2.4.0</ivy.version>
     <jackson.version>1.9.13</jackson.version>
     <!-- jackson 1 and 2 lines can coexist without issue, as they have different artifactIds -->
-    <jackson.new.version>2.4.2</jackson.new.version>
+    <jackson.new.version>2.6.5</jackson.new.version>
     <jasper.version>5.5.23</jasper.version>
     <jamon.plugin.version>2.3.4</jamon.plugin.version>
     <jamon-runtime.version>2.3.1</jamon-runtime.version>
@@ -155,6 +155,8 @@
     <jdo-api.version>3.0.1</jdo-api.version>
     <jetty.version>7.6.0.v20120127</jetty.version>
     <jersey.version>1.14</jersey.version>
+    <!-- Glassfish jersey is included for Spark client test only -->
+    <glassfish.jersey.version>2.22.2</glassfish.jersey.version>
     <jline.version>2.12</jline.version>
     <jms.version>1.1</jms.version>
     <joda.version>2.8.1</joda.version>
@@ -168,7 +170,7 @@
     <opencsv.version>2.3</opencsv.version>
     <mockito-all.version>1.9.5</mockito-all.version>
     <mina.version>2.0.0-M5</mina.version>
-    <netty.version>4.0.23.Final</netty.version>
+    <netty.version>4.0.29.Final</netty.version>
     <parquet.version>1.8.1</parquet.version>
     <pig.version>0.16.0</pig.version>
     <protobuf.version>2.5.0</protobuf.version>
@@ -178,9 +180,9 @@
     <tez.version>0.8.4</tez.version>
     <slider.version>0.90.2-incubating</slider.version>
     <super-csv.version>2.2.0</super-csv.version>
-    <spark.version>1.6.0</spark.version>
-    <scala.binary.version>2.10</scala.binary.version>
-    <scala.version>2.10.4</scala.version>
+    <spark.version>2.0.0</spark.version>
+    <scala.binary.version>2.11</scala.binary.version>
+    <scala.version>2.11.8</scala.version>
     <tempus-fugit.version>1.1</tempus-fugit.version>
     <snappy.version>0.2</snappy.version>
     <wadl-resourcedoc-doclet.version>1.4</wadl-resourcedoc-doclet.version>

http://git-wip-us.apache.org/repos/asf/hive/blob/ac977cc8/ql/pom.xml
----------------------------------------------------------------------
diff --git a/ql/pom.xml b/ql/pom.xml
index 02ddb80..2a93bb7 100644
--- a/ql/pom.xml
+++ b/ql/pom.xml
@@ -361,7 +361,7 @@
       <version>${calcite.version}</version>
       <exclusions>
         <!-- hsqldb interferes with the use of derby as the default db
-          in hive's use of datanucleus.
+          in hive's use of datanucleus.
         -->
         <exclusion>
           <groupId>org.hsqldb</groupId>
@@ -380,14 +380,14 @@
           <artifactId>jackson-core</artifactId>
         </exclusion>
       </exclusions>
-    </dependency>
+    </dependency>
     <dependency>
       <groupId>org.apache.calcite</groupId>
       <artifactId>calcite-avatica</artifactId>
       <version>${calcite.version}</version>
       <exclusions>
         <!-- hsqldb interferes with the use of derby as the default db
-          in hive's use of datanucleus.
+          in hive's use of datanucleus.
         -->
         <exclusion>
           <groupId>org.hsqldb</groupId>
@@ -685,6 +685,14 @@
           <groupId>commmons-logging</groupId>
           <artifactId>commons-logging</artifactId>
         </exclusion>
+        <exclusion>
+          <groupId>org.glassfish.jersey.containers</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.glassfish.jersey.core</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
       </exclusions>
     </dependency>
     <dependency>
@@ -692,6 +700,18 @@
       <artifactId>jersey-servlet</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.glassfish.jersey.core</groupId>
+      <artifactId>jersey-server</artifactId>
+      <version>${glassfish.jersey.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.glassfish.jersey.containers</groupId>
+      <artifactId>jersey-container-servlet-core</artifactId>
+      <version>${glassfish.jersey.version}</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <profiles>

http://git-wip-us.apache.org/repos/asf/hive/blob/ac977cc8/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveBaseFunctionResultList.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveBaseFunctionResultList.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveBaseFunctionResultList.java
index 5b65036..0fc79f4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveBaseFunctionResultList.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveBaseFunctionResultList.java
@@ -38,15 +38,14 @@ import com.google.common.base.Preconditions;
  * through Iterator interface.
  */
 @SuppressWarnings("rawtypes")
-public abstract class HiveBaseFunctionResultList<T> implements
-    Iterable, OutputCollector<HiveKey, BytesWritable>, Serializable {
+public abstract class HiveBaseFunctionResultList<T>
+    implements Iterator, OutputCollector<HiveKey, BytesWritable>, Serializable {
   private static final long serialVersionUID = -1L;
   private final Iterator<T> inputIterator;
   private boolean isClosed = false;
 
   // Contains results from last processed input record.
   private final HiveKVResultCache lastRecordOutput;
-  private boolean iteratorAlreadyCreated = false;
 
   public HiveBaseFunctionResultList(Iterator<T> inputIterator) {
     this.inputIterator = inputIterator;
@@ -54,13 +53,6 @@ public abstract class HiveBaseFunctionResultList<T> implements
   }
 
   @Override
-  public Iterator iterator() {
-    Preconditions.checkState(!iteratorAlreadyCreated, "Iterator can only be created once.");
-    iteratorAlreadyCreated = true;
-    return new ResultIterator();
-  }
-
-  @Override
   public void collect(HiveKey key, BytesWritable value) throws IOException {
     lastRecordOutput.add(SparkUtilities.copyHiveKey(key),
         SparkUtilities.copyBytesWritable(value));
@@ -77,57 +69,55 @@ public abstract class HiveBaseFunctionResultList<T> implements
   /** Close the record processor. */
   protected abstract void closeRecordProcessor();
 
-  /** Implement Iterator interface. */
-  public class ResultIterator implements Iterator {
-    @Override
-    public boolean hasNext(){
-      // Return remaining records (if any) from last processed input record.
-      if (lastRecordOutput.hasNext()) {
-        return true;
-      }
+  @Override
+  public boolean hasNext() {
+    // Return remaining records (if any) from last processed input record.
+    if (lastRecordOutput.hasNext()) {
+      return true;
+    }
 
-      // Process the records in the input iterator until
-      //  - new output records are available for serving downstream operator,
-      //  - input records are exhausted or
-      //  - processing is completed.
-      while (inputIterator.hasNext() && !processingDone()) {
-        try {
-          processNextRecord(inputIterator.next());
-          if (lastRecordOutput.hasNext()) {
-            return true;
-          }
-        } catch (IOException ex) {
-          throw new IllegalStateException("Error while processing input.", ex);
+    // Process the records in the input iterator until
+    //  - new output records are available for serving downstream operator,
+    //  - input records are exhausted or
+    //  - processing is completed.
+    while (inputIterator.hasNext() && !processingDone()) {
+      try {
+        processNextRecord(inputIterator.next());
+        if (lastRecordOutput.hasNext()) {
+          return true;
         }
+      } catch (IOException ex) {
+        throw new IllegalStateException("Error while processing input.", ex);
       }
+    }
 
-      // At this point we are done processing the input. Close the record processor
-      if (!isClosed) {
-        closeRecordProcessor();
-        isClosed = true;
-      }
-
-      // It is possible that some operators add records after closing the processor, so make sure
-      // to check the lastRecordOutput
-      if (lastRecordOutput.hasNext()) {
-        return true;
-      }
-
-      lastRecordOutput.clear();
-      return false;
+    // At this point we are done processing the input. Close the record processor
+    if (!isClosed) {
+      closeRecordProcessor();
+      isClosed = true;
     }
 
-    @Override
-    public Tuple2<HiveKey, BytesWritable> next() {
-      if (hasNext()) {
-        return lastRecordOutput.next();
-      }
-      throw new NoSuchElementException("There are no more elements");
+    // It is possible that some operators add records after closing the processor, so make sure
+    // to check the lastRecordOutput
+    if (lastRecordOutput.hasNext()) {
+      return true;
     }
 
-    @Override
-    public void remove() {
-      throw new UnsupportedOperationException("Iterator.remove() is not supported");
+    lastRecordOutput.clear();
+    return false;
+  }
+
+  @Override
+  public Tuple2<HiveKey, BytesWritable> next() {
+    if (hasNext()) {
+      return lastRecordOutput.next();
     }
+    throw new NoSuchElementException("There are no more elements");
   }
+
+  @Override
+  public void remove() {
+    throw new UnsupportedOperationException("Iterator.remove() is not supported");
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/ac977cc8/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java
index 53c5c0e..ff21a52 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java
@@ -38,7 +38,7 @@ public class HiveMapFunction extends HivePairFlatMapFunction<
 
   @SuppressWarnings("unchecked")
   @Override
-  public Iterable<Tuple2<HiveKey, BytesWritable>>
+  public Iterator<Tuple2<HiveKey, BytesWritable>>
   call(Iterator<Tuple2<BytesWritable, BytesWritable>> it) throws Exception {
     initJobConf();
 
http://git-wip-us.apache.org/repos/asf/hive/blob/ac977cc8/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java
index f6595f1..eeb4443 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java
@@ -36,7 +36,7 @@ public class HiveReduceFunction extends HivePairFlatMapFunction<
 
   @SuppressWarnings("unchecked")
   @Override
-  public Iterable<Tuple2<HiveKey, BytesWritable>>
+  public Iterator<Tuple2<HiveKey, BytesWritable>>
   call(Iterator<Tuple2<HiveKey, Iterable<BytesWritable>>> it) throws Exception {
     initJobConf();
 
http://git-wip-us.apache.org/repos/asf/hive/blob/ac977cc8/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java
index a6350d3..997ab7e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java
@@ -75,60 +75,52 @@ public class SortByShuffler implements SparkShuffler {
     private static final long serialVersionUID = 1L;
 
     @Override
-    public Iterable<Tuple2<HiveKey, Iterable<BytesWritable>>> call(
-        final Iterator<Tuple2<HiveKey, BytesWritable>> it) throws Exception {
+    public Iterator<Tuple2<HiveKey, Iterable<BytesWritable>>> call(
+        final Iterator<Tuple2<HiveKey, BytesWritable>> it) throws Exception {
       // Use input iterator to back returned iterable object.
-      final Iterator<Tuple2<HiveKey, Iterable<BytesWritable>>> resultIt =
-          new Iterator<Tuple2<HiveKey, Iterable<BytesWritable>>>() {
-            HiveKey curKey = null;
-            List<BytesWritable> curValues = new ArrayList<BytesWritable>();
+      return new Iterator<Tuple2<HiveKey, Iterable<BytesWritable>>>() {
+        HiveKey curKey = null;
+        List<BytesWritable> curValues = new ArrayList<BytesWritable>();
 
-            @Override
-            public boolean hasNext() {
-              return it.hasNext() || curKey != null;
-            }
+        @Override
+        public boolean hasNext() {
+          return it.hasNext() || curKey != null;
+        }
 
-            @Override
-            public Tuple2<HiveKey, Iterable<BytesWritable>> next() {
-              // TODO: implement this by accumulating rows with the same key into a list.
-              // Note that this list needs to improved to prevent excessive memory usage, but this
-              // can be done in later phase.
-              while (it.hasNext()) {
-                Tuple2<HiveKey, BytesWritable> pair = it.next();
-                if (curKey != null && !curKey.equals(pair._1())) {
-                  HiveKey key = curKey;
-                  List<BytesWritable> values = curValues;
-                  curKey = pair._1();
-                  curValues = new ArrayList<BytesWritable>();
-                  curValues.add(pair._2());
-                  return new Tuple2<HiveKey, Iterable<BytesWritable>>(key, values);
-                }
-                curKey = pair._1();
-                curValues.add(pair._2());
-              }
-              if (curKey == null) {
-                throw new NoSuchElementException();
-              }
-              // if we get here, this should be the last element we have
+        @Override
+        public Tuple2<HiveKey, Iterable<BytesWritable>> next() {
+          // TODO: implement this by accumulating rows with the same key into a list.
+          // Note that this list needs to improved to prevent excessive memory usage, but this
+          // can be done in later phase.
+          while (it.hasNext()) {
+            Tuple2<HiveKey, BytesWritable> pair = it.next();
+            if (curKey != null && !curKey.equals(pair._1())) {
               HiveKey key = curKey;
-              curKey = null;
-              return new Tuple2<HiveKey, Iterable<BytesWritable>>(key, curValues);
+              List<BytesWritable> values = curValues;
+              curKey = pair._1();
+              curValues = new ArrayList<BytesWritable>();
+              curValues.add(pair._2());
+              return new Tuple2<HiveKey, Iterable<BytesWritable>>(key, values);
             }
+            curKey = pair._1();
+            curValues.add(pair._2());
+          }
+          if (curKey == null) {
+            throw new NoSuchElementException();
+          }
+          // if we get here, this should be the last element we have
+          HiveKey key = curKey;
+          curKey = null;
+          return new Tuple2<HiveKey, Iterable<BytesWritable>>(key, curValues);
+        }
 
-            @Override
-            public void remove() {
-              // Not implemented.
-              // throw Unsupported Method Invocation Exception.
-              throw new UnsupportedOperationException();
-            }
-
-          };
-
-      return new Iterable<Tuple2<HiveKey, Iterable<BytesWritable>>>() {
         @Override
-        public Iterator<Tuple2<HiveKey, Iterable<BytesWritable>>> iterator() {
-          return resultIt;
+        public void remove() {
+          // Not implemented.
+          // throw Unsupported Method Invocation Exception.
+          throw new UnsupportedOperationException();
         }
       };
     }
 }
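
For readers following the Iterable-to-Iterator changes above: Spark 2.0 changed the Java flat-map style function interfaces so that call() returns a java.util.Iterator rather than a java.lang.Iterable, which is why HiveBaseFunctionResultList now implements Iterator itself and the one-shot ResultIterator/iterator() machinery could be deleted. A minimal sketch of the new contract (SplitWords is an illustrative class, not part of this patch):

    import java.util.Arrays;
    import java.util.Iterator;

    import org.apache.spark.api.java.function.FlatMapFunction;

    // Spark 2.0 contract: call() hands back an Iterator, which the framework
    // may consume only once.
    public class SplitWords implements FlatMapFunction<String, String> {
      @Override
      public Iterator<String> call(String line) {
        return Arrays.asList(line.split(" ")).iterator();
      }
    }

Returning the iterator directly removes the extra Iterable wrapper (and its single-use guard) that the Spark 1.6 API forced on implementations such as SortByShuffler above.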

http://git-wip-us.apache.org/repos/asf/hive/blob/ac977cc8/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java
index 09c54c1..b48de3e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java
@@ -24,15 +24,15 @@ import java.util.Map;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.spark.JavaSparkListener;
 import org.apache.spark.executor.TaskMetrics;
+import org.apache.spark.scheduler.SparkListener;
 import org.apache.spark.scheduler.SparkListenerJobStart;
 import org.apache.spark.scheduler.SparkListenerTaskEnd;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
-public class JobMetricsListener extends JavaSparkListener {
+public class JobMetricsListener extends SparkListener {
 
   private static final Logger LOG = LoggerFactory.getLogger(JobMetricsListener.class);
 
http://git-wip-us.apache.org/repos/asf/hive/blob/ac977cc8/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestHiveKVResultCache.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestHiveKVResultCache.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestHiveKVResultCache.java
index ee9f9b7..7bb9c62 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestHiveKVResultCache.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestHiveKVResultCache.java
@@ -282,9 +282,8 @@ public class TestHiveKVResultCache {
     resultList.init(rows, threshold, separate, prefix1, prefix2);
     long startTime = System.currentTimeMillis();
 
-    Iterator it = resultList.iterator();
-    while (it.hasNext()) {
-      Object item = it.next();
+    while (resultList.hasNext()) {
+      Object item = resultList.next();
       if (output != null) {
         output.add((Tuple2<HiveKey, BytesWritable>)item);
       }
 
http://git-wip-us.apache.org/repos/asf/hive/blob/ac977cc8/spark-client/pom.xml
----------------------------------------------------------------------
diff --git a/spark-client/pom.xml b/spark-client/pom.xml
index 6cf3b17..effc13b 100644
--- a/spark-client/pom.xml
+++ b/spark-client/pom.xml
@@ -33,7 +33,6 @@
 
   <properties>
     <hive.path.to.root>..</hive.path.to.root>
-    <scala.binary.version>2.10</scala.binary.version>
     <test.redirectToFile>true</test.redirectToFile>
   </properties>
 
@@ -70,6 +69,14 @@
         <groupId>com.esotericsoftware.kryo</groupId>
         <artifactId>kryo</artifactId>
       </exclusion>
+      <exclusion>
+        <groupId>org.glassfish.jersey.containers</groupId>
+        <artifactId>*</artifactId>
+      </exclusion>
+      <exclusion>
+        <groupId>org.glassfish.jersey.core</groupId>
+        <artifactId>*</artifactId>
+      </exclusion>
       <exclusion>
         <groupId>org.slf4j</groupId>
         <artifactId>slf4j-log4j12</artifactId>
@@ -96,6 +103,12 @@
       <scope>test</scope>
     </dependency>
     <dependency>
+      <groupId>org.glassfish.jersey.containers</groupId>
+      <artifactId>jersey-container-servlet</artifactId>
+      <version>${glassfish.jersey.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.apache.hive</groupId>
       <artifactId>hive-service-rpc</artifactId>
       <version>${project.version}</version>
 
http://git-wip-us.apache.org/repos/asf/hive/blob/ac977cc8/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java b/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java
index e77aa78..0f03a64 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java
@@ -151,7 +151,6 @@ public class MetricsCollection {
 
       // Input metrics.
       boolean hasInputMetrics = false;
-      DataReadMethod readMethod = null;
       long bytesRead = 0L;
 
       // Shuffle read metrics.
@@ -177,11 +176,6 @@ public class MetricsCollection {
 
         if (m.inputMetrics != null) {
           hasInputMetrics = true;
-          if (readMethod == null) {
-            readMethod = m.inputMetrics.readMethod;
-          } else if (readMethod != m.inputMetrics.readMethod) {
-            readMethod = DataReadMethod.Multiple;
-          }
           bytesRead += m.inputMetrics.bytesRead;
         }
 
@@ -201,7 +195,7 @@ public class MetricsCollection {
 
       InputMetrics inputMetrics = null;
       if (hasInputMetrics) {
-        inputMetrics = new InputMetrics(readMethod, bytesRead);
+        inputMetrics = new InputMetrics(bytesRead);
       }
 
       ShuffleReadMetrics shuffleReadMetrics = null;
 
http://git-wip-us.apache.org/repos/asf/hive/blob/ac977cc8/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java b/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
index e3b88d1..ede8ce9 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
@@ -43,11 +43,11 @@ import org.apache.hive.spark.client.metrics.Metrics;
 import org.apache.hive.spark.client.rpc.Rpc;
 import org.apache.hive.spark.client.rpc.RpcConfiguration;
 import org.apache.hive.spark.counter.SparkCounters;
-import org.apache.spark.JavaSparkListener;
 import org.apache.spark.SparkConf;
 import org.apache.spark.SparkJobInfo;
 import org.apache.spark.api.java.JavaFutureAction;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.scheduler.SparkListener;
 import org.apache.spark.scheduler.SparkListenerJobEnd;
 import org.apache.spark.scheduler.SparkListenerJobStart;
 import org.apache.spark.scheduler.SparkListenerTaskEnd;
@@ -441,7 +441,7 @@ public class RemoteDriver {
 
   }
 
-  private class ClientListener extends JavaSparkListener {
+  private class ClientListener extends SparkListener {
 
     private final Map<Integer, Integer> stageToJobId = Maps.newHashMap();
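
For the listener changes in JobMetricsListener and RemoteDriver above: Spark 2.0 removed the JavaSparkListener adapter, and org.apache.spark.scheduler.SparkListener is now a class whose callbacks are no-ops by default, so Java code extends it directly and overrides only what it needs. A minimal sketch under that assumption (StageToJobListener is illustrative, not from the patch):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.spark.scheduler.SparkListener;
    import org.apache.spark.scheduler.SparkListenerJobStart;

    // Extends SparkListener directly; every callback not overridden is a no-op.
    class StageToJobListener extends SparkListener {
      private final Map<Integer, Integer> stageToJobId = new HashMap<>();

      @Override
      public void onJobStart(SparkListenerJobStart jobStart) {
        // stageIds() is a Scala Seq when accessed from Java.
        for (int i = 0; i < jobStart.stageIds().length(); i++) {
          stageToJobId.put((Integer) jobStart.stageIds().apply(i), jobStart.jobId());
        }
      }
    }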

http://git-wip-us.apache.org/repos/asf/hive/blob/ac977cc8/spark-client/src/main/java/org/apache/hive/spark/client/metrics/InputMetrics.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/InputMetrics.java b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/InputMetrics.java
index e46b67d..f137007 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/InputMetrics.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/InputMetrics.java
@@ -28,25 +28,20 @@ import org.apache.hadoop.hive.common.classification.InterfaceAudience;
  */
 @InterfaceAudience.Private
 public class InputMetrics implements Serializable {
-
-  public final DataReadMethod readMethod;
   public final long bytesRead;
 
   private InputMetrics() {
     // For Serialization only.
-    this(null, 0L);
+    this(0L);
   }
 
   public InputMetrics(
-      DataReadMethod readMethod,
       long bytesRead) {
-    this.readMethod = readMethod;
     this.bytesRead = bytesRead;
   }
 
   public InputMetrics(TaskMetrics metrics) {
-    this(DataReadMethod.valueOf(metrics.inputMetrics().get().readMethod().toString()),
-        metrics.inputMetrics().get().bytesRead());
+    this(metrics.inputMetrics().bytesRead());
   }
 
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/ac977cc8/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java
index a7305cf..418d534 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java
@@ -99,15 +99,15 @@ public class Metrics implements Serializable {
   }
 
   private static InputMetrics optionalInputMetric(TaskMetrics metrics) {
-    return metrics.inputMetrics().isDefined() ? new InputMetrics(metrics) : null;
+    return (metrics.inputMetrics() != null) ? new InputMetrics(metrics) : null;
   }
 
   private static ShuffleReadMetrics optionalShuffleReadMetric(TaskMetrics metrics) {
-    return metrics.shuffleReadMetrics().isDefined() ? new ShuffleReadMetrics(metrics) : null;
+    return (metrics.shuffleReadMetrics() != null) ? new ShuffleReadMetrics(metrics) : null;
   }
 
   private static ShuffleWriteMetrics optionalShuffleWriteMetrics(TaskMetrics metrics) {
-    return metrics.shuffleWriteMetrics().isDefined() ? new ShuffleWriteMetrics(metrics) : null;
+    return (metrics.shuffleWriteMetrics() != null) ? new ShuffleWriteMetrics(metrics) : null;
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/ac977cc8/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleReadMetrics.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleReadMetrics.java b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleReadMetrics.java
index be14c06..9ff4d0f 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleReadMetrics.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleReadMetrics.java
@@ -30,9 +30,9 @@ import org.apache.hadoop.hive.common.classification.InterfaceAudience;
 public class ShuffleReadMetrics implements Serializable {
 
   /** Number of remote blocks fetched in shuffles by tasks. */
-  public final int remoteBlocksFetched;
+  public final long remoteBlocksFetched;
   /** Number of local blocks fetched in shuffles by tasks. */
-  public final int localBlocksFetched;
+  public final long localBlocksFetched;
   /**
    * Time tasks spent waiting for remote shuffle blocks. This only includes the
    * time blocking on shuffle input data. For instance if block B is being
@@ -49,8 +49,8 @@ public class ShuffleReadMetrics implements Serializable {
   }
 
   public ShuffleReadMetrics(
-      int remoteBlocksFetched,
-      int localBlocksFetched,
+      long remoteBlocksFetched,
+      long localBlocksFetched,
       long fetchWaitTime,
       long remoteBytesRead) {
     this.remoteBlocksFetched = remoteBlocksFetched;
@@ -60,16 +60,16 @@ public class ShuffleReadMetrics implements Serializable {
   }
 
   public ShuffleReadMetrics(TaskMetrics metrics) {
-    this(metrics.shuffleReadMetrics().get().remoteBlocksFetched(),
-        metrics.shuffleReadMetrics().get().localBlocksFetched(),
-        metrics.shuffleReadMetrics().get().fetchWaitTime(),
-        metrics.shuffleReadMetrics().get().remoteBytesRead());
+    this(metrics.shuffleReadMetrics().remoteBlocksFetched(),
+        metrics.shuffleReadMetrics().localBlocksFetched(),
+        metrics.shuffleReadMetrics().fetchWaitTime(),
+        metrics.shuffleReadMetrics().remoteBytesRead());
   }
 
   /**
    * Number of blocks fetched in shuffle by tasks (remote or local).
    */
-  public int getTotalBlocksFetched() {
+  public long getTotalBlocksFetched() {
     return remoteBlocksFetched + localBlocksFetched;
   }
 
http://git-wip-us.apache.org/repos/asf/hive/blob/ac977cc8/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleWriteMetrics.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleWriteMetrics.java b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleWriteMetrics.java
index 4420e4d..64a4b86 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleWriteMetrics.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/ShuffleWriteMetrics.java
@@ -47,8 +47,8 @@ public class ShuffleWriteMetrics implements Serializable {
   }
 
   public ShuffleWriteMetrics(TaskMetrics metrics) {
-    this(metrics.shuffleWriteMetrics().get().shuffleBytesWritten(),
-        metrics.shuffleWriteMetrics().get().shuffleWriteTime());
+    this(metrics.shuffleWriteMetrics().shuffleBytesWritten(),
+        metrics.shuffleWriteMetrics().shuffleWriteTime());
   }
 
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/ac977cc8/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java
----------------------------------------------------------------------
diff --git a/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java b/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java
index 5146e91..8fef66b 100644
--- a/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java
+++ b/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java
@@ -95,22 +95,21 @@ public class TestMetricsCollection {
     long value = taskValue(1, 1, 1);
     Metrics metrics1 = new Metrics(value, value, value, value, value, value, value,
-      new InputMetrics(DataReadMethod.Memory, value), null, null);
+      new InputMetrics(value), null, null);
     Metrics metrics2 = new Metrics(value, value, value, value, value, value, value,
-      new InputMetrics(DataReadMethod.Disk, value), null, null);
+      new InputMetrics(value), null, null);
 
     collection.addMetrics(1, 1, 1, metrics1);
     collection.addMetrics(1, 1, 2, metrics2);
 
     Metrics global = collection.getAllMetrics();
     assertNotNull(global.inputMetrics);
-    assertEquals(DataReadMethod.Multiple, global.inputMetrics.readMethod);
   }
 
   private Metrics makeMetrics(int jobId, int stageId, long taskId) {
     long value = 1000000 * jobId + 1000 * stageId + taskId;
     return new Metrics(value, value, value, value, value, value, value,
-        new InputMetrics(DataReadMethod.Memory, value),
+        new InputMetrics(value),
         new ShuffleReadMetrics((int) value, (int) value, value, value),
         new ShuffleWriteMetrics(value, value));
   }
@@ -156,7 +155,6 @@ public class TestMetricsCollection {
     assertEquals(expected, metrics.memoryBytesSpilled);
     assertEquals(expected, metrics.diskBytesSpilled);
 
-    assertEquals(DataReadMethod.Memory, metrics.inputMetrics.readMethod);
     assertEquals(expected, metrics.inputMetrics.bytesRead);
 
     assertEquals(expected, metrics.shuffleReadMetrics.remoteBlocksFetched);
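
For the metrics changes above: in Spark 2.0, TaskMetrics.inputMetrics(), shuffleReadMetrics() and shuffleWriteMetrics() return the sub-metrics objects directly instead of Scala Options (hence the null checks replacing isDefined()/get()), the DataReadMethod/readMethod field is gone, and the block-fetched counters widened from int to long. A before/after sketch of the access pattern (MetricsShim is a hypothetical helper, not in the patch):

    import org.apache.spark.executor.TaskMetrics;

    final class MetricsShim {
      private MetricsShim() {}

      // Spark 1.6: metrics.inputMetrics().isDefined()
      //                ? metrics.inputMetrics().get().bytesRead() : 0L
      // Spark 2.0: the sub-metrics object is always present.
      static long bytesRead(TaskMetrics metrics) {
        return metrics.inputMetrics().bytesRead();
      }

      // remoteBlocksFetched()/localBlocksFetched() return long in 2.0 (int in 1.x),
      // which is why the ShuffleReadMetrics fields above widened to long.
      static long totalBlocksFetched(TaskMetrics metrics) {
        return metrics.shuffleReadMetrics().remoteBlocksFetched()
            + metrics.shuffleReadMetrics().localBlocksFetched();
      }
    }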