MasseGuillaume commented on code in PR #50474:
URL: https://github.com/apache/spark/pull/50474#discussion_r2109688257
##########
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala:
##########
@@ -1014,39 +1014,44 @@ final class ShuffleBlockFetcherIterator(
// a SuccessFetchResult or a FailureFetchResult.
result = null
-      case PushMergedLocalMetaFetchResult(
-          shuffleId, shuffleMergeId, reduceId, bitmaps, localDirs) =>
-        // Fetch push-merged-local shuffle block data as multiple shuffle chunks
-        val shuffleBlockId = ShuffleMergedBlockId(shuffleId, shuffleMergeId, reduceId)
-        try {
-          val bufs: Seq[ManagedBuffer] = blockManager.getLocalMergedBlockData(shuffleBlockId,
-            localDirs)
-          // Since the request for local block meta completed successfully, numBlocksToFetch
-          // is decremented.
-          numBlocksToFetch -= 1
-          // Update total number of blocks to fetch, reflecting the multiple local shuffle
-          // chunks.
-          numBlocksToFetch += bufs.size
-          bufs.zipWithIndex.foreach { case (buf, chunkId) =>
-            buf.retain()
-            val shuffleChunkId = ShuffleBlockChunkId(shuffleId, shuffleMergeId, reduceId,
-              chunkId)
-            pushBasedFetchHelper.addChunk(shuffleChunkId, bitmaps(chunkId))
-            results.put(SuccessFetchResult(shuffleChunkId, SHUFFLE_PUSH_MAP_ID,
-              pushBasedFetchHelper.localShuffleMergerBlockMgrId, buf.size(), buf,
-              isNetworkReqDone = false))
-          }
-        } catch {
-          case e: Exception =>
-            // If we see an exception with reading push-merged-local index file, we fallback
-            // to fetch the original blocks. We do not report block fetch failure
-            // and will continue with the remaining local block read.
-            logWarning("Error occurred while reading push-merged-local index, " +
-              "prepare to fetch the original blocks", e)
-            pushBasedFetchHelper.initiateFallbackFetchForPushMergedBlock(
-              shuffleBlockId, pushBasedFetchHelper.localShuffleMergerBlockMgrId)
+      case PushMergedLocalMetaFetchResult(
Review Comment:
I compiled most of the Spark release dates in relation to Scala versions in this
document:
https://docs.google.com/spreadsheets/d/1yq6mO6x-xPYR4gxHbFjF9KfTsY6j8ceUmaZeoo0Ozlc/edit?gid=1454157208#gid=1454157208
I want to eventually write a proper article about this, but here is the gist of
my findings:
Spark used to be quick to adopt new Scala versions (about a year each for 2.10
and 2.11); as it matured it became much slower (3 years 7 months for 2.12 and
2 years 4 months for 2.13).
When Spark supports multiple Scala versions, for example 2.12 and 2.13 since
Spark 3.2.0, commercial platforms such as AWS EMR and Databricks will only
support the lowest version.
This gives us:
* Scala 2.11 from 2016-07-27 to 2020-03-10 on EMR
* Scala 2.12 since ? 2020 until now (Spark 4.0.0 dropping Scala 2.12 in
favour of 2.13).
With Spark 4.0 on Scala 2.13, this means we can finally [run Scala 3 on a
Spark
cluster](https://xebia.com/blog/using-scala-3-with-spark/#running-on-a-real-spark-cluster)
(with some small
[tweaks](https://vincenzobaz.github.io/spark-scala3/how_it_works.html)). With
Databricks' early adoption of [Scala
2.13](https://docs.databricks.com/aws/en/release-notes/runtime/16.4lts), I
expect other vendors to follow.
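
For context, the cross-compilation trick those links describe can be sketched
as an sbt build fragment. This is a hypothetical minimal example, not from the
linked posts verbatim; the exact Scala and Spark version numbers are assumptions:

```scala
// build.sbt -- minimal sketch for compiling a Spark job with Scala 3,
// assuming a Spark release published only for Scala 2.13 (e.g. Spark 4.x).
scalaVersion := "3.3.4"  // assumed Scala 3 LTS version

libraryDependencies ++= Seq(
  // Pull in the Scala 2.13 artifacts of Spark; Scala 3 can consume them
  // because Scala 3 is binary compatible with 2.13 libraries.
  ("org.apache.spark" %% "spark-sql" % "4.0.0" % "provided")
    .cross(CrossVersion.for3Use2_13)
)
```

`CrossVersion.for3Use2_13` (available since sbt 1.5) rewrites the dependency to
the `_2.13` artifact, which is what makes mixing a Scala 3 job with a 2.13
Spark distribution possible at all.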
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]