Repository: hive Updated Branches: refs/heads/master 473dd0462 -> f04eba3ca
HIVE-18690: Integrate with Spark OutputMetrics (Sahil Takiar, reviewed by Aihua Xu) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/f04eba3c Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/f04eba3c Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/f04eba3c Branch: refs/heads/master Commit: f04eba3cac3007bfe61ec2f57c92d404f4d40b4c Parents: 473dd04 Author: Sahil Takiar <takiar.sa...@gmail.com> Authored: Tue Feb 27 20:21:55 2018 -0800 Committer: Sahil Takiar <stak...@cloudera.com> Committed: Thu Jun 7 14:29:59 2018 -0500 ---------------------------------------------------------------------- .../hive/ql/exec/spark/TestSparkStatistics.java | 2 +- .../hadoop/hive/ql/exec/FileSinkOperator.java | 16 ++++- .../hive/ql/exec/spark/SparkMetricUtils.java | 63 ++++++++++++++++++++ .../spark/Statistic/SparkStatisticsNames.java | 3 + .../spark/status/impl/SparkMetricsUtils.java | 4 ++ .../hive/spark/client/MetricsCollection.java | 20 ++++++- .../hive/spark/client/metrics/Metrics.java | 16 ++++- .../spark/client/metrics/OutputMetrics.java | 57 ++++++++++++++++++ .../spark/client/TestMetricsCollection.java | 14 +++-- 9 files changed, 183 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/f04eba3c/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/exec/spark/TestSparkStatistics.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/exec/spark/TestSparkStatistics.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/exec/spark/TestSparkStatistics.java index f6c5b17..d383873 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/exec/spark/TestSparkStatistics.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/exec/spark/TestSparkStatistics.java @@ -81,7 +81,7 @@ public 
class TestSparkStatistics { List<SparkStatistic> sparkStats = Lists.newArrayList(sparkTask.getSparkStatistics() .getStatisticGroup(SparkStatisticsNames.SPARK_GROUP_NAME).getStatistics()); - Assert.assertEquals(24, sparkStats.size()); + Assert.assertEquals(26, sparkStats.size()); Map<String, String> statsMap = sparkStats.stream().collect( Collectors.toMap(SparkStatistic::getName, SparkStatistic::getValue)); http://git-wip-us.apache.org/repos/asf/hive/blob/f04eba3c/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java index 83b53f4..c2319bb 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java @@ -30,6 +30,7 @@ import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; +import java.util.function.BiFunction; import com.google.common.collect.Lists; @@ -37,6 +38,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; + import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.common.StatsSetupConst; import org.apache.hadoop.hive.conf.HiveConf; @@ -46,6 +48,7 @@ import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.ql.CompilationOpContext; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.exec.Utilities.MissingBucketsContext; +import org.apache.hadoop.hive.ql.exec.spark.SparkMetricUtils; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.BucketCodec; import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils; @@ -79,16 +82,18 @@ import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspecto import org.apache.hadoop.hive.shims.HadoopShims.StoragePolicyShim; import org.apache.hadoop.hive.shims.HadoopShims.StoragePolicyValue; import org.apache.hadoop.hive.shims.ShimLoader; + import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.util.ReflectionUtils; + import org.apache.hive.common.util.HiveStringUtils; -import org.apache.hive.common.util.Murmur3; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.function.BiFunction; + /** * File Sink operator implementation. **/ @@ -1228,6 +1233,10 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements row_count.set(numRows); LOG.info(toString() + ": records written - " + numRows); + if ("spark".equalsIgnoreCase(HiveConf.getVar(hconf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE))) { + SparkMetricUtils.updateSparkRecordsWrittenMetrics(runTimeNumRows); + } + if (!bDynParts && !filesCreated) { boolean skipFiles = "tez".equalsIgnoreCase( HiveConf.getVar(hconf, ConfVars.HIVE_EXECUTION_ENGINE)); @@ -1303,6 +1312,9 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements if (isNativeTable()) { fsp.commit(fs, commitPaths); } + if ("spark".equalsIgnoreCase(HiveConf.getVar(hconf, ConfVars.HIVE_EXECUTION_ENGINE))) { + SparkMetricUtils.updateSparkBytesWrittenMetrics(LOG, fs, fsp.finalPaths); + } } if (conf.isMmTable()) { Utilities.writeMmCommitManifest(commitPaths, specPath, fs, taskId, http://git-wip-us.apache.org/repos/asf/hive/blob/f04eba3c/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMetricUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMetricUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMetricUtils.java new file mode 100644 index 0000000..1f856ae --- /dev/null +++
b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMetricUtils.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.spark; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.spark.TaskContext; +import org.apache.spark.executor.OutputMetrics; + +import org.slf4j.Logger; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicLong; + + +/** + * Utility class for updating Spark-level task metrics from Hive operators. + * + * <p>All methods are no-ops when there is no active Spark task, i.e. when + * {@link TaskContext#get()} returns {@code null}.
+ */ +public final class SparkMetricUtils { + + private SparkMetricUtils() { + // Do nothing + } + + /** + * Adds {@code numRows} to the records-written output metric of the current Spark task. + * The metric is incremented rather than overwritten so that every FileSinkOperator + * running in the same task contributes its rows. + * + * @param numRows number of records written; ignored unless positive + */ + public static void updateSparkRecordsWrittenMetrics(long numRows) { + OutputMetrics outputMetrics = currentOutputMetrics(); + if (outputMetrics != null && numRows > 0) { + outputMetrics.setRecordsWritten(outputMetrics.recordsWritten() + numRows); + } + } + + /** + * Adds the combined length of {@code commitPaths} to the bytes-written output metric of + * the current Spark task. The metric is incremented rather than overwritten because it may + * be called several times within one task (for example once per FSPaths of a + * FileSinkOperator). Files whose status cannot be read are skipped and logged at debug + * level, so the value is best-effort. + * + * @param log logger used to report files whose status could not be read + * @param fs file system holding {@code commitPaths} + * @param commitPaths files written by the operator; a {@code null} array and + * {@code null} entries are ignored + */ + public static void updateSparkBytesWrittenMetrics(Logger log, FileSystem fs, Path[] + commitPaths) { + // Resolve the task's metrics here, on the task thread: TaskContext is thread-local, so it + // must not be looked up from inside the parallel stream below. + OutputMetrics outputMetrics = currentOutputMetrics(); + if (outputMetrics == null || commitPaths == null) { + return; + } + AtomicLong bytesWritten = new AtomicLong(); + Arrays.stream(commitPaths).parallel().filter(Objects::nonNull).forEach(path -> { + try { + bytesWritten.addAndGet(fs.getFileStatus(path).getLen()); + } catch (IOException e) { + log.debug("Unable to collect stats for file: " + path + " output metrics may be inaccurate", + e); + } + }); + if (bytesWritten.get() > 0) { + outputMetrics.setBytesWritten(outputMetrics.bytesWritten() + bytesWritten.get()); + } + } + + /** Returns the output metrics of the current Spark task, or {@code null} if there is none. */ + private static OutputMetrics currentOutputMetrics() { + TaskContext taskContext = TaskContext.get(); + return taskContext == null ? null : taskContext.taskMetrics().outputMetrics(); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/f04eba3c/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsNames.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsNames.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsNames.java index 12c3eac..b80a8f4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsNames.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsNames.java @@ -54,5 +54,8 @@ public class SparkStatisticsNames { public static final String SHUFFLE_WRITE_TIME = "ShuffleWriteTime"; public static final String SHUFFLE_RECORDS_WRITTEN = "ShuffleRecordsWritten"; + public static final String RECORDS_WRITTEN = "RecordsWritten"; + public static final String BYTES_WRITTEN = "BytesWritten"; + public static final String SPARK_GROUP_NAME = "SPARK"; }
http://git-wip-us.apache.org/repos/asf/hive/blob/f04eba3c/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkMetricsUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkMetricsUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkMetricsUtils.java index c73c150..a0a0330 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkMetricsUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SparkMetricsUtils.java @@ -68,6 +68,10 @@ public final class SparkMetricsUtils { allMetrics.shuffleWriteMetrics.shuffleRecordsWritten); results.put(SparkStatisticsNames.SHUFFLE_WRITE_TIME, allMetrics.shuffleWriteMetrics.shuffleWriteTime); } + if (allMetrics.outputMetrics != null) { + results.put(SparkStatisticsNames.BYTES_WRITTEN, allMetrics.outputMetrics.bytesWritten); + results.put(SparkStatisticsNames.RECORDS_WRITTEN, allMetrics.outputMetrics.recordsWritten); + } return results; } http://git-wip-us.apache.org/repos/asf/hive/blob/f04eba3c/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java ---------------------------------------------------------------------- diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java b/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java index a0db015..2be19de 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java @@ -27,6 +27,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.hadoop.hive.common.classification.InterfaceAudience; import org.apache.hive.spark.client.metrics.InputMetrics; import org.apache.hive.spark.client.metrics.Metrics; +import org.apache.hive.spark.client.metrics.OutputMetrics; import 
org.apache.hive.spark.client.metrics.ShuffleReadMetrics; import org.apache.hive.spark.client.metrics.ShuffleWriteMetrics; @@ -171,6 +172,11 @@ public class MetricsCollection { long shuffleWriteTime = 0L; long shuffleRecordsWritten = 0L; + // Output metrics (reported only if at least one task provided them). + boolean hasOutputMetrics = false; + long bytesWritten = 0L; + long recordsWritten = 0L; + for (TaskInfo info : Collections2.filter(taskMetrics, filter)) { Metrics m = info.metrics; executorDeserializeTime += m.executorDeserializeTime; @@ -206,6 +212,12 @@ public class MetricsCollection { shuffleWriteTime += m.shuffleWriteMetrics.shuffleWriteTime; shuffleRecordsWritten += m.shuffleWriteMetrics.shuffleRecordsWritten; } + + if (m.outputMetrics != null) { + hasOutputMetrics = true; + bytesWritten += m.outputMetrics.bytesWritten; + recordsWritten += m.outputMetrics.recordsWritten; + } } InputMetrics inputMetrics = null; @@ -233,6 +245,11 @@ public class MetricsCollection { shuffleRecordsWritten); } + OutputMetrics outputMetrics = null; + if (hasOutputMetrics) { + outputMetrics = new OutputMetrics(bytesWritten, recordsWritten); + } + return new Metrics( executorDeserializeTime, executorDeserializeCpuTime, @@ -246,7 +263,8 @@ public class MetricsCollection { taskDurationTime, inputMetrics, shuffleReadMetrics, - shuffleWriteMetrics); + shuffleWriteMetrics, + outputMetrics); } finally { lock.readLock().unlock(); } http://git-wip-us.apache.org/repos/asf/hive/blob/f04eba3c/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java ---------------------------------------------------------------------- diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java index cf7a1f6..e09effc 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java @@ -63,10 +63,12 @@ public class Metrics
implements Serializable { public final ShuffleReadMetrics shuffleReadMetrics; /** If tasks wrote to shuffle output, metrics on the written shuffle data. */ public final ShuffleWriteMetrics shuffleWriteMetrics; + /** If tasks wrote output data, metrics on the bytes and records written. */ + public final OutputMetrics outputMetrics; private Metrics() { // For Serialization only. - this(0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, null, null, null); + this(0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, null, null, null, null); } public Metrics( @@ -82,7 +84,8 @@ public class Metrics implements Serializable { long taskDurationTime, InputMetrics inputMetrics, ShuffleReadMetrics shuffleReadMetrics, - ShuffleWriteMetrics shuffleWriteMetrics) { + ShuffleWriteMetrics shuffleWriteMetrics, + OutputMetrics outputMetrics) { this.executorDeserializeTime = executorDeserializeTime; this.executorDeserializeCpuTime = executorDeserializeCpuTime; this.executorRunTime = executorRunTime; @@ -96,6 +99,7 @@ public class Metrics implements Serializable { this.inputMetrics = inputMetrics; this.shuffleReadMetrics = shuffleReadMetrics; this.shuffleWriteMetrics = shuffleWriteMetrics; + this.outputMetrics = outputMetrics; } public Metrics(TaskMetrics metrics, TaskInfo taskInfo) { @@ -112,7 +116,8 @@ public class Metrics implements Serializable { taskInfo.duration(), optionalInputMetric(metrics), optionalShuffleReadMetric(metrics), - optionalShuffleWriteMetrics(metrics)); + optionalShuffleWriteMetrics(metrics), + optionalOutputMetrics(metrics)); } private static InputMetrics optionalInputMetric(TaskMetrics metrics) { @@ -127,6 +132,10 @@ public class Metrics implements Serializable { return (metrics.shuffleWriteMetrics() != null) ? new ShuffleWriteMetrics(metrics) : null; } + private static OutputMetrics optionalOutputMetrics(TaskMetrics metrics) { + return (metrics.outputMetrics() != null) ?
new OutputMetrics(metrics) : null; + } + @Override public String toString() { return "Metrics{" + @@ -143,6 +152,7 @@ public class Metrics implements Serializable { ", inputMetrics=" + inputMetrics + ", shuffleReadMetrics=" + shuffleReadMetrics + ", shuffleWriteMetrics=" + shuffleWriteMetrics + + ", outputMetrics=" + outputMetrics + '}'; } } http://git-wip-us.apache.org/repos/asf/hive/blob/f04eba3c/spark-client/src/main/java/org/apache/hive/spark/client/metrics/OutputMetrics.java ---------------------------------------------------------------------- diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/metrics/OutputMetrics.java b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/OutputMetrics.java new file mode 100644 index 0000000..99516ff --- /dev/null +++ b/spark-client/src/main/java/org/apache/hive/spark/client/metrics/OutputMetrics.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.spark.client.metrics; + +import org.apache.hadoop.hive.common.classification.InterfaceAudience; +import org.apache.spark.executor.TaskMetrics; + +import java.io.Serializable; + +/** + * Metrics pertaining to data written by tasks: total bytes and records written.
+ */ +@InterfaceAudience.Private +public class OutputMetrics implements Serializable { + + /** Total number of bytes written. */ + public final long bytesWritten; + /** Total number of records written. */ + public final long recordsWritten; + + private OutputMetrics() { + // For Serialization only. + this(0L, 0L); + } + + public OutputMetrics(long bytesWritten, long recordsWritten) { + this.bytesWritten = bytesWritten; + this.recordsWritten = recordsWritten; + } + + public OutputMetrics(TaskMetrics metrics) { + this(metrics.outputMetrics().bytesWritten(), metrics.outputMetrics().recordsWritten()); + } + + @Override + public String toString() { + return "OutputMetrics{" + + "bytesWritten=" + bytesWritten + + ", recordsWritten=" + recordsWritten + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/f04eba3c/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java ---------------------------------------------------------------------- diff --git a/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java b/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java index 2d4c43d..a5c13ca 100644 --- a/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java +++ b/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java @@ -26,6 +26,7 @@ import java.util.Arrays; import org.apache.hive.spark.client.metrics.DataReadMethod; import org.apache.hive.spark.client.metrics.InputMetrics; import org.apache.hive.spark.client.metrics.Metrics; +import org.apache.hive.spark.client.metrics.OutputMetrics; import org.apache.hive.spark.client.metrics.ShuffleReadMetrics; import org.apache.hive.spark.client.metrics.ShuffleWriteMetrics; import org.junit.Test; @@ -67,7 +68,7 @@ public class TestMetricsCollection { public void testOptionalMetrics() { long value = taskValue(1, 1, 1L); Metrics metrics = new Metrics(value, value, value, value, value, value,
value, value, value, - value, null, null, null); + value, null, null, null, null); MetricsCollection collection = new MetricsCollection(); for (int i : Arrays.asList(1, 2)) { @@ -96,9 +97,9 @@ public class TestMetricsCollection { long value = taskValue(1, 1, 1); Metrics metrics1 = new Metrics(value, value, value, value, value, value, value, value, value, - value, new InputMetrics(value, value), null, null); + value, new InputMetrics(value, value), null, null, null); Metrics metrics2 = new Metrics(value, value, value, value, value, value, value, value, value, - value, new InputMetrics(value, value), null, null); + value, new InputMetrics(value, value), null, null, null); collection.addMetrics(1, 1, 1, metrics1); collection.addMetrics(1, 1, 2, metrics2); @@ -112,7 +113,8 @@ public class TestMetricsCollection { return new Metrics(value, value, value, value, value, value, value, value, value, value, new InputMetrics(value, value), new ShuffleReadMetrics((int) value, (int) value, value, value, value, value, value), - new ShuffleWriteMetrics(value, value, value)); + new ShuffleWriteMetrics(value, value, value), + new OutputMetrics(value, value)); } /** @@ -173,6 +175,8 @@ public class TestMetricsCollection { assertEquals(expected, metrics.shuffleWriteMetrics.shuffleBytesWritten); assertEquals(expected, metrics.shuffleWriteMetrics.shuffleWriteTime); assertEquals(expected, metrics.shuffleWriteMetrics.shuffleRecordsWritten); - } + assertEquals(expected, metrics.outputMetrics.recordsWritten); + assertEquals(expected, metrics.outputMetrics.bytesWritten); + } }