This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 5db58f92538 [SPARK-44993][CORE] Add 
`ShuffleChecksumUtils.compareChecksums` by reusing 
`ShuffleChecksumTestHelper.compareChecksums`
5db58f92538 is described below

commit 5db58f92538d2cf2fee90a5ca08c07c4e2242aad
Author: Dongjoon Hyun <dh...@apple.com>
AuthorDate: Mon Aug 28 17:29:33 2023 -0700

    [SPARK-44993][CORE] Add `ShuffleChecksumUtils.compareChecksums` by reusing 
`ShuffleChecksumTestHelper.compareChecksums`
    
    ### What changes were proposed in this pull request?
    
    This PR aims to add `ShuffleChecksumUtils.compareChecksums` by reusing the 
existing test code `ShuffleChecksumTestHelper.compareChecksums` in order to 
reuse the functionality in the main code.
    
    ### Why are the changes needed?
    
    This is very useful in the test code. We can take advantage of this 
verification logic in the `core` module.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Pass the CIs with the existing test codes because this is a kind of 
refactoring.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #42707 from dongjoon-hyun/SPARK-44993.
    
    Authored-by: Dongjoon Hyun <dh...@apple.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../spark/shuffle/ShuffleChecksumUtils.scala}      | 13 +++---
 .../spark/shuffle/ShuffleChecksumTestHelper.scala  | 49 ++--------------------
 2 files changed, 8 insertions(+), 54 deletions(-)

diff --git 
a/core/src/test/scala/org/apache/spark/shuffle/ShuffleChecksumTestHelper.scala 
b/core/src/main/scala/org/apache/spark/shuffle/ShuffleChecksumUtils.scala
similarity index 87%
copy from 
core/src/test/scala/org/apache/spark/shuffle/ShuffleChecksumTestHelper.scala
copy to core/src/main/scala/org/apache/spark/shuffle/ShuffleChecksumUtils.scala
index 3db2f77fe15..75b0efcf5cd 100644
--- 
a/core/src/test/scala/org/apache/spark/shuffle/ShuffleChecksumTestHelper.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleChecksumUtils.scala
@@ -23,21 +23,17 @@ import java.util.zip.CheckedInputStream
 import org.apache.spark.network.shuffle.checksum.ShuffleChecksumHelper
 import org.apache.spark.network.util.LimitedInputStream
 
-trait ShuffleChecksumTestHelper {
+object ShuffleChecksumUtils {
 
   /**
-   * Ensure that the checksum values are consistent between write and read 
side.
+   * Ensure that the checksum values are consistent with index file and data 
file.
    */
   def compareChecksums(
       numPartition: Int,
       algorithm: String,
       checksum: File,
       data: File,
-      index: File): Unit = {
-    assert(checksum.exists(), "Checksum file doesn't exist")
-    assert(data.exists(), "Data file doesn't exist")
-    assert(index.exists(), "Index file doesn't exist")
-
+      index: File): Boolean = {
     var checksumIn: DataInputStream = null
     val expectChecksums = Array.ofDim[Long](numPartition)
     try {
@@ -66,7 +62,7 @@ trait ShuffleChecksumTestHelper {
         checkedIn.read(bytes, 0, limit)
         prevOffset = curOffset
         // checksum must be consistent at both write and read sides
-        assert(checkedIn.getChecksum.getValue == expectChecksums(i))
+        if (checkedIn.getChecksum.getValue != expectChecksums(i)) return false
       }
     } finally {
       if (dataIn != null) {
@@ -79,5 +75,6 @@ trait ShuffleChecksumTestHelper {
         checkedIn.close()
       }
     }
+    true
   }
 }
diff --git 
a/core/src/test/scala/org/apache/spark/shuffle/ShuffleChecksumTestHelper.scala 
b/core/src/test/scala/org/apache/spark/shuffle/ShuffleChecksumTestHelper.scala
index 3db2f77fe15..8be103b7be8 100644
--- 
a/core/src/test/scala/org/apache/spark/shuffle/ShuffleChecksumTestHelper.scala
+++ 
b/core/src/test/scala/org/apache/spark/shuffle/ShuffleChecksumTestHelper.scala
@@ -17,11 +17,7 @@
 
 package org.apache.spark.shuffle
 
-import java.io.{DataInputStream, File, FileInputStream}
-import java.util.zip.CheckedInputStream
-
-import org.apache.spark.network.shuffle.checksum.ShuffleChecksumHelper
-import org.apache.spark.network.util.LimitedInputStream
+import java.io.File
 
 trait ShuffleChecksumTestHelper {
 
@@ -38,46 +34,7 @@ trait ShuffleChecksumTestHelper {
     assert(data.exists(), "Data file doesn't exist")
     assert(index.exists(), "Index file doesn't exist")
 
-    var checksumIn: DataInputStream = null
-    val expectChecksums = Array.ofDim[Long](numPartition)
-    try {
-      checksumIn = new DataInputStream(new FileInputStream(checksum))
-      (0 until numPartition).foreach(i => expectChecksums(i) = 
checksumIn.readLong())
-    } finally {
-      if (checksumIn != null) {
-        checksumIn.close()
-      }
-    }
-
-    var dataIn: FileInputStream = null
-    var indexIn: DataInputStream = null
-    var checkedIn: CheckedInputStream = null
-    try {
-      dataIn = new FileInputStream(data)
-      indexIn = new DataInputStream(new FileInputStream(index))
-      var prevOffset = indexIn.readLong
-      (0 until numPartition).foreach { i =>
-        val curOffset = indexIn.readLong
-        val limit = (curOffset - prevOffset).toInt
-        val bytes = new Array[Byte](limit)
-        val checksumCal = 
ShuffleChecksumHelper.getChecksumByAlgorithm(algorithm)
-        checkedIn = new CheckedInputStream(
-          new LimitedInputStream(dataIn, curOffset - prevOffset), checksumCal)
-        checkedIn.read(bytes, 0, limit)
-        prevOffset = curOffset
-        // checksum must be consistent at both write and read sides
-        assert(checkedIn.getChecksum.getValue == expectChecksums(i))
-      }
-    } finally {
-      if (dataIn != null) {
-        dataIn.close()
-      }
-      if (indexIn != null) {
-        indexIn.close()
-      }
-      if (checkedIn != null) {
-        checkedIn.close()
-      }
-    }
+    assert(ShuffleChecksumUtils.compareChecksums(numPartition, algorithm, 
checksum, data, index),
+      "checksum must be consistent at both write and read sides")
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to