Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23207#discussion_r239060606 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala --- @@ -95,3 +96,59 @@ private[spark] object SQLShuffleMetricsReporter { FETCH_WAIT_TIME -> SQLMetrics.createTimingMetric(sc, "fetch wait time"), RECORDS_READ -> SQLMetrics.createMetric(sc, "records read")) } + +/** + * A shuffle write metrics reporter for SQL exchange operators. Unlike + * [[SQLShuffleReadMetricsReporter]], we need a function of (reporter => reporter) set in + * the shuffle dependency, so the local SQLMetric should be transient and created on the executor. + * @param metrics Shuffle write metrics in the current SparkPlan. + * @param metricsReporter Another reporter that needs to be updated by this SQLShuffleWriteMetricsReporter. + */ +private[spark] case class SQLShuffleWriteMetricsReporter( + metrics: Map[String, SQLMetric])(metricsReporter: ShuffleWriteMetricsReporter) + extends ShuffleWriteMetricsReporter with Serializable { + @transient private[this] lazy val _bytesWritten = + metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_BYTES_WRITTEN) + @transient private[this] lazy val _recordsWritten = + metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_RECORDS_WRITTEN) + @transient private[this] lazy val _writeTime = + metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_WRITE_TIME) + + override private[spark] def incBytesWritten(v: Long): Unit = { + metricsReporter.incBytesWritten(v) + _bytesWritten.add(v) + } + override private[spark] def decRecordsWritten(v: Long): Unit = { + metricsReporter.decBytesWritten(v) + _recordsWritten.set(_recordsWritten.value - v) + } + override private[spark] def incRecordsWritten(v: Long): Unit = { + metricsReporter.incRecordsWritten(v) + _recordsWritten.add(v) + } + override private[spark] def incWriteTime(v: Long): Unit = { + metricsReporter.incWriteTime(v) + _writeTime.add(v) + } + override private[spark] def 
decBytesWritten(v: Long): Unit = { + metricsReporter.decBytesWritten(v) + _bytesWritten.set(_bytesWritten.value - v) + } +} + +private[spark] object SQLShuffleWriteMetricsReporter { + val SHUFFLE_BYTES_WRITTEN = "shuffleBytesWritten" + val SHUFFLE_RECORDS_WRITTEN = "shuffleRecordsWritten" + val SHUFFLE_WRITE_TIME = "shuffleWriteTime" --- End diff -- do we have other time metrics using nanoseconds?
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org