This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.3 by this push: new 48839f6 [SPARK-38706][CORE] Use URI in `FallbackStorage.copy` 48839f6 is described below commit 48839f6cad14a3462c278d1a3c10b35dde1adcc3 Author: William Hyun <will...@apache.org> AuthorDate: Wed Mar 30 21:15:50 2022 -0700 [SPARK-38706][CORE] Use URI in `FallbackStorage.copy` ### What changes were proposed in this pull request? This PR aims to use URI in `FallbackStorage.copy` method. ### Why are the changes needed? Like the case of SPARK-38652, the current fallback feature is broken with `S3A` due to Hadoop 3.3.2's `org.apache.hadoop.fs.PathIOException`. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Manually start one master and executor and decommission the executor. ``` spark.decommission.enabled true spark.storage.decommission.enabled true spark.storage.decommission.shuffleBlocks.enabled true spark.storage.decommission.fallbackStorage.path s3a://spark/storage/ ``` ``` $ curl -v -X POST -d "host=hostname" http://hostname:8080/workers/kill/ ``` Closes #36017 from williamhyun/fallbackstorage. Authored-by: William Hyun <will...@apache.org> Signed-off-by: Dongjoon Hyun <dongj...@apache.org> (cherry picked from commit 60d09213105f235793f3418d79e6755561a19b15) Signed-off-by: Dongjoon Hyun <dongj...@apache.org> --- core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala b/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala index 0c1206c..e644ffe 100644 --- a/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala +++ b/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala @@ -63,14 +63,14 @@ private[storage] class FallbackStorage(conf: SparkConf) extends Logging { if (indexFile.exists()) { val hash = JavaUtils.nonNegativeHash(indexFile.getName) fallbackFileSystem.copyFromLocalFile( - new Path(indexFile.getAbsolutePath), + new Path(Utils.resolveURI(indexFile.getAbsolutePath)), new Path(fallbackPath, s"$appId/$shuffleId/$hash/${indexFile.getName}")) val dataFile = r.getDataFile(shuffleId, mapId) if (dataFile.exists()) { val hash = JavaUtils.nonNegativeHash(dataFile.getName) fallbackFileSystem.copyFromLocalFile( - new Path(dataFile.getAbsolutePath), + new Path(Utils.resolveURI(dataFile.getAbsolutePath)), new Path(fallbackPath, s"$appId/$shuffleId/$hash/${dataFile.getName}")) } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org