[GitHub] [spark] thejdeep commented on a diff in pull request #36165: [SPARK-36620][SHUFFLE] Add Push Based Shuffle client side metrics
thejdeep commented on code in PR #36165:
URL: https://github.com/apache/spark/pull/36165#discussion_r1034008608

## project/MimaExcludes.scala:
@@ -87,6 +87,11 @@ object MimaExcludes {
     // [SPARK-36511][MINOR][SQL] Remove ColumnIOUtil
     ProblemFilters.exclude[MissingClassProblem]("org.apache.parquet.io.ColumnIOUtil"),
+    // [SPARK-36620][SHUFFLE] Expose push based shuffle metrics
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.status.api.v1.ShuffleReadMetricDistributions.this"),
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.status.api.v1.ShuffleReadMetrics.this"),
+    ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.status.api.v1.StageData.this"),
+
     // [SPARK-40324][SQL] Provide query context in AnalysisException

Review Comment:
There are some binary incompatibilities that had to be added to the excludes list.

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
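For background on why these excludes are needed: adding fields to a class constructor changes the JVM constructor signature, so MiMa reports the old constructor (`.this`) as a missing method. A minimal sketch with hypothetical classes (not the actual Spark API classes):

```scala
// Hypothetical illustration of the binary incompatibility MiMa flags here.
// Adding a constructor parameter, even with a default value, replaces the
// old JVM constructor signature instead of adding an overload.
class MetricsV1(val recordsRead: Long)                        // bytecode: <init>(J)V
class MetricsV2(val recordsRead: Long, val merged: Long = 0L) // bytecode: <init>(JJ)V

// Reflection shows V2 has only the two-argument constructor, so code
// compiled against V1's <init>(J)V would fail at link time.
val paramCounts = classOf[MetricsV2].getConstructors.map(_.getParameterCount).toSet
assert(paramCounts == Set(2))
```

This is why source-compatible changes to `ShuffleReadMetrics` and friends still need entries in `MimaExcludes.scala`.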
[GitHub] [spark] thejdeep commented on a diff in pull request #36165: [SPARK-36620][SHUFFLE] Add Push Based Shuffle client side metrics
thejdeep commented on code in PR #36165:
URL: https://github.com/apache/spark/pull/36165#discussion_r1029584122

## core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala:
@@ -146,6 +268,16 @@ private[spark] class TempShuffleReadMetrics extends ShuffleReadMetricsReporter {
   override def incLocalBytesRead(v: Long): Unit = _localBytesRead += v
   override def incFetchWaitTime(v: Long): Unit = _fetchWaitTime += v
   override def incRecordsRead(v: Long): Unit = _recordsRead += v
+  override def incCorruptMergedBlockChunks(v: Long): Unit = _corruptMergedBlockChunks += v
+  override def incFallbackCount(v: Long): Unit = _fallbackCount += v
+  override def incRemoteMergedBlocksFetched(v: Long): Unit = _remoteMergedBlocksFetched += v
+  override def incLocalMergedBlocksFetched(v: Long): Unit = _localMergedBlocksFetched += v
+  override def incRemoteMergedChunksFetched(v: Long): Unit = _remoteMergedChunksFetched += v
+  override def incLocalMergedChunksFetched(v: Long): Unit = _localMergedChunksFetched += v
+  override def incRemoteMergedBlocksBytesRead(v: Long): Unit = _remoteMergedBlocksBytesRead += v
+  override def incLocalMergedBlocksBytesRead(v: Long): Unit = _localMergedBlocksBytesRead += v
+  override def incRemoteReqsDuration(v: Long): Unit = _remoteReqsDuration += v
+  override def incRemoteMergedReqsDuration(v: Long): Unit = _remoteMergedReqsDuration += v

Review Comment:
Updated `ShuffleBlockFetcherIterator` to include these metrics too, thanks for catching it.
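For readers unfamiliar with the temp-metrics pattern being extended here, a simplified sketch (not the actual Spark classes): each shuffle dependency read in a task accumulates into its own temp reporter, and the task-level metric is set from the sum when the task completes. Only one of the ten new push-based fields is shown.

```scala
// Simplified sketch of the TempShuffleReadMetrics accumulation pattern.
class TempShuffleRead {
  private var _remoteMergedBlocksFetched = 0L
  def incRemoteMergedBlocksFetched(v: Long): Unit = _remoteMergedBlocksFetched += v
  def remoteMergedBlocksFetched: Long = _remoteMergedBlocksFetched
}

class TaskShuffleRead {
  private var _remoteMergedBlocksFetched = 0L
  def remoteMergedBlocksFetched: Long = _remoteMergedBlocksFetched
  // Called once at task end with all per-dependency temp reporters.
  def setMergeValues(temps: Seq[TempShuffleRead]): Unit =
    _remoteMergedBlocksFetched = temps.map(_.remoteMergedBlocksFetched).sum
}

val t1 = new TempShuffleRead
t1.incRemoteMergedBlocksFetched(3L)
val t2 = new TempShuffleRead
t2.incRemoteMergedBlocksFetched(4L)
val task = new TaskShuffleRead
task.setMergeValues(Seq(t1, t2))
assert(task.remoteMergedBlocksFetched == 7L)
```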
[GitHub] [spark] thejdeep commented on a diff in pull request #36165: [SPARK-36620][SHUFFLE] Add Push Based Shuffle client side metrics
thejdeep commented on code in PR #36165:
URL: https://github.com/apache/spark/pull/36165#discussion_r1010567174

## core/src/main/scala/org/apache/spark/executor/Executor.scala:
@@ -654,6 +654,27 @@ private[spark] class Executor(
     executorSource.METRIC_RESULT_SIZE.inc(task.metrics.resultSize)
     executorSource.METRIC_DISK_BYTES_SPILLED.inc(task.metrics.diskBytesSpilled)
     executorSource.METRIC_MEMORY_BYTES_SPILLED.inc(task.metrics.memoryBytesSpilled)
+    executorSource.METRIC_PUSH_BASED_SHUFFLE_CORRUPT_MERGED_BLOCK_CHUNKS
+      .inc(task.metrics.shuffleReadMetrics.corruptMergedBlockChunks)
+    executorSource.METRIC_PUSH_BASED_SHUFFLE_FALLBACK_COUNT
+      .inc(task.metrics.shuffleReadMetrics.fallbackCount)
+    executorSource.METRIC_PUSH_BASED_SHUFFLE_MERGED_REMOTE_BLOCKS_FETCHED
+      .inc(task.metrics.shuffleReadMetrics.remoteMergedBlocksFetched)
+    executorSource.METRIC_PUSH_BASED_SHUFFLE_MERGED_LOCAL_BLOCKS_FETCHED
+      .inc(task.metrics.shuffleReadMetrics.localMergedBlocksFetched)
+    executorSource.METRIC_PUSH_BASED_SHUFFLE_MERGED_REMOTE_CHUNKS_FETCHED
+      .inc(task.metrics.shuffleReadMetrics.remoteMergedChunksFetched)
+    executorSource.METRIC_PUSH_BASED_SHUFFLE_MERGED_LOCAL_CHUNKS_FETCHED
+      .inc(task.metrics.shuffleReadMetrics.localMergedChunksFetched)
+    executorSource.METRIC_PUSH_BASED_SHUFFLE_MERGED_REMOTE_BYTES_READ
+      .inc(task.metrics.shuffleReadMetrics.remoteMergedBlocksBytesRead)
+
+    executorSource.METRIC_PUSH_BASED_SHUFFLE_MERGED_LOCAL_BLOCKS_FETCHED
+      .inc(task.metrics.shuffleReadMetrics.localMergedBlocksBytesRead)
+    executorSource.METRIC_PUSH_BASED_SHUFFLE_REMOTE_REQS_DURATION
+      .inc(task.metrics.shuffleReadMetrics.remoteReqsDuration)
+    executorSource.METRIC_PUSH_BASED_SHUFFLE_MERGED_REMOTE_REQS_DURATION
+      .inc(task.metrics.shuffleReadMetrics.remoteMergedReqsDuration)

Review Comment:
Added a method to populate these.
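The pattern in the diff above, sketched with a plain map instead of the Dropwizard `Counter`s the real `ExecutorSource` uses (the metric name below is illustrative, not an actual gauge name): after each task finishes, the task-level shuffle-read values are added to executor-lifetime counters.

```scala
import scala.collection.mutable

// Sketch of executor-lifetime metric accumulation.
class ExecutorCounters {
  private val counters = mutable.Map.empty[String, Long].withDefaultValue(0L)
  def inc(name: String, v: Long): Unit = counters(name) += v
  def value(name: String): Long = counters(name)
}

val source = new ExecutorCounters
// Two tasks finish; each adds its per-task metric to the running counter.
source.inc("mergedRemoteBlocksFetched", 5L)
source.inc("mergedRemoteBlocksFetched", 2L)
assert(source.value("mergedRemoteBlocksFetched") == 7L)
```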
[GitHub] [spark] thejdeep commented on a diff in pull request #36165: [SPARK-36620][SHUFFLE] Add Push Based Shuffle client side metrics
thejdeep commented on code in PR #36165:
URL: https://github.com/apache/spark/pull/36165#discussion_r1010566832

## core/src/main/scala/org/apache/spark/status/storeTypes.scala:
@@ -138,6 +138,16 @@ private[spark] object TaskIndexNames {
   final val SHUFFLE_WRITE_RECORDS = "swr"
   final val SHUFFLE_WRITE_SIZE = "sws"
   final val SHUFFLE_WRITE_TIME = "swt"
+  final val PUSH_BASED_SHUFFLE_CORRUPT_MERGED_BLOCK_CHUNKS = "scmbc"
+  final val PUSH_BASED_SHUFFLE_FALLBACK_COUNT = "sfc"
+  final val PUSH_BASED_SHUFFLE_MERGED_REMOTE_BLOCKS = "smrb"
+  final val PUSH_BASED_SHUFFLE_MERGED_LOCAL_BLOCKS = "smlb"
+  final val PUSH_BASED_SHUFFLE_MERGED_REMOTE_CHUNKS = "smrc"
+  final val PUSH_BASED_SHUFFLE_MERGED_LOCAL_CHUNKS = "smlc"
+  final val PUSH_BASED_SHUFFLE_MERGED_REMOTE_READS = "smrr"
+  final val PUSH_BASED_SHUFFLE_MERGED_LOCAL_READS = "smlr"
+  final val PUSH_BASED_SHUFFLE_REMOTE_REQS_DURATION = "srrd"
+  final val PUSH_BASED_SHUFFLE_MERGED_REMOTE_REQS_DURATION = "smrrd"

Review Comment:
Added prefix, thanks
[GitHub] [spark] thejdeep commented on a diff in pull request #36165: [SPARK-36620][SHUFFLE] Add Push Based Shuffle client side metrics
thejdeep commented on code in PR #36165:
URL: https://github.com/apache/spark/pull/36165#discussion_r1010566209

## core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala:
@@ -2623,48 +2677,118 @@ private[spark] object JsonProtocolSuite extends Assertions {
 |},
 |{
 | "ID": 19,
+ | "Name": "${shuffleRead.FALLBACK_COUNT}",
+ | "Update": 0,
+ | "Internal": true,
+ | "Count Failed Values": true
+ |},
+ |{
+ | "ID" : 20,
+ | "Name" : "${shuffleRead.REMOTE_MERGED_BLOCKS_FETCHED}",
+ | "Update" : 0,
+ | "Internal" : true,
+ | "Count Failed Values" : true
+ |},
+ |{
+ | "ID" : 21,
+ | "Name" : "${shuffleRead.LOCAL_MERGED_BLOCKS_FETCHED}",
+ | "Update" : 0,
+ | "Internal" : true,
+ | "Count Failed Values" : true
+ |},
+ |{
+ | "ID" : 22,
+ | "Name" : "${shuffleRead.REMOTE_MERGED_CHUNKS_FETCHED}",
+ | "Update" : 0,
+ | "Internal" : true,
+ | "Count Failed Values" : true
+ |},
+ |{
+ | "ID" : 23,
+ | "Name" : "${shuffleRead.LOCAL_MERGED_CHUNKS_FETCHED}",
+ | "Update" : 0,
+ | "Internal" : true,
+ | "Count Failed Values" : true
+ |},
+ |{
+ | "ID" : 24,
+ | "Name" : "${shuffleRead.REMOTE_MERGED_BLOCKS_READ}",
+ | "Update" : 0,
+ | "Internal" : true,
+ | "Count Failed Values" : true
+ |},
+ |{
+ | "ID" : 25,
+ | "Name" : "${shuffleRead.LOCAL_MERGED_BLOCKS_READ}",
+ | "Update" : 0,
+ | "Internal" : true,
+ | "Count Failed Values" : true
+ |},
+ |{
+ | "ID" : 26,
+ | "Name" : "${shuffleRead.REMOTE_REQS_DURATION}",
+ | "Update" : 0,
+ | "Internal" : true,
+ | "Count Failed Values" : true
+ |},
+ |{
+ | "ID" : 27,
+ | "Name" : "${shuffleRead.REMOTE_MERGED_REQS_DURATION}",
+ | "Update" : 0,
+ | "Internal" : true,
+ | "Count Failed Values" : true
+ |},
+ |{
+ | "ID": 28,
+ | "Name": "${shuffleWrite.BYTES_WRITTEN}",
+ | "Update": 0,
+ | "Internal": true,
+ | "Count Failed Values": true
+ |},
+ |{
+ | "ID": 29,
 | "Name": "${shuffleWrite.RECORDS_WRITTEN}",
 | "Update": 0,
 | "Internal": true,
 | "Count Failed Values": true
 |},
 |{
- | "ID": 20,
+ | "ID": 30,
 | "Name": "${shuffleWrite.WRITE_TIME}",
 | "Update": 0,
 | "Internal": true,
 | "Count Failed Values": true
 |},
 |{
- | "ID": 21,
+ | "ID": 31,
 | "Name": "${input.BYTES_READ}",
 | "Update": 2100,
 | "Internal": true,
 | "Count Failed Values": true
 |},
 |{
- | "ID": 22,
+ | "ID": 32,
 | "Name": "${input.RECORDS_READ}",
 | "Update": 21,
 | "Internal": true,
 | "Count Failed Values": true
 |},
 |{
- | "ID": 23,
+ | "ID": 33,
 | "Name": "${output.BYTES_WRITTEN}",
 | "Update": 1200,
 | "Internal": true,
 | "Count Failed Values": true
 |},
 |{
- | "ID": 24,
+ | "ID": 34,
 | "Name": "${output.RECORDS_WRITTEN}",

Review Comment:
The values just changed because the new metrics were inserted at the indices preceding these entries, shifting their IDs.
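The shift can be checked mechanically: the accumulator IDs in this fixture are positional, so inserting the ten new push-based shuffle-read metrics moves every later ID up by ten. A quick sanity check using IDs taken from the diff above (`WRITE_TIME` through `output.RECORDS_WRITTEN`):

```scala
// Positional accumulator IDs: ten inserted metrics shift all later IDs by ten.
val oldIds = Seq(20, 21, 22, 23, 24)
val insertedMetrics = 10
val newIds = oldIds.map(_ + insertedMetrics)
assert(newIds == Seq(30, 31, 32, 33, 34))
```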
[GitHub] [spark] thejdeep commented on a diff in pull request #36165: [SPARK-36620][SHUFFLE] Add Push Based Shuffle client side metrics
thejdeep commented on code in PR #36165:
URL: https://github.com/apache/spark/pull/36165#discussion_r1008470301

## core/src/main/scala/org/apache/spark/status/storeTypes.scala:
@@ -233,6 +243,38 @@ private[spark] class TaskDataWrapper(
   val shuffleLocalBytesRead: Long,
   @KVIndexParam(value = TaskIndexNames.SHUFFLE_READ_RECORDS, parent = TaskIndexNames.STAGE)
   val shuffleRecordsRead: Long,
+  @KVIndexParam(
+    value = TaskIndexNames.PUSH_BASED_SHUFFLE_CORRUPT_MERGED_BLOCK_CHUNKS,
+    parent = TaskIndexNames.STAGE)
+  val shuffleCorruptMergedBlockChunks: Long,
+  @KVIndexParam(value = TaskIndexNames.PUSH_BASED_SHUFFLE_FALLBACK_COUNT,
+    parent = TaskIndexNames.STAGE)
+  val shuffleFallbackCount: Long,
+  @KVIndexParam(
+    value = TaskIndexNames.PUSH_BASED_SHUFFLE_MERGED_REMOTE_BLOCKS, parent = TaskIndexNames.STAGE)
+  val shuffleMergedRemoteBlocksFetched: Long,
+  @KVIndexParam(
+    value = TaskIndexNames.PUSH_BASED_SHUFFLE_MERGED_LOCAL_BLOCKS, parent = TaskIndexNames.STAGE)
+  val shuffleMergedLocalBlocksFetched: Long,
+  @KVIndexParam(
+    value = TaskIndexNames.PUSH_BASED_SHUFFLE_MERGED_REMOTE_CHUNKS, parent = TaskIndexNames.STAGE)
+  val shuffleMergedRemoteChunksFetched: Long,
+  @KVIndexParam(
+    value = TaskIndexNames.PUSH_BASED_SHUFFLE_MERGED_LOCAL_CHUNKS, parent = TaskIndexNames.STAGE)
+  val shuffleMergedLocalChunksFetched: Long,
+  @KVIndexParam(
+    value = TaskIndexNames.PUSH_BASED_SHUFFLE_MERGED_REMOTE_READS, parent = TaskIndexNames.STAGE)
+  val shuffleMergedRemoteBlocksBytesRead: Long,
+  @KVIndexParam(
+    value = TaskIndexNames.PUSH_BASED_SHUFFLE_MERGED_LOCAL_READS, parent = TaskIndexNames.STAGE)
+  val shuffleMergedLocalBlocksBytesRead: Long,
+  @KVIndexParam(
+    value = TaskIndexNames.PUSH_BASED_SHUFFLE_REMOTE_REQS_DURATION, parent = TaskIndexNames.STAGE)
+  val shuffleRemoteReqsDuration: Long,
+  @KVIndexParam(
+    value = TaskIndexNames.PUSH_BASED_SHUFFLE_MERGED_REMOTE_REQS_DURATION,
+    parent = TaskIndexNames.STAGE)

Review Comment:
Indexes on these fields are a requirement when we are computing quantiles and reading `TaskDataWrapper`s.
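For background on why the index matters, a simplified sketch (not the actual `AppStatusStore` code): stage-level metric distributions are computed by reading task values ordered by the indexed field and picking the value at each quantile position, so each metric that appears in the distributions needs its own per-stage index.

```scala
// Nearest-rank quantile selection over values already sorted by the
// indexed metric; the metric name below is illustrative.
def quantiles(sorted: IndexedSeq[Long], qs: Seq[Double]): Seq[Long] =
  qs.map { q =>
    val i = math.min((q * sorted.size).toInt, sorted.size - 1)
    sorted(i)
  }

val remoteReqsDurations = IndexedSeq(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L)
assert(quantiles(remoteReqsDurations, Seq(0.0, 0.5, 1.0)) == Seq(1L, 6L, 10L))
```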