[GitHub] [spark] Ngone51 commented on a change in pull request #32401: [SPARK-35276][CORE] Calculate checksum for shuffle data and write as checksum file
Ngone51 commented on a change in pull request #32401:
URL: https://github.com/apache/spark/pull/32401#discussion_r670100864

## File path: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala ##
@@ -329,44 +352,111 @@ private[spark] class IndexShuffleBlockResolver(
           // Another attempt for the same task has already written our map outputs successfully,
           // so just use the existing partition lengths and delete our temporary map outputs.
           System.arraycopy(existingLengths, 0, lengths, 0, lengths.length)
+          if (checksumEnabled) {
+            val existingChecksums = getChecksums(checksumFileOpt.get, checksums.length)
+            if (existingChecksums != null) {
+              System.arraycopy(existingChecksums, 0, checksums, 0, lengths.length)
+            } else {
+              // It's possible that the previous task attempt succeeded writing the
+              // index file and data file but failed to write the checksum file. In
+              // this case, the current task attempt could write the missing checksum
+              // file by itself.
+              writeMetadataFile(checksums, checksumTmpOpt.get, checksumFileOpt.get, false)
+            }
+          }
           if (dataTmp != null && dataTmp.exists()) {
             dataTmp.delete()
           }
         } else {
           // This is the first successful attempt in writing the map outputs for this task,
           // so override any existing index and data files with the ones we wrote.
-          val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexTmp)))
-          Utils.tryWithSafeFinally {
-            // We take in lengths of each block, need to convert it to offsets.
-            var offset = 0L
-            out.writeLong(offset)
-            for (length <- lengths) {
-              offset += length
-              out.writeLong(offset)
-            }
-          } {
-            out.close()
-          }
-          if (indexFile.exists()) {
-            indexFile.delete()
-          }
+          val offsets = lengths.scanLeft(0L)(_ + _)
+          writeMetadataFile(offsets, indexTmp, indexFile, true)
+
           if (dataFile.exists()) {
             dataFile.delete()
           }
-          if (!indexTmp.renameTo(indexFile)) {
-            throw new IOException("fail to rename file " + indexTmp + " to " + indexFile)
-          }
           if (dataTmp != null && dataTmp.exists() && !dataTmp.renameTo(dataFile)) {
             throw new IOException("fail to rename file " + dataTmp + " to " + dataFile)
           }
+
+          // write the checksum file
+          checksumTmpOpt.zip(checksumFileOpt).foreach { case (checksumTmp, checksumFile) =>
+            try {
+              writeMetadataFile(checksums, checksumTmp, checksumFile, false)
+            } catch {
+              case e: Exception =>
+                // It's not worthwhile to fail here after index file and data file are
+                // already successfully stored since checksum is only a best-effort for
+                // the corner error case.
+                logError("Failed to write checksum file", e)
+            }
+          }
         }
       }
     } finally {
       logDebug(s"Shuffle index for mapId $mapId: ${lengths.mkString("[", ",", "]")}")
       if (indexTmp.exists() && !indexTmp.delete()) {
         logError(s"Failed to delete temporary index file at ${indexTmp.getAbsolutePath}")
       }
+      checksumTmpOpt.foreach { checksumTmp =>
+        if (checksumTmp.exists()) {
+          try {
+            if (!checksumTmp.delete()) {
+              logError(s"Failed to delete temporary checksum file " +
+                s"at ${checksumTmp.getAbsolutePath}")
+            }
+          } catch {
+            case e: Exception =>
+              // Unlike index deletion, we won't propagate the error for the checksum file since
+              // checksum is only a best-effort.
+              logError(s"Failed to delete temporary checksum file " +
+                s"at ${checksumTmp.getAbsolutePath}", e)
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Write the metadata file (index or checksum). Metadata values will be firstly write into
+   * the tmp file and the tmp file will be renamed to the target file at the end to avoid dirty
+   * writes.
+   * @param metaValues The metadata values
+   * @param tmpFile The temp file
+   * @param targetFile The target file
+   * @param propagateError Whether to propagate the error for file operation. Unlike index file,
+   *                       checksum is only a best-effort so we won't fail the whole task due to
+   *                       the error from checksum.
+   */
+  private def writeMetadataFile(
+      metaValues: Array[Long],
+      tmpFile: File,
+      targetFile: File,
+      propagateError: Boolean): Unit = {
+    val out = new DataOutputStream(
+      new Buffe
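The refactor replaces the hand-written offset loop with `lengths.scanLeft(0L)(_ + _)`. As a minimal Java sketch of the same conversion (the class name `IndexOffsets` is hypothetical, not part of Spark), the index file for N partitions holds N + 1 longs: a leading 0 followed by the running sum of the partition lengths.

```java
/**
 * Hypothetical illustration (not Spark code) of the lengths-to-offsets
 * conversion that `lengths.scanLeft(0L)(_ + _)` performs before the index file is written.
 */
final class IndexOffsets {
  private IndexOffsets() {}

  /** Returns lengths.length + 1 offsets; offsets[0] is always 0. */
  static long[] lengthsToOffsets(long[] lengths) {
    long[] offsets = new long[lengths.length + 1];
    for (int i = 0; i < lengths.length; i++) {
      // Each partition starts where the previous one ended.
      offsets[i + 1] = offsets[i] + lengths[i];
    }
    return offsets;
  }
}
```

For example, lengths `{10, 0, 5}` map to offsets `{0, 10, 10, 15}`, the same result as `Array(10L, 0L, 5L).scanLeft(0L)(_ + _)` in Scala; an empty partition simply repeats the previous offset.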
[GitHub] [spark] Ngone51 commented on a change in pull request #32401: [SPARK-35276][CORE] Calculate checksum for shuffle data and write as checksum file
Ngone51 commented on a change in pull request #32401:
URL: https://github.com/apache/spark/pull/32401#discussion_r669319636

## File path: core/src/main/java/org/apache/spark/shuffle/checksum/ShuffleChecksumHelper.java ##
@@ -0,0 +1,83 @@
+package org.apache.spark.shuffle.checksum;

Review comment:
Oh, I see. I was tripped up by the `FileNotFoundException` earlier.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] Ngone51 commented on a change in pull request #32401: [SPARK-35276][CORE] Calculate checksum for shuffle data and write as checksum file
Ngone51 commented on a change in pull request #32401:
URL: https://github.com/apache/spark/pull/32401#discussion_r669282673

## File path: core/src/main/java/org/apache/spark/shuffle/checksum/ShuffleChecksumHelper.java ##
@@ -0,0 +1,83 @@
+package org.apache.spark.shuffle.checksum;

Review comment:
Oh yeah, this is definitely a mistake. That said, it doesn't seem to be related to the `RAT` failure.
[GitHub] [spark] Ngone51 commented on a change in pull request #32401: [SPARK-35276][CORE] Calculate checksum for shuffle data and write as checksum file
Ngone51 commented on a change in pull request #32401:
URL: https://github.com/apache/spark/pull/32401#discussion_r668538313

## File path: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala ##
@@ -329,44 +352,111 @@ private[spark] class IndexShuffleBlockResolver(
           // Another attempt for the same task has already written our map outputs successfully,
           // so just use the existing partition lengths and delete our temporary map outputs.
           System.arraycopy(existingLengths, 0, lengths, 0, lengths.length)
+          if (checksumEnabled) {
+            val existingChecksums = getChecksums(checksumFileOpt.get, checksums.length)
+            if (existingChecksums != null) {
+              System.arraycopy(existingChecksums, 0, checksums, 0, lengths.length)
+            } else {
+              // It's possible that the previous task attempt succeeded writing the
+              // index file and data file but failed to write the checksum file. In
+              // this case, the current task attempt could write the missing checksum
+              // file by itself.
+              writeMetadataFile(checksums, checksumTmpOpt.get, checksumFileOpt.get, false)

Review comment:
I did a bit more refactoring here. Please take another look. Thanks, @mridulm!
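The refactor routes both the index file and the checksum file through a `writeMetadataFile(metaValues, tmpFile, targetFile, propagateError)` helper. The following is a hedged Java sketch of that write-to-temp-then-rename pattern; the class name `MetadataFileWriter` and the boolean return value are assumptions for illustration, not Spark's implementation (which is Scala and logs instead of returning a flag).

```java
import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;

/**
 * Hypothetical sketch (not Spark code): values go to tmpFile first and are only then
 * moved onto targetFile, so readers never see a half-written metadata file.
 */
final class MetadataFileWriter {
  private MetadataFileWriter() {}

  /**
   * @param propagateError true for the index file (failures must fail the task);
   *                       false for the best-effort checksum file
   * @return true if targetFile now holds metaValues, false if a best-effort write failed
   */
  static boolean writeMetadataFile(
      long[] metaValues, File tmpFile, File targetFile, boolean propagateError)
      throws IOException {
    try {
      // Each value is written as an 8-byte big-endian long.
      try (DataOutputStream out = new DataOutputStream(
          new BufferedOutputStream(new FileOutputStream(tmpFile)))) {
        for (long value : metaValues) {
          out.writeLong(value);
        }
      }
      // Remove a stale target first: File.renameTo may fail if the target exists.
      if (targetFile.exists() && !targetFile.delete()) {
        throw new IOException("fail to delete file " + targetFile);
      }
      if (!tmpFile.renameTo(targetFile)) {
        throw new IOException("fail to rename file " + tmpFile + " to " + targetFile);
      }
      return true;
    } catch (IOException e) {
      if (propagateError) {
        throw e;
      }
      // Best-effort case: report failure to the caller instead of failing the task.
      return false;
    }
  }
}
```

In this sketch the index file would be written with `propagateError = true` and the checksum file with `false`, mirroring the two call sites in the diff.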
[GitHub] [spark] Ngone51 commented on a change in pull request #32401: [SPARK-35276][CORE] Calculate checksum for shuffle data and write as checksum file
Ngone51 commented on a change in pull request #32401:
URL: https://github.com/apache/spark/pull/32401#discussion_r668409304

## File path: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala ##
@@ -360,13 +389,41 @@ private[spark] class IndexShuffleBlockResolver(
           if (dataTmp != null && dataTmp.exists() && !dataTmp.renameTo(dataFile)) {
             throw new IOException("fail to rename file " + dataTmp + " to " + dataFile)
           }
+
+          // write the checksum file
+          checksumTmpOpt.zip(checksumFileOpt).foreach { case (checksumTmp, checksumFile) =>
+            val out = new DataOutputStream(
+              new BufferedOutputStream(
+                new FileOutputStream(checksumTmp)
+              )
+            )
+            Utils.tryWithSafeFinally {
+              checksums.foreach(out.writeLong)
+            } {
+              out.close()
+            }
+
+            if (checksumFile.exists()) {
+              checksumFile.delete()
+            }
+            if (!checksumTmp.renameTo(checksumFile)) {
+              // It's not worthwhile to fail here after index file and data file are already
+              // successfully stored due to checksum is only used for the corner error case.
+              logWarning("fail to rename file " + checksumTmp + " to " + checksumFile)
+            }
+          }
         }
       }
     } finally {
       logDebug(s"Shuffle index for mapId $mapId: ${lengths.mkString("[", ",", "]")}")
       if (indexTmp.exists() && !indexTmp.delete()) {
         logError(s"Failed to delete temporary index file at ${indexTmp.getAbsolutePath}")
       }
+      checksumTmpOpt.foreach { checksumTmp =>
+        if (checksumTmp.exists() && !checksumTmp.delete()) {

Review comment:
Good point! We won't propagate the error. I'll handle it.
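Not propagating the error means the cleanup of the temporary checksum file must never throw. A minimal Java sketch of such a best-effort delete follows; `BestEffortCleanup.deleteQuietly` is a hypothetical name, and it prints to `System.err` where Spark would call `logError`.

```java
import java.io.File;

/** Hypothetical best-effort cleanup helper (not Spark code). */
final class BestEffortCleanup {
  private BestEffortCleanup() {}

  /**
   * Deletes tmpFile if it exists and never throws.
   *
   * @return true if the file no longer exists afterwards
   */
  static boolean deleteQuietly(File tmpFile) {
    try {
      if (!tmpFile.exists() || tmpFile.delete()) {
        return true;
      }
      System.err.println("Failed to delete temporary checksum file at "
          + tmpFile.getAbsolutePath());
      return false;
    } catch (RuntimeException e) {
      // e.g. SecurityException from exists()/delete(); swallowed because the
      // checksum file is only a best-effort artifact.
      System.err.println("Failed to delete temporary checksum file at "
          + tmpFile.getAbsolutePath() + ": " + e);
      return false;
    }
  }
}
```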
[GitHub] [spark] Ngone51 commented on a change in pull request #32401: [SPARK-35276][CORE] Calculate checksum for shuffle data and write as checksum file
Ngone51 commented on a change in pull request #32401:
URL: https://github.com/apache/spark/pull/32401#discussion_r668408590

## File path: core/src/main/java/org/apache/spark/shuffle/checksum/ShuffleChecksumHelper.java ##
@@ -0,0 +1,81 @@
+package org.apache.spark.shuffle.checksum;
+
+import java.util.zip.Adler32;
+import java.util.zip.CRC32;
+import java.util.zip.Checksum;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.SparkException;
+import org.apache.spark.internal.config.package$;
+import org.apache.spark.storage.ShuffleChecksumBlockId;
+
+public class ShuffleChecksumHelper {

Review comment:
Added the doc and marked the class as private.
[GitHub] [spark] Ngone51 commented on a change in pull request #32401: [SPARK-35276][CORE] Calculate checksum for shuffle data and write as checksum file
Ngone51 commented on a change in pull request #32401:
URL: https://github.com/apache/spark/pull/32401#discussion_r668408329

## File path: core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala ##
@@ -101,12 +108,30 @@ private[spark] class DiskBlockObjectWriter(
    */
   private var numRecordsWritten = 0

+  /**
+   * Set the checksum that the checksumOutputStream should use
+   */
+  def setChecksum(checksum: Checksum): Unit = {
+    if (checksumOutputStream == null) {
+      this.checksumEnabled = true
+      this.checksum = checksum
+    } else {
+      checksumOutputStream.setChecksum(checksum)

Review comment:
Yes, it's intentional. When `ShuffleExternalSorter` spills, a single `DiskBlockObjectWriter` serves multiple partitions, and each partition must use its own checksum.
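Why a writer needs a settable checksum can be sketched in Java. `java.util.zip.CheckedOutputStream` fixes its `Checksum` at construction, so this hypothetical `SwitchableChecksumOutputStream` (not the PR's actual stream class) lets the caller swap in the next partition's `Checksum` between partitions while all bytes still go to one underlying stream.

```java
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.zip.Checksum;

/** Hypothetical sketch (not Spark code): one output stream, per-partition checksums. */
final class SwitchableChecksumOutputStream extends FilterOutputStream {
  private Checksum checksum;

  SwitchableChecksumOutputStream(OutputStream out, Checksum checksum) {
    super(out);
    this.checksum = checksum;
  }

  /** Subsequent writes are folded into the given checksum (e.g. the next partition's). */
  void setChecksum(Checksum checksum) {
    this.checksum = checksum;
  }

  @Override
  public void write(int b) throws IOException {
    out.write(b);
    checksum.update(b);
  }

  @Override
  public void write(byte[] b, int off, int len) throws IOException {
    // Forward the whole range instead of FilterOutputStream's byte-at-a-time default.
    out.write(b, off, len);
    checksum.update(b, off, len);
  }
}
```

Calling `setChecksum` at each partition boundary keeps every partition's checksum independent even though a spill file is written through a single writer.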
[GitHub] [spark] Ngone51 commented on a change in pull request #32401: [SPARK-35276][CORE] Calculate checksum for shuffle data and write as checksum file
Ngone51 commented on a change in pull request #32401:
URL: https://github.com/apache/spark/pull/32401#discussion_r668400943

## File path: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala ##
@@ -360,13 +389,41 @@ private[spark] class IndexShuffleBlockResolver(
           if (dataTmp != null && dataTmp.exists() && !dataTmp.renameTo(dataFile)) {
             throw new IOException("fail to rename file " + dataTmp + " to " + dataFile)
           }
+
+          // write the checksum file
+          checksumTmpOpt.zip(checksumFileOpt).foreach { case (checksumTmp, checksumFile) =>
+            val out = new DataOutputStream(
+              new BufferedOutputStream(
+                new FileOutputStream(checksumTmp)
+              )
+            )
+            Utils.tryWithSafeFinally {
+              checksums.foreach(out.writeLong)
+            } {
+              out.close()
+            }
+
+            if (checksumFile.exists()) {
+              checksumFile.delete()
+            }
+            if (!checksumTmp.renameTo(checksumFile)) {
+              // It's not worthwhile to fail here after index file and data file are already
+              // successfully stored due to checksum is only used for the corner error case.
+              logWarning("fail to rename file " + checksumTmp + " to " + checksumFile)

Review comment:
I see your point. I'd prefer to go back to `if (existingLengths != null) {`.
[GitHub] [spark] Ngone51 commented on a change in pull request #32401: [SPARK-35276][CORE] Calculate checksum for shuffle data and write as checksum file
Ngone51 commented on a change in pull request #32401:
URL: https://github.com/apache/spark/pull/32401#discussion_r668027402

## File path: core/src/main/java/org/apache/spark/shuffle/checksum/ShuffleChecksumHelper.java ##
@@ -0,0 +1,66 @@
+package org.apache.spark.shuffle.checksum;
+
+import java.util.Locale;
+import java.util.zip.Adler32;
+import java.util.zip.CRC32;
+import java.util.zip.Checksum;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.SparkException;
+import org.apache.spark.internal.config.package$;
+import org.apache.spark.storage.ShuffleChecksumBlockId;
+
+public class ShuffleChecksumHelper {
+
+  public static boolean isShuffleChecksumEnabled(SparkConf conf) {
+    return (boolean) conf.get(package$.MODULE$.SHUFFLE_CHECKSUM_ENABLED());
+  }
+
+  public static Checksum[] createPartitionChecksumsIfEnabled(int numPartitions, SparkConf conf)
+    throws SparkException {
+    Checksum[] partitionChecksums;
+
+    if (!isShuffleChecksumEnabled(conf)) {
+      partitionChecksums = new Checksum[0];
+      return partitionChecksums;
+    }
+
+    String checksumAlgo = shuffleChecksumAlgorithm(conf).toLowerCase(Locale.ROOT);
+    switch (checksumAlgo) {
+      case "adler32":
+        partitionChecksums = new Adler32[numPartitions];
+        for (int i = 0; i < numPartitions; i ++) {
+          partitionChecksums[i] = new Adler32();
+        }
+        return partitionChecksums;
+
+      case "crc32":
+        partitionChecksums = new CRC32[numPartitions];
+        for (int i = 0; i < numPartitions; i ++) {
+          partitionChecksums[i] = new CRC32();
+        }
+        return partitionChecksums;
+
+      default:
+        throw new SparkException("Unsupported shuffle checksum algorithm: " + checksumAlgo);
+    }
+  }
+
+  public static long[] getChecksumValues(Checksum[] partitionChecksums) {

Review comment:
Doesn't it already return an empty long array when `partitionChecksums` is empty? The for loop body never runs in that case.
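The point can be checked with a small Java sketch of a `getChecksumValues`-style helper. Its body is not shown in the diff, so this version, under the hypothetical class name `ChecksumValues`, is an assumption: allocating `new long[partitionChecksums.length]` and looping over the input yields an empty array when checksums are disabled, with no special case.

```java
import java.util.zip.Checksum;

/** Hypothetical sketch (not the PR's code) of collecting per-partition checksum values. */
final class ChecksumValues {
  private ChecksumValues() {}

  static long[] getChecksumValues(Checksum[] partitionChecksums) {
    long[] values = new long[partitionChecksums.length];
    // For an empty input (checksum disabled) the loop body never runs.
    for (int i = 0; i < partitionChecksums.length; i++) {
      values[i] = partitionChecksums[i].getValue();
    }
    return values;
  }
}
```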
## File path: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala ##
@@ -360,13 +389,41 @@ private[spark] class IndexShuffleBlockResolver(
           if (dataTmp != null && dataTmp.exists() && !dataTmp.renameTo(dataFile)) {
             throw new IOException("fail to rename file " + dataTmp + " to " + dataFile)
           }
+
+          // write the checksum file
+          checksumTmpOpt.zip(checksumFileOpt).foreach { case (checksumTmp, checksumFile) =>
+            val out = new DataOutputStream(
+              new BufferedOutputStream(
+                new FileOutputStream(checksumTmp)
+              )
+            )
+            Utils.tryWithSafeFinally {
+              checksums.foreach(out.writeLong)
+            } {
+              out.close()
+            }
+
+            if (checksumFile.exists()) {
+              checksumFile.delete()
+            }
+            if (!checksumTmp.renameTo(checksumFile)) {
+              // It's not worthwhile to fail here after index file and data file are already
+              // successfully stored due to checksum is only used for the corner error case.
+              logWarning("fail to rename file " + checksumTmp + " to " + checksumFile)

Review comment:
As the code comment says, my concern is that it's not worthwhile to fail here once the index file and data file have been generated successfully, since the checksum is only a best-effort aid for diagnosing data corruption. WDYT?
[GitHub] [spark] Ngone51 commented on a change in pull request #32401: [SPARK-35276][CORE] Calculate checksum for shuffle data and write as checksum file
Ngone51 commented on a change in pull request #32401:
URL: https://github.com/apache/spark/pull/32401#discussion_r663798968

## File path: core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java ##
@@ -133,6 +144,26 @@
     this.peakMemoryUsedBytes = getMemoryUsage();
     this.diskWriteBufferSize =
         (int) (long) conf.get(package$.MODULE$.SHUFFLE_DISK_WRITE_BUFFER_SIZE());
+    this.checksumEnabled = (boolean) conf.get(package$.MODULE$.SHUFFLE_CHECKSUM());
+    if (this.checksumEnabled) {
+      this.partitionChecksums = new Adler32[numPartitions];

Review comment:
Added `ShuffleChecksumHelper`.

## File path: core/src/main/scala/org/apache/spark/internal/config/package.scala ##
@@ -1368,6 +1368,14 @@ package object config {
         s"The buffer size must be greater than 0 and less than or equal to ${Int.MaxValue}.")
       .createWithDefault(4096)

+  private[spark] val SHUFFLE_CHECKSUM =
+    ConfigBuilder("spark.shuffle.checksum")

Review comment:
Appended `.enabled` and added `spark.shuffle.checksum.algorithm` as well.
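With `spark.shuffle.checksum.enabled` and `spark.shuffle.checksum.algorithm` in place, per-partition checksum creation reduces to a flag check plus a lookup from algorithm name to a `Checksum` implementation. The sketch below mirrors the `adler32`/`crc32` switch in the `ShuffleChecksumHelper` diff, but it takes plain arguments instead of a `SparkConf` and throws `IllegalArgumentException` instead of `SparkException`; the class name `PartitionChecksums` is hypothetical.

```java
import java.util.Locale;
import java.util.function.Supplier;
import java.util.zip.Adler32;
import java.util.zip.CRC32;
import java.util.zip.Checksum;

/** Hypothetical sketch (not Spark code): one Checksum per shuffle partition. */
final class PartitionChecksums {
  private PartitionChecksums() {}

  static Checksum[] create(int numPartitions, boolean enabled, String algorithm) {
    if (!enabled) {
      // Disabled: an empty array, so downstream loops simply do nothing.
      return new Checksum[0];
    }
    Supplier<Checksum> factory;
    // Case-insensitive match, like the helper's toLowerCase(Locale.ROOT).
    switch (algorithm.toLowerCase(Locale.ROOT)) {
      case "adler32":
        factory = Adler32::new;
        break;
      case "crc32":
        factory = CRC32::new;
        break;
      default:
        throw new IllegalArgumentException(
            "Unsupported shuffle checksum algorithm: " + algorithm);
    }
    Checksum[] checksums = new Checksum[numPartitions];
    for (int i = 0; i < numPartitions; i++) {
      // A fresh instance per partition, since each partition is summed separately.
      checksums[i] = factory.get();
    }
    return checksums;
  }
}
```

Resolving the algorithm before allocating the array also rejects an unknown name when `numPartitions` is 0, which a per-iteration switch would silently accept.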
[GitHub] [spark] Ngone51 commented on a change in pull request #32401: [SPARK-35276][CORE] Calculate checksum for shuffle data and write as checksum file
Ngone51 commented on a change in pull request #32401:
URL: https://github.com/apache/spark/pull/32401#discussion_r663799506

## File path: core/src/main/scala/org/apache/spark/internal/config/package.scala ##
@@ -1368,6 +1368,14 @@ package object config {
         s"The buffer size must be greater than 0 and less than or equal to ${Int.MaxValue}.")
       .createWithDefault(4096)

+  private[spark] val SHUFFLE_CHECKSUM =
+    ConfigBuilder("spark.shuffle.checksum")

Review comment:
Appended `.enabled` and added `spark.shuffle.checksum.algorithm` as well.