[GitHub] [spark] otterc commented on a diff in pull request #36165: [SPARK-36620][SHUFFLE] Add Push Based Shuffle client side metrics

2022-11-23 Thread GitBox


otterc commented on code in PR #36165:
URL: https://github.com/apache/spark/pull/36165#discussion_r1030746682


##
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala:
##
@@ -282,6 +280,17 @@ final class ShuffleBlockFetcherIterator(
   }
 }
 
+@inline def updateMergedReqsDuration(wasReqForMergedChunks: Boolean = false): Unit = {
+  if (remainingBlocks.isEmpty) {
+    val durationMs = TimeUnit.NANOSECONDS.toMillis(clock.nanoTime() - requestStartTime)
+    if (wasReqForMergedChunks) {
+      shuffleMetrics.incRemoteMergedReqsDuration(durationMs)
+    } else {
+      shuffleMetrics.incRemoteReqsDuration(durationMs)
+    }

Review Comment:
   We added these metrics internally so we can easily figure out what is taking longer: the fetch of the merged chunks or the fetch of the few un-merged blocks. If we include the duration of merged chunks in the total, then we have to derive the value for the un-merged blocks, which is fine; still, it makes sense to be consistent here to avoid confusion.
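   To make the trade-off concrete, here is a minimal sketch of the derivation a consumer would need if the merged duration were folded into the total (accessor names follow this PR; the helper itself is hypothetical):

```scala
import org.apache.spark.executor.ShuffleReadMetrics

// Hypothetical derivation under the combined scheme, where
// remoteReqsDuration would also include merged-chunk requests:
def unmergedOnlyDurationMs(metrics: ShuffleReadMetrics): Long =
  metrics.remoteReqsDuration - metrics.remoteMergedReqsDuration
```

   Keeping the two counters separate avoids this subtraction on every read path.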



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org





[GitHub] [spark] otterc commented on a diff in pull request #36165: [SPARK-36620][SHUFFLE] Add Push Based Shuffle client side metrics

2022-10-28 Thread GitBox


otterc commented on code in PR #36165:
URL: https://github.com/apache/spark/pull/36165#discussion_r1008467914


##
core/src/main/scala/org/apache/spark/storage/PushBasedFetchHelper.scala:
##
@@ -290,6 +301,7 @@ private class PushBasedFetchHelper(
   address: BlockManagerId): Unit = {
 assert(blockId.isInstanceOf[ShuffleMergedBlockId] || 
blockId.isInstanceOf[ShuffleBlockChunkId])
 logWarning(s"Falling back to fetch the original blocks for push-merged 
block $blockId")
+shuffleMetrics.incFallbackCount(1)

Review Comment:
   Fallback count measures the number of times fallback is initiated for a block, which is either a `shuffleMergedBlock` or a `shuffleBlockChunk`. I don't understand why we need to move this into the case that you suggested.
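   For reference, a minimal sketch of the counting semantics described above (the reporter trait is a stand-in for the real metrics object; the helper itself is hypothetical):

```scala
import org.apache.spark.storage.{BlockId, ShuffleBlockChunkId, ShuffleMergedBlockId}

// Stand-in for the metrics reporter; only the method used here is sketched.
trait FallbackMetrics { def incFallbackCount(v: Long): Unit }

// Fallback is counted once per merged block or chunk that triggers it,
// regardless of how many original blocks are then fetched in its place.
def countFallback(blockId: BlockId, shuffleMetrics: FallbackMetrics): Unit = {
  assert(blockId.isInstanceOf[ShuffleMergedBlockId] || blockId.isInstanceOf[ShuffleBlockChunkId])
  shuffleMetrics.incFallbackCount(1)
}
```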






[GitHub] [spark] otterc commented on a diff in pull request #36165: [SPARK-36620][SHUFFLE] Add Push Based Shuffle client side metrics

2022-10-28 Thread GitBox


otterc commented on code in PR #36165:
URL: https://github.com/apache/spark/pull/36165#discussion_r1008462544


##
core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala:
##
@@ -78,6 +78,16 @@ class StagePageSuite extends SparkFunSuite with LocalSparkContext {
 shuffleLocalBytesRead = 1L,
 shuffleReadBytes = 1L,
 shuffleReadRecords = 1L,
+shuffleCorruptMergedBlockChunks = 2L,
+shuffleFallbackCount = 2L,
+shuffleMergedRemoteBlocksFetched = 1L,
+shuffleMergedLocalBlocksFetched = 1L,
+shuffleMergedRemoteChunksFetched = 1L,
+shuffleMergedLocalChunksFetched = 1L,
+shuffleMergedRemoteBytesRead = 1L,
+shuffleMergedLocalBytesRead = 1L,
+shuffleRemoteReqsDuration = 1L,
+shuffleMergedRemoteReqsDuration = 1L,

Review Comment:
   We didn't put much thought into it, but `shuffleMergerCount` can be 0 because this stage may not push any shuffle data itself while still reading shuffle data generated by the previous stage.
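   As a concrete illustration (hypothetical snapshot type and values, not from this PR):

```scala
// A stage can register zero shuffle mergers yet still report push-based
// read metrics: it pushes nothing itself but reads its parent's merged output.
case class StageShuffleSnapshot(
    shuffleMergerCount: Int,
    shuffleMergedRemoteBlocksFetched: Long)

val readOnlyStage = StageShuffleSnapshot(
  shuffleMergerCount = 0,                // this stage pushes no shuffle data
  shuffleMergedRemoteBlocksFetched = 1L) // but it read merged data from its parent

assert(readOnlyStage.shuffleMergerCount == 0)
```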






[GitHub] [spark] otterc commented on a diff in pull request #36165: [SPARK-36620][SHUFFLE] Add Push Based Shuffle client side metrics

2022-04-27 Thread GitBox


otterc commented on code in PR #36165:
URL: https://github.com/apache/spark/pull/36165#discussion_r860171613


##
core/src/main/scala/org/apache/spark/InternalAccumulator.scala:
##
@@ -54,13 +56,26 @@ private[spark] object InternalAccumulator {
 val LOCAL_BYTES_READ = SHUFFLE_READ_METRICS_PREFIX + "localBytesRead"
 val FETCH_WAIT_TIME = SHUFFLE_READ_METRICS_PREFIX + "fetchWaitTime"
 val RECORDS_READ = SHUFFLE_READ_METRICS_PREFIX + "recordsRead"
+val REMOTE_REQS_DURATION = SHUFFLE_READ_METRICS_PREFIX + "remoteReqsDuration"
+val CORRUPT_MERGED_BLOCK_CHUNKS = SHUFFLE_PUSH_READ_METRICS_PREFIX + "corruptMergedBlockChunks"
+val ORIGINAL_BLOCK_FALLBACK_COUNT = SHUFFLE_PUSH_READ_METRICS_PREFIX + "fallbackCount"

Review Comment:
   Nit: `ORIGINAL_BLOCK_FALLBACK_COUNT` -> `FALLBACK_COUNT`. This keeps track of how many times fallback is triggered, so the `ORIGINAL_BLOCK` prefix seems odd.
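   A sketch of the suggested rename, mirroring the diff above:

```scala
val FALLBACK_COUNT = SHUFFLE_PUSH_READ_METRICS_PREFIX + "fallbackCount"
```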



##
core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala:
##
@@ -84,13 +94,76 @@ class ShuffleReadMetrics private[spark] () extends Serializable {
*/
   def totalBlocksFetched: Long = remoteBlocksFetched + localBlocksFetched
 
+  /**
+   * Number of corrupt merged shuffle block chunks encountered by this task (remote or local).
+   */
+  def corruptMergedBlockChunks: Long = _corruptMergedBlockChunks.sum
+
+  /**
+   * Number of times the task had to fall back to fetching original shuffle blocks for a merged
+   * shuffle block chunk (remote or local).
+   */
+  def fallbackCount: Long = _fallbackCount.sum
+
+  /**
+   * Number of remote merged blocks fetched.
+   */
+  def remoteMergedBlocksFetched: Long = _remoteMergedBlocksFetched.sum
+
+  /**
+   * Number of local merged blocks fetched.
+   */
+  def localMergedBlocksFetched: Long = _localMergedBlocksFetched.sum
+
+  /**
+   * Number of remote merged chunks fetched.
+   */
+  def remoteMergedChunksFetched: Long = _remoteMergedChunksFetched.sum
+
+  /**
+   * Number of local merged chunks fetched.
+   */
+  def localMergedChunksFetched: Long = _localMergedChunksFetched.sum
+
+  /**
+   * Total number of remote merged bytes read.
+   */
+  def remoteMergedBlocksBytesRead: Long = _remoteMergedBlocksBytesRead.sum
+
+  /**
+   * Total number of local merged bytes read.
+   */
+  def localMergedBlocksBytesRead: Long = _localMergedBlocksBytesRead.sum
+
+  /**
+   * Total time taken for remote requests.
+   */
+  def remoteReqsDuration: Long = _remoteReqsDuration.sum + _remoteMergedReqsDuration.sum
+

Review Comment:
   In the PR for SPARK-32922, it was decided not to add push-merged metrics to the original/un-merged metrics. For example, `totalRemoteShuffleBlocksFetched` in master doesn't include `remoteMergedShuffleBlocks`. I prefer consistency, so for this metric as well my recommendation is to not add `remoteMergedReqsDuration` to `remoteReqsDuration`.


##
core/src/main/scala/org/apache/spark/status/AppStatusStore.scala:
##
@@ -627,9 +709,22 @@ private[spark] class AppStatusStore(
 shuffleLocalBytesRead = stage.shuffleLocalBytesRead,
 shuffleReadBytes = stage.shuffleReadBytes,
 shuffleReadRecords = stage.shuffleReadRecords,
+pushBasedShuffleCorruptMergedBlockChunks = stage.pushBasedShuffleCorruptMergedBlockChunks,
+pushBasedShuffleFallbackCount = stage.pushBasedShuffleFallbackCount,
+pushBasedShuffleMergedRemoteBlocksFetched = stage.pushBasedShuffleMergedRemoteBlocksFetched,
+pushBasedShuffleMergedLocalBlocksFetched = stage.pushBasedShuffleMergedLocalBlocksFetched,
+pushBasedShuffleMergedRemoteChunksFetched = stage.pushBasedShuffleMergedRemoteChunksFetched,
+pushBasedShuffleMergedLocalChunksFetched = stage.pushBasedShuffleMergedLocalChunksFetched,
+pushBasedShuffleMergedRemoteBytesRead = stage.pushBasedShuffleMergedRemoteBytesRead,
+pushBasedShuffleMergedLocalBytesRead = stage.pushBasedShuffleMergedLocalBytesRead,
+pushBasedShuffleRemoteReqsDuration = stage.pushBasedShuffleRemoteReqsDuration,

Review Comment:
   This looks off. This field shouldn't be prefixed with `pushBasedShuffle`.
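   A sketch of the suggested naming, in the style of the diff above (field names assumed, not taken from the merged code):

```scala
shuffleRemoteReqsDuration = stage.shuffleRemoteReqsDuration,
shuffleMergedRemoteReqsDuration = stage.shuffleMergedRemoteReqsDuration,
```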



##
core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala:
##
@@ -40,8 +40,8 @@ class StageInfo(
    private[spark] val taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty,
 private[spark] val shuffleDepId: Option[Int] = None,
 val resourceProfileId: Int,
-private[spark] var isPushBasedShuffleEnabled: Boolean = false,
-private[spark] var shuffleMergerCount: Int = 0) {
+var isPushBasedShuffleEnabled: Boolean = false,
+var shuffleMergerCount: Int = 0) {

Review Comment:
   Is this change needed for a test?
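   For context: test sources placed under the `org.apache.spark` package can already access `private[spark]` members, so the widening may be unnecessary. A minimal sketch of that visibility rule (hypothetical class, not Spark code):

```scala
package org.apache.spark

// Hypothetical member restricted to the spark package.
private[spark] class Widget(private[spark] var count: Int)

package scheduler {
  object VisibilityDemo {
    def demo(): Int = {
      val w = new Widget(0)
      w.count += 1 // compiles: org.apache.spark.scheduler is inside org.apache.spark
      w.count
    }
  }
}
```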



##
core/src/main/scala/org/apache/spark/status/AppStatusStore.scala:
##
@@ -448,10 +504,36 @@ private[spark] class AppStatusStore(
   shuffleRemoteBytesReadToDisk =
 computedQuantiles.shuffleReadMetrics.remo