This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 994b6976b2a [SPARK-44915][CORE] Validate checksum of remounted PVC's shuffle data before recovery
994b6976b2a is described below

commit 994b6976b2a5a53323a83e70e0c6195cd74292a1
Author: Dongjoon Hyun <dh...@apple.com>
AuthorDate: Tue Sep 12 01:15:01 2023 -0700

    [SPARK-44915][CORE] Validate checksum of remounted PVC's shuffle data before recovery
    
    ### What changes were proposed in this pull request?
    
    This PR aims to validate the checksum of a remounted PVC's shuffle data before recovery.
    
    ### Why are the changes needed?
    
    In general, there are many reasons that cause executor terminations, and some of them corrupt data on disk. Since Apache Spark already writes checksum files, we can take advantage of them to improve robustness by guarding against potential issues with remounted disks.
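
    As a rough illustration of the underlying idea (not the code in this PR): recovery should only trust a recovered data file when a freshly recomputed checksum matches the stored 8-byte value. A minimal sketch, using a hypothetical helper and `java.util.zip.Adler32`:

    ```scala
    import java.io.{DataInputStream, FileInputStream}
    import java.util.zip.{Adler32, CheckedInputStream}

    // Hypothetical helper for illustration only: recompute an Adler-32 checksum
    // over a recovered data file and compare it against a previously stored long.
    def matchesStoredChecksum(dataPath: String, checksumPath: String): Boolean = {
      val checked = new CheckedInputStream(new FileInputStream(dataPath), new Adler32)
      try {
        val buf = new Array[Byte](8192)
        while (checked.read(buf) != -1) {} // drain to update the running checksum
      } finally checked.close()
      val in = new DataInputStream(new FileInputStream(checksumPath))
      val stored = try in.readLong() finally in.close()
      stored == checked.getChecksum.getValue
    }
    ```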
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Pass the CIs with the newly added test suite.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #42724 from dongjoon-hyun/SPARK-44915.
    
    Lead-authored-by: Dongjoon Hyun <dh...@apple.com>
    Co-authored-by: Dongjoon Hyun <dongj...@apache.org>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../spark/shuffle/ShuffleChecksumUtils.scala       |  13 ++
 ...ernetesLocalDiskShuffleExecutorComponents.scala |  69 +++++++++--
 .../spark/shuffle/ShuffleChecksumUtilsSuite.scala  | 134 +++++++++++++++++++++
 3 files changed, 209 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleChecksumUtils.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleChecksumUtils.scala
index 75b0efcf5cd..b2a18d75387 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleChecksumUtils.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleChecksumUtils.scala
@@ -22,9 +22,22 @@ import java.util.zip.CheckedInputStream
 
 import org.apache.spark.network.shuffle.checksum.ShuffleChecksumHelper
 import org.apache.spark.network.util.LimitedInputStream
+import org.apache.spark.shuffle.IndexShuffleBlockResolver.NOOP_REDUCE_ID
+import org.apache.spark.storage.{BlockId, ShuffleChecksumBlockId, ShuffleDataBlockId}
 
 object ShuffleChecksumUtils {
 
+  /**
+   * Return the checksum file name for a shuffle data block ID. Otherwise, return null.
+   */
+  def getChecksumFileName(blockId: BlockId, algorithm: String): String = blockId match {
+    case ShuffleDataBlockId(shuffleId, mapId, _) =>
+      ShuffleChecksumHelper.getChecksumFileName(
+        ShuffleChecksumBlockId(shuffleId, mapId, NOOP_REDUCE_ID).name, algorithm)
+    case _ =>
+      null
+  }
+
   /**
    * Ensure that the checksum values are consistent with index file and data file.
    */
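
For a quick sense of the new helper's behavior, a sketch (the exact file-name format is ShuffleChecksumHelper's concern; the values shown are an assumption, not taken from this commit):

```scala
import org.apache.spark.shuffle.ShuffleChecksumUtils.getChecksumFileName
import org.apache.spark.storage.{ShuffleDataBlockId, ShuffleIndexBlockId}

// A shuffle data block maps to its map-level checksum file name,
// presumably something like "shuffle_0_0_0.checksum.ADLER32".
getChecksumFileName(ShuffleDataBlockId(0, 0, 0), "ADLER32")

// Any other block type has no checksum file, so the helper returns null.
getChecksumFileName(ShuffleIndexBlockId(0, 0, 0), "ADLER32")
```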
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/shuffle/KubernetesLocalDiskShuffleExecutorComponents.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/shuffle/KubernetesLocalDiskShuffleExecutorComponents.scala
index e553a56b7e1..a858db374df 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/shuffle/KubernetesLocalDiskShuffleExecutorComponents.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/shuffle/KubernetesLocalDiskShuffleExecutorComponents.scala
@@ -20,6 +20,7 @@ package org.apache.spark.shuffle
 import java.io.File
 import java.util.Optional
 
+import scala.collection.mutable
 import scala.reflect.ClassTag
 
 import org.apache.commons.io.FileExistsException
@@ -27,9 +28,11 @@ import org.apache.commons.io.FileExistsException
 import org.apache.spark.{SparkConf, SparkEnv}
 import org.apache.spark.deploy.k8s.Config.KUBERNETES_DRIVER_REUSE_PVC
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.{SHUFFLE_CHECKSUM_ALGORITHM, SHUFFLE_CHECKSUM_ENABLED}
+import org.apache.spark.shuffle.ShuffleChecksumUtils.{compareChecksums, getChecksumFileName}
 import org.apache.spark.shuffle.api.{ShuffleExecutorComponents, ShuffleMapOutputWriter, SingleSpillShuffleMapOutputWriter}
 import org.apache.spark.shuffle.sort.io.LocalDiskShuffleExecutorComponents
-import org.apache.spark.storage.{BlockId, BlockManager, StorageLevel, UnrecognizedBlockId}
+import org.apache.spark.storage.{BlockId, BlockManager, ShuffleDataBlockId, StorageLevel, UnrecognizedBlockId}
 import org.apache.spark.util.Utils
 
 class KubernetesLocalDiskShuffleExecutorComponents(sparkConf: SparkConf)
@@ -73,7 +76,7 @@ object KubernetesLocalDiskShuffleExecutorComponents extends Logging {
    */
   def recoverDiskStore(conf: SparkConf, bm: BlockManager): Unit = {
     // Find All files
-    val files = Utils.getConfiguredLocalDirs(conf)
+    val (checksumFiles, files) = Utils.getConfiguredLocalDirs(conf)
       .filter(_ != null)
       .map(s => new File(new File(new File(s).getParent).getParent))
       .flatMap { dir =>
@@ -84,24 +87,49 @@ object KubernetesLocalDiskShuffleExecutorComponents extends Logging {
           .flatMap(_.listFiles).filter(_.isDirectory) // executor-xxx
           .flatMap(_.listFiles).filter(_.isDirectory) // blockmgr-xxx
           .flatMap(_.listFiles).filter(_.isDirectory) // 00
-          .flatMap(_.listFiles).filterNot(_.getName.contains(".checksum"))
+          .flatMap(_.listFiles)
         if (files != null) files.toSeq else Seq.empty
       }
+      .partition(_.getName.contains(".checksum"))
+    val (indexFiles, dataFiles) = files.partition(_.getName.endsWith(".index"))
 
-    logInfo(s"Found ${files.size} files")
+    logInfo(s"Found ${dataFiles.size} data files, ${indexFiles.size} index 
files, " +
+        s"and ${checksumFiles.size} checksum files.")
+
+    // Build a hashmap with checksum file name as a key
+    val checksumFileMap = new mutable.HashMap[String, File]()
+    val algorithm = conf.get(SHUFFLE_CHECKSUM_ALGORITHM)
+    checksumFiles.foreach { f =>
+      logInfo(s"${f.getName} -> ${f.getAbsolutePath}")
+      checksumFileMap.put(f.getName, f)
+    }
+    // Build a hashmap with shuffle data file name as a key
+    val indexFileMap = new mutable.HashMap[String, File]()
+    indexFiles.foreach { f =>
+      logInfo(s"${f.getName.replace(".index", ".data")} -> 
${f.getAbsolutePath}")
+      indexFileMap.put(f.getName.replace(".index", ".data"), f)
+    }
 
     // This is not used.
     val classTag = implicitly[ClassTag[Object]]
     val level = StorageLevel.DISK_ONLY
-    val (indexFiles, dataFiles) = files.partition(_.getName.endsWith(".index"))
+    val checksumDisabled = !conf.get(SHUFFLE_CHECKSUM_ENABLED)
     (dataFiles ++ indexFiles).foreach { f =>
       logInfo(s"Try to recover ${f.getAbsolutePath}")
       try {
         val id = BlockId(f.getName)
         // To make it sure to handle only shuffle blocks
         if (id.isShuffle) {
-          val decryptedSize = f.length()
-          bm.TempFileBasedBlockStoreUpdater(id, level, classTag, f, decryptedSize).save()
+          // For index files, skipVerification is true and checksumFile and indexFile are ignored.
+          val skipVerification = checksumDisabled || f.getName.endsWith(".index")
+          val checksumFile = checksumFileMap.getOrElse(getChecksumFileName(id, algorithm), null)
+          val indexFile = indexFileMap.getOrElse(f.getName, null)
+          if (skipVerification || verifyChecksum(algorithm, id, checksumFile, indexFile, f)) {
+            val decryptedSize = f.length()
+            bm.TempFileBasedBlockStoreUpdater(id, level, classTag, f, decryptedSize).save()
+          } else {
+            logInfo(s"Ignore ${f.getAbsolutePath} due to the verification failure.")
+          }
         } else {
           logInfo("Ignore a non-shuffle block file.")
         }
@@ -114,5 +142,32 @@ object KubernetesLocalDiskShuffleExecutorComponents extends Logging {
       }
     }
   }
+
+  def verifyChecksum(
+      algorithm: String,
+      blockId: BlockId,
+      checksumFile: File,
+      indexFile: File,
+      dataFile: File): Boolean = {
+    blockId match {
+      case _: ShuffleDataBlockId =>
+        if (dataFile == null || !dataFile.exists()) {
+          false // Fail because the data file doesn't exist.
+        } else if (checksumFile == null || !checksumFile.exists()) {
+          true // Pass if the checksum file doesn't exist.
+        } else if (checksumFile.length() == 0 || checksumFile.length() % 8 != 0) {
+          false // Fail because the checksum file is corrupted.
+        } else if (indexFile == null || !indexFile.exists()) {
+          false // Fail because the index file is missing.
+        } else if (indexFile.length() == 0) {
+          false // Fail because the index file is empty.
+        } else {
+          val numPartition = (checksumFile.length() / 8).toInt
+          compareChecksums(numPartition, algorithm, checksumFile, dataFile, indexFile)
+        }
+      case _ =>
+        true // Ignore if blockId is not a shuffle data block.
+    }
+  }
 }
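
Worth noting for readers of verifyChecksum above: each per-partition checksum is persisted as one 8-byte long, which is why a checksum file whose length is zero or not a multiple of 8 is rejected as corrupted. A minimal sketch of how a caller might exercise the decision matrix (the file paths here are hypothetical, for illustration only):

```scala
import java.io.File

import org.apache.spark.shuffle.KubernetesLocalDiskShuffleExecutorComponents.verifyChecksum
import org.apache.spark.storage.ShuffleDataBlockId

// Hypothetical on-disk layout of a recovered block manager directory.
val dir = "/data/spark-x/executor-1/blockmgr-y/0b"
val dataFile = new File(s"$dir/shuffle_0_0_0.data")
val indexFile = new File(s"$dir/shuffle_0_0_0.index")
val checksumFile = new File(s"$dir/shuffle_0_0_0.checksum.ADLER32")

// Missing data file -> false; missing checksum file -> true (nothing to verify);
// empty or misaligned checksum file, or missing/empty index file -> false;
// otherwise compareChecksums recomputes per-partition checksums and compares.
verifyChecksum("ADLER32", ShuffleDataBlockId(0, 0, 0), checksumFile, indexFile, dataFile)
```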
 
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/shuffle/ShuffleChecksumUtilsSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/shuffle/ShuffleChecksumUtilsSuite.scala
new file mode 100644
index 00000000000..e24d0db1d8e
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/shuffle/ShuffleChecksumUtilsSuite.scala
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle
+
+import java.io.{DataOutputStream, File, FileOutputStream}
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.network.shuffle.checksum.ShuffleChecksumHelper.getChecksumFileName
+import org.apache.spark.shuffle.KubernetesLocalDiskShuffleExecutorComponents.verifyChecksum
+import org.apache.spark.storage.{ShuffleChecksumBlockId, ShuffleDataBlockId, ShuffleIndexBlockId}
+
+
+class ShuffleChecksumUtilsSuite extends SparkFunSuite {
+
+  private val ALGORITHM = "ADLER32"
+  private val NON_EXIST_FILE = new File("non-exist-data-file")
+  private val dataBlockId = ShuffleDataBlockId(0, 0, 0)
+  private val indexBlockId = ShuffleIndexBlockId(0, 0, 0)
+  private val checksumBlockId = ShuffleChecksumBlockId(0, 0, 0)
+
+  test("verifyChecksum fails if dataFile doesnt' exist") {
+    assert(!verifyChecksum(ALGORITHM, dataBlockId, null, null, null))
+    assert(!verifyChecksum(ALGORITHM, dataBlockId, null, null, NON_EXIST_FILE))
+  }
+
+  test("verifyChecksum ignores shuffle index files") {
+    withTempDir { dir =>
+      val dataFile = new File(dir, dataBlockId.name)
+      dataFile.createNewFile()
+      assert(verifyChecksum(ALGORITHM, indexBlockId, null, null, dataFile))
+    }
+  }
+
+  test("verifyChecksum succeeds if a checksum file doesn't exist") {
+    withTempDir { dir =>
+      val dataFile = new File(dir, dataBlockId.name)
+      dataFile.createNewFile()
+      assert(verifyChecksum(ALGORITHM, dataBlockId, null, null, dataFile))
+      assert(verifyChecksum(ALGORITHM, dataBlockId, NON_EXIST_FILE, null, dataFile))
+    }
+  }
+
+  test("verifyChecksum fails if a checksum file is empty or corrupted") {
+    withTempDir { dir =>
+      val dataFile = new File(dir, dataBlockId.name)
+      dataFile.createNewFile()
+
+      val checksumFileName = getChecksumFileName(checksumBlockId.name, ALGORITHM)
+      val checksumFile = new File(dir, checksumFileName)
+      checksumFile.createNewFile()
+      assert(!verifyChecksum(ALGORITHM, dataBlockId, checksumFile, null, dataFile))
+
+      val out = new DataOutputStream(new FileOutputStream(checksumFile))
+      out.writeInt(1)
+      out.close()
+      assert(!verifyChecksum(ALGORITHM, dataBlockId, checksumFile, null, dataFile))
+    }
+  }
+
+  test("verifyChecksum fails if an index file is missing.") {
+    withTempDir { dir =>
+      val dataFile = new File(dir, dataBlockId.name)
+      dataFile.createNewFile()
+
+      val checksumFileName = getChecksumFileName(checksumBlockId.name, ALGORITHM)
+      val checksumFile = new File(dir, checksumFileName)
+      checksumFile.createNewFile()
+      val out = new DataOutputStream(new FileOutputStream(checksumFile))
+      out.writeLong(0)
+      out.close()
+
+      assert(!verifyChecksum(ALGORITHM, dataBlockId, checksumFile, null, dataFile))
+      assert(!verifyChecksum(ALGORITHM, dataBlockId, checksumFile, NON_EXIST_FILE, dataFile))
+    }
+  }
+
+  test("verifyChecksum fails if an index file is empty.") {
+    withTempDir { dir =>
+      val dataFile = new File(dir, dataBlockId.name)
+      dataFile.createNewFile()
+
+      val checksumFileName = getChecksumFileName(checksumBlockId.name, ALGORITHM)
+      val checksumFile = new File(dir, checksumFileName)
+      checksumFile.createNewFile()
+      val out = new DataOutputStream(new FileOutputStream(checksumFile))
+      out.writeLong(0)
+      out.close()
+
+      val indexFile = new File(dir, indexBlockId.name)
+      indexFile.createNewFile()
+
+      assert(!verifyChecksum(ALGORITHM, dataBlockId, checksumFile, indexFile, dataFile))
+    }
+  }
+
+  test("verifyChecksum succeeds") {
+    withTempDir { dir =>
+      val indexFile = new File(dir, indexBlockId.name)
+      val dataFile = new File(dir, dataBlockId.name)
+      val checksumFileName = getChecksumFileName(checksumBlockId.name, ALGORITHM)
+      val checksumFile = new File(dir, checksumFileName)
+
+      indexFile.createNewFile()
+      dataFile.createNewFile()
+      checksumFile.createNewFile()
+
+      val dos = new DataOutputStream(new FileOutputStream(indexFile))
+      dos.writeLong(0) // previous offset
+      dos.writeLong(0) // current offset
+      dos.close()
+
+      val out = new DataOutputStream(new FileOutputStream(checksumFile))
+      out.writeLong(1) // Checksum for empty data file
+      out.close()
+
+      assert(verifyChecksum(ALGORITHM, dataBlockId, checksumFile, indexFile, dataFile))
+    }
+  }
+}

