This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 5db58f92538 [SPARK-44993][CORE] Add `ShuffleChecksumUtils.compareChecksums` by reusing `ShuffleChecksumTestHelper.compareChecksums` 5db58f92538 is described below commit 5db58f92538d2cf2fee90a5ca08c07c4e2242aad Author: Dongjoon Hyun <dh...@apple.com> AuthorDate: Mon Aug 28 17:29:33 2023 -0700 [SPARK-44993][CORE] Add `ShuffleChecksumUtils.compareChecksums` by reusing `ShuffleChecksumTestHelper.compareChecksums` ### What changes were proposed in this pull request? This PR aims to add `ShuffleChecksumUtils.compareChecksums` by reusing the existing test code `ShuffleChecksumTestHelper.compareChecksums` in order to reuse the functionality in the main code. ### Why are the changes needed? This is very useful in the test code. We can take advantage of this verification logic in the `core` module. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Pass the CIs with the existing tests because this is a kind of refactoring. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #42707 from dongjoon-hyun/SPARK-44993. 
Authored-by: Dongjoon Hyun <dh...@apple.com> Signed-off-by: Dongjoon Hyun <dh...@apple.com> --- .../spark/shuffle/ShuffleChecksumUtils.scala} | 13 +++--- .../spark/shuffle/ShuffleChecksumTestHelper.scala | 49 ++-------------------- 2 files changed, 8 insertions(+), 54 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/shuffle/ShuffleChecksumTestHelper.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleChecksumUtils.scala similarity index 87% copy from core/src/test/scala/org/apache/spark/shuffle/ShuffleChecksumTestHelper.scala copy to core/src/main/scala/org/apache/spark/shuffle/ShuffleChecksumUtils.scala index 3db2f77fe15..75b0efcf5cd 100644 --- a/core/src/test/scala/org/apache/spark/shuffle/ShuffleChecksumTestHelper.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleChecksumUtils.scala @@ -23,21 +23,17 @@ import java.util.zip.CheckedInputStream import org.apache.spark.network.shuffle.checksum.ShuffleChecksumHelper import org.apache.spark.network.util.LimitedInputStream -trait ShuffleChecksumTestHelper { +object ShuffleChecksumUtils { /** - * Ensure that the checksum values are consistent between write and read side. + * Ensure that the checksum values are consistent with index file and data file. 
*/ def compareChecksums( numPartition: Int, algorithm: String, checksum: File, data: File, - index: File): Unit = { - assert(checksum.exists(), "Checksum file doesn't exist") - assert(data.exists(), "Data file doesn't exist") - assert(index.exists(), "Index file doesn't exist") - + index: File): Boolean = { var checksumIn: DataInputStream = null val expectChecksums = Array.ofDim[Long](numPartition) try { @@ -66,7 +62,7 @@ trait ShuffleChecksumTestHelper { checkedIn.read(bytes, 0, limit) prevOffset = curOffset // checksum must be consistent at both write and read sides - assert(checkedIn.getChecksum.getValue == expectChecksums(i)) + if (checkedIn.getChecksum.getValue != expectChecksums(i)) return false } } finally { if (dataIn != null) { @@ -79,5 +75,6 @@ trait ShuffleChecksumTestHelper { checkedIn.close() } } + true } } diff --git a/core/src/test/scala/org/apache/spark/shuffle/ShuffleChecksumTestHelper.scala b/core/src/test/scala/org/apache/spark/shuffle/ShuffleChecksumTestHelper.scala index 3db2f77fe15..8be103b7be8 100644 --- a/core/src/test/scala/org/apache/spark/shuffle/ShuffleChecksumTestHelper.scala +++ b/core/src/test/scala/org/apache/spark/shuffle/ShuffleChecksumTestHelper.scala @@ -17,11 +17,7 @@ package org.apache.spark.shuffle -import java.io.{DataInputStream, File, FileInputStream} -import java.util.zip.CheckedInputStream - -import org.apache.spark.network.shuffle.checksum.ShuffleChecksumHelper -import org.apache.spark.network.util.LimitedInputStream +import java.io.File trait ShuffleChecksumTestHelper { @@ -38,46 +34,7 @@ trait ShuffleChecksumTestHelper { assert(data.exists(), "Data file doesn't exist") assert(index.exists(), "Index file doesn't exist") - var checksumIn: DataInputStream = null - val expectChecksums = Array.ofDim[Long](numPartition) - try { - checksumIn = new DataInputStream(new FileInputStream(checksum)) - (0 until numPartition).foreach(i => expectChecksums(i) = checksumIn.readLong()) - } finally { - if (checksumIn != null) { - 
checksumIn.close() - } - } - - var dataIn: FileInputStream = null - var indexIn: DataInputStream = null - var checkedIn: CheckedInputStream = null - try { - dataIn = new FileInputStream(data) - indexIn = new DataInputStream(new FileInputStream(index)) - var prevOffset = indexIn.readLong - (0 until numPartition).foreach { i => - val curOffset = indexIn.readLong - val limit = (curOffset - prevOffset).toInt - val bytes = new Array[Byte](limit) - val checksumCal = ShuffleChecksumHelper.getChecksumByAlgorithm(algorithm) - checkedIn = new CheckedInputStream( - new LimitedInputStream(dataIn, curOffset - prevOffset), checksumCal) - checkedIn.read(bytes, 0, limit) - prevOffset = curOffset - // checksum must be consistent at both write and read sides - assert(checkedIn.getChecksum.getValue == expectChecksums(i)) - } - } finally { - if (dataIn != null) { - dataIn.close() - } - if (indexIn != null) { - indexIn.close() - } - if (checkedIn != null) { - checkedIn.close() - } - } + assert(ShuffleChecksumUtils.compareChecksums(numPartition, algorithm, checksum, data, index), + "checksum must be consistent at both write and read sides") } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org