[GitHub] [spark] thejdeep commented on a diff in pull request #36165: [SPARK-36620][SHUFFLE] Add Push Based Shuffle client side metrics

2022-11-28 Thread GitBox


thejdeep commented on code in PR #36165:
URL: https://github.com/apache/spark/pull/36165#discussion_r1034008608


##
project/MimaExcludes.scala:
##
@@ -87,6 +87,11 @@ object MimaExcludes {
 // [SPARK-36511][MINOR][SQL] Remove ColumnIOUtil
 ProblemFilters.exclude[MissingClassProblem]("org.apache.parquet.io.ColumnIOUtil"),
+// [SPARK-36620] [SHUFFLE] Expose push based shuffle metrics
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.status.api.v1.ShuffleReadMetricDistributions.this"),
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.status.api.v1.ShuffleReadMetrics.this"),
+ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.status.api.v1.StageData.this"),
+
 // [SPARK-40324][SQL] Provide query context in AnalysisException

Review Comment:
   There are some binary incompatibilities that had to be added to the excludes 
list.
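
For context on why these excludes are needed, here is an illustrative sketch (class and field names are hypothetical, not Spark's actual API): adding a constructor parameter removes the old constructor symbol from the compiled bytecode, so MiMa reports a `DirectMissingMethodProblem` on `<Class>.this` for the previous signature, which the PR then silences via `ProblemFilters`.

```scala
// Illustrative sketch (names hypothetical). Imagine v1 of this class had
// only `remoteBytesRead`; v2 adds a parameter, so the old one-argument
// constructor no longer exists in the bytecode -- a binary break MiMa
// flags as DirectMissingMethodProblem on "ShuffleReadMetricsLike.this".
object MimaExcludeSketch {
  class ShuffleReadMetricsLike(
      val remoteBytesRead: Long,
      val remoteMergedBlocksFetched: Long) // new in v2; breaks old .this

  def describe(m: ShuffleReadMetricsLike): String =
    s"remote=${m.remoteBytesRead} merged=${m.remoteMergedBlocksFetched}"
}
```

Callers compiled against v1 would fail at link time against v2, which is exactly the class of break the excludes list acknowledges as intentional.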



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] thejdeep commented on a diff in pull request #36165: [SPARK-36620][SHUFFLE] Add Push Based Shuffle client side metrics

2022-11-22 Thread GitBox


thejdeep commented on code in PR #36165:
URL: https://github.com/apache/spark/pull/36165#discussion_r1029584122


##
core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala:
##
@@ -146,6 +268,16 @@ private[spark] class TempShuffleReadMetrics extends ShuffleReadMetricsReporter {
   override def incLocalBytesRead(v: Long): Unit = _localBytesRead += v
   override def incFetchWaitTime(v: Long): Unit = _fetchWaitTime += v
   override def incRecordsRead(v: Long): Unit = _recordsRead += v
+  override def incCorruptMergedBlockChunks(v: Long): Unit = _corruptMergedBlockChunks += v
+  override def incFallbackCount(v: Long): Unit = _fallbackCount += v
+  override def incRemoteMergedBlocksFetched(v: Long): Unit = _remoteMergedBlocksFetched += v
+  override def incLocalMergedBlocksFetched(v: Long): Unit = _localMergedBlocksFetched += v
+  override def incRemoteMergedChunksFetched(v: Long): Unit = _remoteMergedChunksFetched += v
+  override def incLocalMergedChunksFetched(v: Long): Unit = _localMergedChunksFetched += v
+  override def incRemoteMergedBlocksBytesRead(v: Long): Unit = _remoteMergedBlocksBytesRead += v
+  override def incLocalMergedBlocksBytesRead(v: Long): Unit = _localMergedBlocksBytesRead += v
+  override def incRemoteReqsDuration(v: Long): Unit = _remoteReqsDuration += v
+  override def incRemoteMergedReqsDuration(v: Long): Unit = _remoteMergedReqsDuration += v

Review Comment:
   Updated `ShuffleBlockFetcherIterator` to include these metrics too, thanks for catching it.
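
The per-task accumulation pattern shown in the diff above can be sketched minimally as follows (a simplified stand-in for Spark's actual `ShuffleReadMetricsReporter`, which has many more hooks):

```scala
// Minimal sketch of the temp-metrics pattern (simplified; hypothetical
// trait/class names): a per-task reporter accumulates deltas in private
// counters via inc* hooks, and exposes the totals for a later merge into
// the task's final shuffle-read metrics.
trait MergedShuffleMetricsReporter {
  def incRemoteMergedBlocksFetched(v: Long): Unit
  def incRemoteMergedReqsDuration(v: Long): Unit
}

class TempMergedShuffleMetrics extends MergedShuffleMetricsReporter {
  private var _remoteMergedBlocksFetched = 0L
  private var _remoteMergedReqsDuration = 0L
  override def incRemoteMergedBlocksFetched(v: Long): Unit =
    _remoteMergedBlocksFetched += v
  override def incRemoteMergedReqsDuration(v: Long): Unit =
    _remoteMergedReqsDuration += v
  def remoteMergedBlocksFetched: Long = _remoteMergedBlocksFetched
  def remoteMergedReqsDuration: Long = _remoteMergedReqsDuration
}
```

A fetch iterator would call the `inc*` hooks as merged blocks and chunks arrive, which is why the iterator also needed updating once the new hooks were added.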






[GitHub] [spark] thejdeep commented on a diff in pull request #36165: [SPARK-36620][SHUFFLE] Add Push Based Shuffle client side metrics

2022-11-01 Thread GitBox


thejdeep commented on code in PR #36165:
URL: https://github.com/apache/spark/pull/36165#discussion_r1010567174


##
core/src/main/scala/org/apache/spark/executor/Executor.scala:
##
@@ -654,6 +654,27 @@ private[spark] class Executor(
 executorSource.METRIC_RESULT_SIZE.inc(task.metrics.resultSize)
 executorSource.METRIC_DISK_BYTES_SPILLED.inc(task.metrics.diskBytesSpilled)
 executorSource.METRIC_MEMORY_BYTES_SPILLED.inc(task.metrics.memoryBytesSpilled)
+executorSource.METRIC_PUSH_BASED_SHUFFLE_CORRUPT_MERGED_BLOCK_CHUNKS
+  .inc(task.metrics.shuffleReadMetrics.corruptMergedBlockChunks)
+executorSource.METRIC_PUSH_BASED_SHUFFLE_FALLBACK_COUNT
+  .inc(task.metrics.shuffleReadMetrics.fallbackCount)
+executorSource.METRIC_PUSH_BASED_SHUFFLE_MERGED_REMOTE_BLOCKS_FETCHED
+  .inc(task.metrics.shuffleReadMetrics.remoteMergedBlocksFetched)
+executorSource.METRIC_PUSH_BASED_SHUFFLE_MERGED_LOCAL_BLOCKS_FETCHED
+  .inc(task.metrics.shuffleReadMetrics.localMergedBlocksFetched)
+executorSource.METRIC_PUSH_BASED_SHUFFLE_MERGED_REMOTE_CHUNKS_FETCHED
+  .inc(task.metrics.shuffleReadMetrics.remoteMergedChunksFetched)
+executorSource.METRIC_PUSH_BASED_SHUFFLE_MERGED_LOCAL_CHUNKS_FETCHED
+  .inc(task.metrics.shuffleReadMetrics.localMergedChunksFetched)
+executorSource.METRIC_PUSH_BASED_SHUFFLE_MERGED_REMOTE_BYTES_READ
+  .inc(task.metrics.shuffleReadMetrics.remoteMergedBlocksBytesRead)
+executorSource.METRIC_PUSH_BASED_SHUFFLE_MERGED_LOCAL_BYTES_READ
+  .inc(task.metrics.shuffleReadMetrics.localMergedBlocksBytesRead)
+executorSource.METRIC_PUSH_BASED_SHUFFLE_REMOTE_REQS_DURATION
+  .inc(task.metrics.shuffleReadMetrics.remoteReqsDuration)
+executorSource.METRIC_PUSH_BASED_SHUFFLE_MERGED_REMOTE_REQS_DURATION
+  .inc(task.metrics.shuffleReadMetrics.remoteMergedReqsDuration)

Review Comment:
   Added method to populate these.
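
A hypothetical extraction of the repetitive `inc` calls might look like the sketch below (the helper's name, and the simplified counter and metrics types, are all assumed for illustration, not the PR's actual code):

```scala
// Hypothetical sketch: collapse the repeated executorSource.METRIC_*.inc(...)
// calls into one method. Counter stands in for a Dropwizard-style counter;
// PushShuffleReadMetrics stands in for the shuffle-read metrics snapshot.
class Counter {
  private var n = 0L
  def inc(v: Long): Unit = n += v
  def count: Long = n
}

class PushShuffleCounters {
  val corruptMergedBlockChunks = new Counter
  val fallbackCount = new Counter
  val remoteMergedBlocksFetched = new Counter
}

case class PushShuffleReadMetrics(
    corruptMergedBlockChunks: Long,
    fallbackCount: Long,
    remoteMergedBlocksFetched: Long)

object ExecutorMetricsSketch {
  // One call site instead of ten: add each task-level delta to the
  // corresponding executor-level counter.
  def incrementPushBasedShuffleMetrics(
      src: PushShuffleCounters,
      m: PushShuffleReadMetrics): Unit = {
    src.corruptMergedBlockChunks.inc(m.corruptMergedBlockChunks)
    src.fallbackCount.inc(m.fallbackCount)
    src.remoteMergedBlocksFetched.inc(m.remoteMergedBlocksFetched)
  }
}
```

Each completed task would then contribute its deltas with a single helper call rather than a block of ten `inc` statements.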






[GitHub] [spark] thejdeep commented on a diff in pull request #36165: [SPARK-36620][SHUFFLE] Add Push Based Shuffle client side metrics

2022-11-01 Thread GitBox


thejdeep commented on code in PR #36165:
URL: https://github.com/apache/spark/pull/36165#discussion_r1010566832


##
core/src/main/scala/org/apache/spark/status/storeTypes.scala:
##
@@ -138,6 +138,16 @@ private[spark] object TaskIndexNames {
   final val SHUFFLE_WRITE_RECORDS = "swr"
   final val SHUFFLE_WRITE_SIZE = "sws"
   final val SHUFFLE_WRITE_TIME = "swt"
+  final val PUSH_BASED_SHUFFLE_CORRUPT_MERGED_BLOCK_CHUNKS = "scmbc"
+  final val PUSH_BASED_SHUFFLE_FALLBACK_COUNT = "sfc"
+  final val PUSH_BASED_SHUFFLE_MERGED_REMOTE_BLOCKS = "smrb"
+  final val PUSH_BASED_SHUFFLE_MERGED_LOCAL_BLOCKS = "smlb"
+  final val PUSH_BASED_SHUFFLE_MERGED_REMOTE_CHUNKS = "smrc"
+  final val PUSH_BASED_SHUFFLE_MERGED_LOCAL_CHUNKS = "smlc"
+  final val PUSH_BASED_SHUFFLE_MERGED_REMOTE_READS = "smrr"
+  final val PUSH_BASED_SHUFFLE_MERGED_LOCAL_READS = "smlr"
+  final val PUSH_BASED_SHUFFLE_REMOTE_REQS_DURATION = "srrd"
+  final val PUSH_BASED_SHUFFLE_MERGED_REMOTE_REQS_DURATION = "smrrd"

Review Comment:
   Added prefix, thanks






[GitHub] [spark] thejdeep commented on a diff in pull request #36165: [SPARK-36620][SHUFFLE] Add Push Based Shuffle client side metrics

2022-11-01 Thread GitBox


thejdeep commented on code in PR #36165:
URL: https://github.com/apache/spark/pull/36165#discussion_r1010566209


##
core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala:
##
@@ -2623,48 +2677,118 @@ private[spark] object JsonProtocolSuite extends Assertions {
   |},
   |{
   |  "ID": 19,
+  |  "Name": "${shuffleRead.FALLBACK_COUNT}",
+  |  "Update": 0,
+  |  "Internal": true,
+  |  "Count Failed Values": true
+  |},
+  |{
+  |  "ID" : 20,
+  |  "Name" : "${shuffleRead.REMOTE_MERGED_BLOCKS_FETCHED}",
+  |  "Update" : 0,
+  |  "Internal" : true,
+  |  "Count Failed Values" : true
+  |},
+  |{
+  |  "ID" : 21,
+  |  "Name" : "${shuffleRead.LOCAL_MERGED_BLOCKS_FETCHED}",
+  |  "Update" : 0,
+  |  "Internal" : true,
+  |  "Count Failed Values" : true
+  |},
+  |{
+  |  "ID" : 22,
+  |  "Name" : "${shuffleRead.REMOTE_MERGED_CHUNKS_FETCHED}",
+  |  "Update" : 0,
+  |  "Internal" : true,
+  |  "Count Failed Values" : true
+  |},
+  |{
+  |  "ID" : 23,
+  |  "Name" : "${shuffleRead.LOCAL_MERGED_CHUNKS_FETCHED}",
+  |  "Update" : 0,
+  |  "Internal" : true,
+  |  "Count Failed Values" : true
+  |},
+  |{
+  |  "ID" : 24,
+  |  "Name" : "${shuffleRead.REMOTE_MERGED_BLOCKS_READ}",
+  |  "Update" : 0,
+  |  "Internal" : true,
+  |  "Count Failed Values" : true
+  |},
+  |{
+  |  "ID" : 25,
+  |  "Name" : "${shuffleRead.LOCAL_MERGED_BLOCKS_READ}",
+  |  "Update" : 0,
+  |  "Internal" : true,
+  |  "Count Failed Values" : true
+  |},
+  |{
+  |  "ID" : 26,
+  |  "Name" : "${shuffleRead.REMOTE_REQS_DURATION}",
+  |  "Update" : 0,
+  |  "Internal" : true,
+  |  "Count Failed Values" : true
+  |},
+  |{
+  |  "ID" : 27,
+  |  "Name" : "${shuffleRead.REMOTE_MERGED_REQS_DURATION}",
+  |  "Update" : 0,
+  |  "Internal" : true,
+  |  "Count Failed Values" : true
+  |},
+  |{
+  |  "ID": 28,
+  |  "Name": "${shuffleWrite.BYTES_WRITTEN}",
+  |  "Update": 0,
+  |  "Internal": true,
+  |  "Count Failed Values": true
+  |},
+  |{
+  |  "ID": 29,
   |  "Name": "${shuffleWrite.RECORDS_WRITTEN}",
   |  "Update": 0,
   |  "Internal": true,
   |  "Count Failed Values": true
   |},
   |{
-  |  "ID": 20,
+  |  "ID": 30,
   |  "Name": "${shuffleWrite.WRITE_TIME}",
   |  "Update": 0,
   |  "Internal": true,
   |  "Count Failed Values": true
   |},
   |{
-  |  "ID": 21,
+  |  "ID": 31,
   |  "Name": "${input.BYTES_READ}",
   |  "Update": 2100,
   |  "Internal": true,
   |  "Count Failed Values": true
   |},
   |{
-  |  "ID": 22,
+  |  "ID": 32,
   |  "Name": "${input.RECORDS_READ}",
   |  "Update": 21,
   |  "Internal": true,
   |  "Count Failed Values": true
   |},
   |{
-  |  "ID": 23,
+  |  "ID": 33,
   |  "Name": "${output.BYTES_WRITTEN}",
   |  "Update": 1200,
   |  "Internal": true,
   |  "Count Failed Values": true
   |},
   |{
-  |  "ID": 24,
+  |  "ID": 34,
   |  "Name": "${output.RECORDS_WRITTEN}",

Review Comment:
   The values changed because the newly added shuffle-read metrics occupy the preceding accumulator IDs, shifting every subsequent ID.
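
The renumbering follows directly from the fixture's IDs being positional; a minimal sketch (the helper is illustrative, not Spark code):

```scala
// Sketch: if accumulator IDs are assigned by position, inserting N new
// metrics before a block shifts every later ID by N -- e.g. here
// shuffleWrite.RECORDS_WRITTEN moves from ID 19 to ID 29 after ten
// push-based shuffle metrics are inserted ahead of it.
object AccumIdSketch {
  def assignIds(names: Seq[String], firstId: Int): Map[String, Int] =
    names.zipWithIndex.map { case (n, i) => n -> (firstId + i) }.toMap
}
```

For example, `assignIds(Seq("RECORDS_WRITTEN", "WRITE_TIME"), 19)` gives the pre-PR IDs, and starting at 29 instead reproduces the post-PR numbering in the test JSON.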






[GitHub] [spark] thejdeep commented on a diff in pull request #36165: [SPARK-36620][SHUFFLE] Add Push Based Shuffle client side metrics

2022-10-28 Thread GitBox


thejdeep commented on code in PR #36165:
URL: https://github.com/apache/spark/pull/36165#discussion_r1008470301


##
core/src/main/scala/org/apache/spark/status/storeTypes.scala:
##
@@ -233,6 +243,38 @@ private[spark] class TaskDataWrapper(
 val shuffleLocalBytesRead: Long,
@KVIndexParam(value = TaskIndexNames.SHUFFLE_READ_RECORDS, parent = TaskIndexNames.STAGE)
 val shuffleRecordsRead: Long,
+@KVIndexParam(
+  value = TaskIndexNames.PUSH_BASED_SHUFFLE_CORRUPT_MERGED_BLOCK_CHUNKS,
+  parent = TaskIndexNames.STAGE)
+val shuffleCorruptMergedBlockChunks: Long,
+@KVIndexParam(value = TaskIndexNames.PUSH_BASED_SHUFFLE_FALLBACK_COUNT,
+  parent = TaskIndexNames.STAGE)
+val shuffleFallbackCount: Long,
+@KVIndexParam(
+  value = TaskIndexNames.PUSH_BASED_SHUFFLE_MERGED_REMOTE_BLOCKS, parent = TaskIndexNames.STAGE)
+val shuffleMergedRemoteBlocksFetched: Long,
+@KVIndexParam(
+  value = TaskIndexNames.PUSH_BASED_SHUFFLE_MERGED_LOCAL_BLOCKS, parent = TaskIndexNames.STAGE)
+val shuffleMergedLocalBlocksFetched: Long,
+@KVIndexParam(
+  value = TaskIndexNames.PUSH_BASED_SHUFFLE_MERGED_REMOTE_CHUNKS, parent = TaskIndexNames.STAGE)
+val shuffleMergedRemoteChunksFetched: Long,
+@KVIndexParam(
+  value = TaskIndexNames.PUSH_BASED_SHUFFLE_MERGED_LOCAL_CHUNKS, parent = TaskIndexNames.STAGE)
+val shuffleMergedLocalChunksFetched: Long,
+@KVIndexParam(
+  value = TaskIndexNames.PUSH_BASED_SHUFFLE_MERGED_REMOTE_READS, parent = TaskIndexNames.STAGE)
+val shuffleMergedRemoteBlocksBytesRead: Long,
+@KVIndexParam(
+  value = TaskIndexNames.PUSH_BASED_SHUFFLE_MERGED_LOCAL_READS, parent = TaskIndexNames.STAGE)
+val shuffleMergedLocalBlocksBytesRead: Long,
+@KVIndexParam(
+  value = TaskIndexNames.PUSH_BASED_SHUFFLE_REMOTE_REQS_DURATION, parent = TaskIndexNames.STAGE)
+val shuffleRemoteReqsDuration: Long,
+@KVIndexParam(
+  value = TaskIndexNames.PUSH_BASED_SHUFFLE_MERGED_REMOTE_REQS_DURATION,
+  parent = TaskIndexNames.STAGE)

Review Comment:
   Indexes on these fields are required when computing quantiles and reading `TaskDataWrapper`s.
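
The quantile read path this enables can be sketched as follows (a simplified nearest-rank computation; the real code reads indexed columns from the KV store rather than sorting in memory, and the method name is illustrative):

```scala
// Sketch of why a per-field index matters: a quantile is computed over one
// metric's values across all tasks in a stage. With an index, the store
// can stream that column in order; without one, every TaskDataWrapper row
// would have to be scanned and sorted, as this in-memory fallback does.
object QuantileSketch {
  // Nearest-rank quantile over one metric's values for a stage's tasks.
  def quantile(values: Seq[Long], q: Double): Long = {
    require(values.nonEmpty && q >= 0.0 && q <= 1.0)
    val sorted = values.sorted
    sorted(math.min(sorted.length - 1, (q * sorted.length).toInt))
  }
}
```

For example, the median of the per-task `shuffleRemoteReqsDuration` values `Seq(5L, 1L, 3L, 2L, 4L)` is `quantile(..., 0.5)`, i.e. 3.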


