otterc commented on code in PR #36165:
URL: https://github.com/apache/spark/pull/36165#discussion_r860171613
##
core/src/main/scala/org/apache/spark/InternalAccumulator.scala:
##
@@ -54,13 +56,26 @@ private[spark] object InternalAccumulator {
val LOCAL_BYTES_READ = SHUFFLE_READ_METRICS_PREFIX + "localBytesRead"
val FETCH_WAIT_TIME = SHUFFLE_READ_METRICS_PREFIX + "fetchWaitTime"
val RECORDS_READ = SHUFFLE_READ_METRICS_PREFIX + "recordsRead"
+val REMOTE_REQS_DURATION = SHUFFLE_READ_METRICS_PREFIX + "remoteReqsDuration"
+val CORRUPT_MERGED_BLOCK_CHUNKS = SHUFFLE_PUSH_READ_METRICS_PREFIX + "corruptMergedBlockChunks"
+val ORIGINAL_BLOCK_FALLBACK_COUNT = SHUFFLE_PUSH_READ_METRICS_PREFIX + "fallbackCount"
Review Comment:
Nit: `ORIGINAL_BLOCK_FALLBACK_COUNT` -> `FALLBACK_COUNT`. This keeps track of how many times the fallback is triggered, so the prefix `ORIGINAL_BLOCK` seems odd.
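For illustration, a minimal sketch of the suggested rename, reusing the existing `SHUFFLE_PUSH_READ_METRICS_PREFIX` constant from the diff above:

```scala
// Sketch of the suggested rename: the counter tracks how often the fallback
// is triggered, so the ORIGINAL_BLOCK prefix is dropped from the constant
// name while the underlying metric key stays the same.
val FALLBACK_COUNT = SHUFFLE_PUSH_READ_METRICS_PREFIX + "fallbackCount"
```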
##
core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala:
##
@@ -84,13 +94,76 @@ class ShuffleReadMetrics private[spark] () extends Serializable {
*/
def totalBlocksFetched: Long = remoteBlocksFetched + localBlocksFetched
+ /**
+ * Number of corrupt merged shuffle block chunks encountered by this task (remote or local).
+ */
+ def corruptMergedBlockChunks: Long = _corruptMergedBlockChunks.sum
+
+ /**
+ * Number of times the task had to fall back to fetch original shuffle blocks for a merged
+ * shuffle block chunk (remote or local).
+ */
+ def fallbackCount: Long = _fallbackCount.sum
+
+ /**
+ * Number of remote merged blocks fetched.
+ */
+ def remoteMergedBlocksFetched: Long = _remoteMergedBlocksFetched.sum
+
+ /**
+ * Number of local merged blocks fetched.
+ */
+ def localMergedBlocksFetched: Long = _localMergedBlocksFetched.sum
+
+ /**
+ * Number of remote merged chunks fetched.
+ */
+ def remoteMergedChunksFetched: Long = _remoteMergedChunksFetched.sum
+
+ /**
+ * Number of local merged chunks fetched.
+ */
+ def localMergedChunksFetched: Long = _localMergedChunksFetched.sum
+
+ /**
+ * Total number of remote merged bytes read.
+ */
+ def remoteMergedBlocksBytesRead: Long = _remoteMergedBlocksBytesRead.sum
+
+ /**
+ * Total number of local merged bytes read.
+ */
+ def localMergedBlocksBytesRead: Long = _localMergedBlocksBytesRead.sum
+
+ /**
+ * Total time taken for remote requests.
+ */
+ def remoteReqsDuration: Long = _remoteReqsDuration.sum + _remoteMergedReqsDuration.sum
+
Review Comment:
In the PR for JIRA SPARK-32922, it was decided not to add push-merged metrics to the original/unmerged metrics. For example, `totalRemoteShuffleBlocksFetched` in master doesn't include `remoteMergedShuffleBlocks`. I prefer consistency, so even for this metric my recommendation is to not add `remoteMergedReqsDuration` to `remoteReqsDuration`.
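For illustration, a minimal sketch of that recommendation, assuming the private accumulator names shown in the diff above:

```scala
// Keep the unmerged metric pure: only count time for unmerged remote requests.
def remoteReqsDuration: Long = _remoteReqsDuration.sum

// Report the push-merged request time as its own metric rather than folding
// it into remoteReqsDuration, matching how remoteBlocksFetched and
// remoteMergedBlocksFetched are kept separate.
def remoteMergedReqsDuration: Long = _remoteMergedReqsDuration.sum
```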
##
core/src/main/scala/org/apache/spark/status/AppStatusStore.scala:
##
@@ -627,9 +709,22 @@ private[spark] class AppStatusStore(
shuffleLocalBytesRead = stage.shuffleLocalBytesRead,
shuffleReadBytes = stage.shuffleReadBytes,
shuffleReadRecords = stage.shuffleReadRecords,
+pushBasedShuffleCorruptMergedBlockChunks = stage.pushBasedShuffleCorruptMergedBlockChunks,
+pushBasedShuffleFallbackCount = stage.pushBasedShuffleFallbackCount,
+pushBasedShuffleMergedRemoteBlocksFetched = stage.pushBasedShuffleMergedRemoteBlocksFetched,
+pushBasedShuffleMergedLocalBlocksFetched = stage.pushBasedShuffleMergedLocalBlocksFetched,
+pushBasedShuffleMergedRemoteChunksFetched = stage.pushBasedShuffleMergedRemoteChunksFetched,
+pushBasedShuffleMergedLocalChunksFetched = stage.pushBasedShuffleMergedLocalChunksFetched,
+pushBasedShuffleMergedRemoteBytesRead = stage.pushBasedShuffleMergedRemoteBytesRead,
+pushBasedShuffleMergedLocalBytesRead = stage.pushBasedShuffleMergedLocalBytesRead,
+pushBasedShuffleRemoteReqsDuration = stage.pushBasedShuffleRemoteReqsDuration,
Review Comment:
This looks off. `RemoteReqsDuration` shouldn't be prefixed by `pushBasedShuffle`.
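For illustration, a hedged sketch of the rename the comment points at; the unprefixed field name `shuffleRemoteReqsDuration` is an assumption, not confirmed in this thread:

```scala
// Hypothetical rename: remote request duration applies to regular shuffle
// reads, so it would not carry the pushBasedShuffle prefix.
shuffleRemoteReqsDuration = stage.shuffleRemoteReqsDuration,
```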
##
core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala:
##
@@ -40,8 +40,8 @@ class StageInfo(
private[spark] val taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty,
private[spark] val shuffleDepId: Option[Int] = None,
val resourceProfileId: Int,
-private[spark] var isPushBasedShuffleEnabled: Boolean = false,
-private[spark] var shuffleMergerCount: Int = 0) {
+var isPushBasedShuffleEnabled: Boolean = false,
+var shuffleMergerCount: Int = 0) {
Review Comment:
Is this change needed for a test?
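If the motivation is test access, note that `private[spark]` members are already visible to any class under the `org.apache.spark` package, which covers Spark's own test suites. A minimal sketch (the class name is hypothetical):

```scala
package org.apache.spark.scheduler

// private[spark] members of StageInfo are accessible from any class in the
// org.apache.spark package hierarchy, including test code, so widening the
// visibility may be unnecessary.
class StageInfoAccessSketch {
  def markPushBasedShuffle(info: StageInfo): Unit = {
    info.isPushBasedShuffleEnabled = true
    info.shuffleMergerCount = 5
  }
}
```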
##
core/src/main/scala/org/apache/spark/status/AppStatusStore.scala:
##
@@ -448,10 +504,36 @@ private[spark] class AppStatusStore(
shuffleRemoteBytesReadToDisk = computedQuantiles.shuffleReadMetrics.remoteBytesReadToDisk,