[GitHub] [spark] Ngone51 commented on a change in pull request #32401: [SPARK-35276][CORE] Calculate checksum for shuffle data and write as checksum file

2021-07-14 Thread GitBox


Ngone51 commented on a change in pull request #32401:
URL: https://github.com/apache/spark/pull/32401#discussion_r670100864



##
File path: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
##
@@ -329,44 +352,111 @@ private[spark] class IndexShuffleBlockResolver(
           // Another attempt for the same task has already written our map outputs successfully,
           // so just use the existing partition lengths and delete our temporary map outputs.
           System.arraycopy(existingLengths, 0, lengths, 0, lengths.length)
+          if (checksumEnabled) {
+            val existingChecksums = getChecksums(checksumFileOpt.get, checksums.length)
+            if (existingChecksums != null) {
+              System.arraycopy(existingChecksums, 0, checksums, 0, lengths.length)
+            } else {
+              // It's possible that the previous task attempt succeeded in writing the
+              // index file and data file but failed to write the checksum file. In
+              // this case, the current task attempt could write the missing checksum
+              // file by itself.
+              writeMetadataFile(checksums, checksumTmpOpt.get, checksumFileOpt.get, false)
+            }
+          }
           if (dataTmp != null && dataTmp.exists()) {
             dataTmp.delete()
           }
         } else {
           // This is the first successful attempt in writing the map outputs for this task,
           // so override any existing index and data files with the ones we wrote.
-          val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexTmp)))
-          Utils.tryWithSafeFinally {
-            // We take in lengths of each block, need to convert it to offsets.
-            var offset = 0L
-            out.writeLong(offset)
-            for (length <- lengths) {
-              offset += length
-              out.writeLong(offset)
-            }
-          } {
-            out.close()
-          }
-
-          if (indexFile.exists()) {
-            indexFile.delete()
-          }
+          val offsets = lengths.scanLeft(0L)(_ + _)
+          writeMetadataFile(offsets, indexTmp, indexFile, true)
+
           if (dataFile.exists()) {
             dataFile.delete()
           }
-          if (!indexTmp.renameTo(indexFile)) {
-            throw new IOException("fail to rename file " + indexTmp + " to " + indexFile)
-          }
           if (dataTmp != null && dataTmp.exists() && !dataTmp.renameTo(dataFile)) {
             throw new IOException("fail to rename file " + dataTmp + " to " + dataFile)
           }
+
+          // write the checksum file
+          checksumTmpOpt.zip(checksumFileOpt).foreach { case (checksumTmp, checksumFile) =>
+            try {
+              writeMetadataFile(checksums, checksumTmp, checksumFile, false)
+            } catch {
+              case e: Exception =>
+                // It's not worthwhile to fail here after the index file and data file are
+                // already successfully stored, since the checksum is only a best effort for
+                // the corner error case.
+                logError("Failed to write checksum file", e)
+            }
+          }
         }
       }
     } finally {
       logDebug(s"Shuffle index for mapId $mapId: ${lengths.mkString("[", ",", "]")}")
       if (indexTmp.exists() && !indexTmp.delete()) {
         logError(s"Failed to delete temporary index file at ${indexTmp.getAbsolutePath}")
       }
+      checksumTmpOpt.foreach { checksumTmp =>
+        if (checksumTmp.exists()) {
+          try {
+            if (!checksumTmp.delete()) {
+              logError(s"Failed to delete temporary checksum file " +
+                s"at ${checksumTmp.getAbsolutePath}")
+            }
+          } catch {
+            case e: Exception =>
+              // Unlike index deletion, we won't propagate the error for the checksum file,
+              // since the checksum is only a best effort.
+              logError(s"Failed to delete temporary checksum file " +
+                s"at ${checksumTmp.getAbsolutePath}", e)
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Write the metadata file (index or checksum). Metadata values will first be written into
+   * the tmp file, and the tmp file will be renamed to the target file at the end to avoid
+   * dirty writes.
+   * @param metaValues The metadata values
+   * @param tmpFile The temp file
+   * @param targetFile The target file
+   * @param propagateError Whether to propagate errors from file operations. Unlike the index
+   *                       file, the checksum is only a best effort, so we won't fail the
+   *                       whole task due to an error from the checksum.
+   */
+  private def writeMetadataFile(
+      metaValues: Array[Long],
+      tmpFile: File,
+      targetFile: File,
+      propagateError: Boolean): Unit = {
+    val out = new DataOutputStream(
+      new BufferedOutputStream(new FileOutputStream(tmpFile)))
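
(The quoted diff is truncated at this point. A minimal sketch of how `writeMetadataFile` plausibly continues, inferred from its Javadoc above and the write-then-rename code quoted later in this thread; the PR's exact body may differ:)

    // Sketch only, not the PR's verbatim code: write the values, then commit via rename.
    Utils.tryWithSafeFinally {
      metaValues.foreach(out.writeLong)
    } {
      out.close()
    }
    // Commit by renaming the tmp file onto the target to avoid dirty writes.
    if (targetFile.exists()) {
      targetFile.delete()
    }
    if (!tmpFile.renameTo(targetFile)) {
      val e = new IOException(s"fail to rename file $tmpFile to $targetFile")
      // The index file is required, but the checksum file is only a best effort.
      if (propagateError) throw e else logError(e.getMessage, e)
    }

As an aside, the `lengths.scanLeft(0L)(_ + _)` one-liner above reproduces the removed manual loop: it emits the running sum starting from 0L, so N partition lengths yield the N + 1 offsets the index file stores. For example:

    scala> Array(10L, 20L, 5L).scanLeft(0L)(_ + _)
    res0: Array[Long] = Array(0, 10, 30, 35)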

[GitHub] [spark] Ngone51 commented on a change in pull request #32401: [SPARK-35276][CORE] Calculate checksum for shuffle data and write as checksum file

2021-07-13 Thread GitBox


Ngone51 commented on a change in pull request #32401:
URL: https://github.com/apache/spark/pull/32401#discussion_r669319636



##
File path: core/src/main/java/org/apache/spark/shuffle/checksum/ShuffleChecksumHelper.java
##
@@ -0,0 +1,83 @@
+package org.apache.spark.shuffle.checksum;

Review comment:
   Oh I see. I was caught out by the `FileNotFoundException` previously.







[GitHub] [spark] Ngone51 commented on a change in pull request #32401: [SPARK-35276][CORE] Calculate checksum for shuffle data and write as checksum file

2021-07-13 Thread GitBox


Ngone51 commented on a change in pull request #32401:
URL: https://github.com/apache/spark/pull/32401#discussion_r669282673



##
File path: core/src/main/java/org/apache/spark/shuffle/checksum/ShuffleChecksumHelper.java
##
@@ -0,0 +1,83 @@
+package org.apache.spark.shuffle.checksum;

Review comment:
   Oh yeah, this is definitely a mistake. Though, it doesn't seem to be related to the `RAT` failure.







[GitHub] [spark] Ngone51 commented on a change in pull request #32401: [SPARK-35276][CORE] Calculate checksum for shuffle data and write as checksum file

2021-07-13 Thread GitBox


Ngone51 commented on a change in pull request #32401:
URL: https://github.com/apache/spark/pull/32401#discussion_r668538313



##
File path: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
##
@@ -329,44 +352,111 @@ private[spark] class IndexShuffleBlockResolver(
           // Another attempt for the same task has already written our map outputs successfully,
           // so just use the existing partition lengths and delete our temporary map outputs.
           System.arraycopy(existingLengths, 0, lengths, 0, lengths.length)
+          if (checksumEnabled) {
+            val existingChecksums = getChecksums(checksumFileOpt.get, checksums.length)
+            if (existingChecksums != null) {
+              System.arraycopy(existingChecksums, 0, checksums, 0, lengths.length)
+            } else {
+              // It's possible that the previous task attempt succeeded in writing the
+              // index file and data file but failed to write the checksum file. In
+              // this case, the current task attempt could write the missing checksum
+              // file by itself.
+              writeMetadataFile(checksums, checksumTmpOpt.get, checksumFileOpt.get, false)

Review comment:
   I did a bit more refactoring here. Please take another look, thanks @mridulm







[GitHub] [spark] Ngone51 commented on a change in pull request #32401: [SPARK-35276][CORE] Calculate checksum for shuffle data and write as checksum file

2021-07-12 Thread GitBox


Ngone51 commented on a change in pull request #32401:
URL: https://github.com/apache/spark/pull/32401#discussion_r668409304



##
File path: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
##
@@ -360,13 +389,41 @@ private[spark] class IndexShuffleBlockResolver(
           if (dataTmp != null && dataTmp.exists() && !dataTmp.renameTo(dataFile)) {
             throw new IOException("fail to rename file " + dataTmp + " to " + dataFile)
           }
+
+          // write the checksum file
+          checksumTmpOpt.zip(checksumFileOpt).foreach { case (checksumTmp, checksumFile) =>
+            val out = new DataOutputStream(
+              new BufferedOutputStream(
+                new FileOutputStream(checksumTmp)
+              )
+            )
+            Utils.tryWithSafeFinally {
+              checksums.foreach(out.writeLong)
+            } {
+              out.close()
+            }
+
+            if (checksumFile.exists()) {
+              checksumFile.delete()
+            }
+            if (!checksumTmp.renameTo(checksumFile)) {
+              // It's not worthwhile to fail here after the index file and data file are already
+              // successfully stored, since the checksum is only used for the corner error case.
+              logWarning("fail to rename file " + checksumTmp + " to " + checksumFile)
+            }
+          }
         }
       }
     } finally {
       logDebug(s"Shuffle index for mapId $mapId: ${lengths.mkString("[", ",", "]")}")
       if (indexTmp.exists() && !indexTmp.delete()) {
         logError(s"Failed to delete temporary index file at ${indexTmp.getAbsolutePath}")
       }
+      checksumTmpOpt.foreach { checksumTmp =>
+        if (checksumTmp.exists() && !checksumTmp.delete()) {

Review comment:
   Good point! We won't propagate the error. I'll handle it.







[GitHub] [spark] Ngone51 commented on a change in pull request #32401: [SPARK-35276][CORE] Calculate checksum for shuffle data and write as checksum file

2021-07-12 Thread GitBox


Ngone51 commented on a change in pull request #32401:
URL: https://github.com/apache/spark/pull/32401#discussion_r668408590



##
File path: core/src/main/java/org/apache/spark/shuffle/checksum/ShuffleChecksumHelper.java
##
@@ -0,0 +1,81 @@
+package org.apache.spark.shuffle.checksum;
+
+import java.util.zip.Adler32;
+import java.util.zip.CRC32;
+import java.util.zip.Checksum;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.SparkException;
+import org.apache.spark.internal.config.package$;
+import org.apache.spark.storage.ShuffleChecksumBlockId;
+
+public class ShuffleChecksumHelper {

Review comment:
   Added the doc and marked it as private.







[GitHub] [spark] Ngone51 commented on a change in pull request #32401: [SPARK-35276][CORE] Calculate checksum for shuffle data and write as checksum file

2021-07-12 Thread GitBox


Ngone51 commented on a change in pull request #32401:
URL: https://github.com/apache/spark/pull/32401#discussion_r668408329



##
File path: core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
##
@@ -101,12 +108,30 @@ private[spark] class DiskBlockObjectWriter(
    */
   private var numRecordsWritten = 0
 
+  /**
+   * Set the checksum that the checksumOutputStream should use
+   */
+  def setChecksum(checksum: Checksum): Unit = {
+    if (checksumOutputStream == null) {
+      this.checksumEnabled = true
+      this.checksum = checksum
+    } else {
+      checksumOutputStream.setChecksum(checksum)

Review comment:
   Yes, it's intentional. In the case of a `ShuffleExternalSorter` spill, one `DiskBlockObjectWriter` would serve multiple partitions, and different partitions should use different checksums.
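
   For illustration, a minimal sketch of that pattern (variable names are hypothetical; `setChecksum` and `ShuffleChecksumHelper.createPartitionChecksumsIfEnabled` are the APIs from this PR, and `Checksum` is `java.util.zip.Checksum`):

    // Sketch: one writer reused across reduce partitions, one Checksum per partition.
    val partitionChecksums: Array[Checksum] =
      ShuffleChecksumHelper.createPartitionChecksumsIfEnabled(numPartitions, conf)
    for (partitionId <- 0 until numPartitions) {
      // Swap in this partition's checksum before writing its spilled records.
      writer.setChecksum(partitionChecksums(partitionId))
      // ... write all records belonging to partitionId through the writer ...
    }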







[GitHub] [spark] Ngone51 commented on a change in pull request #32401: [SPARK-35276][CORE] Calculate checksum for shuffle data and write as checksum file

2021-07-12 Thread GitBox


Ngone51 commented on a change in pull request #32401:
URL: https://github.com/apache/spark/pull/32401#discussion_r668400943



##
File path: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
##
@@ -360,13 +389,41 @@ private[spark] class IndexShuffleBlockResolver(
           if (dataTmp != null && dataTmp.exists() && !dataTmp.renameTo(dataFile)) {
             throw new IOException("fail to rename file " + dataTmp + " to " + dataFile)
           }
+
+          // write the checksum file
+          checksumTmpOpt.zip(checksumFileOpt).foreach { case (checksumTmp, checksumFile) =>
+            val out = new DataOutputStream(
+              new BufferedOutputStream(
+                new FileOutputStream(checksumTmp)
+              )
+            )
+            Utils.tryWithSafeFinally {
+              checksums.foreach(out.writeLong)
+            } {
+              out.close()
+            }
+
+            if (checksumFile.exists()) {
+              checksumFile.delete()
+            }
+            if (!checksumTmp.renameTo(checksumFile)) {
+              // It's not worthwhile to fail here after the index file and data file are already
+              // successfully stored, since the checksum is only used for the corner error case.
+              logWarning("fail to rename file " + checksumTmp + " to " + checksumFile)

Review comment:
   I see. I got your point. I'd prefer to go back to `if (existingLengths != null) {`.







[GitHub] [spark] Ngone51 commented on a change in pull request #32401: [SPARK-35276][CORE] Calculate checksum for shuffle data and write as checksum file

2021-07-12 Thread GitBox


Ngone51 commented on a change in pull request #32401:
URL: https://github.com/apache/spark/pull/32401#discussion_r668027402



##
File path: core/src/main/java/org/apache/spark/shuffle/checksum/ShuffleChecksumHelper.java
##
@@ -0,0 +1,66 @@
+package org.apache.spark.shuffle.checksum;
+
+import java.util.Locale;
+import java.util.zip.Adler32;
+import java.util.zip.CRC32;
+import java.util.zip.Checksum;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.SparkException;
+import org.apache.spark.internal.config.package$;
+import org.apache.spark.storage.ShuffleChecksumBlockId;
+
+public class ShuffleChecksumHelper {
+
+  public static boolean isShuffleChecksumEnabled(SparkConf conf) {
+    return (boolean) conf.get(package$.MODULE$.SHUFFLE_CHECKSUM_ENABLED());
+  }
+
+  public static Checksum[] createPartitionChecksumsIfEnabled(int numPartitions, SparkConf conf)
+    throws SparkException {
+    Checksum[] partitionChecksums;
+
+    if (!isShuffleChecksumEnabled(conf)) {
+      partitionChecksums = new Checksum[0];
+      return partitionChecksums;
+    }
+
+    String checksumAlgo = shuffleChecksumAlgorithm(conf).toLowerCase(Locale.ROOT);
+    switch (checksumAlgo) {
+      case "adler32":
+        partitionChecksums = new Adler32[numPartitions];
+        for (int i = 0; i < numPartitions; i++) {
+          partitionChecksums[i] = new Adler32();
+        }
+        return partitionChecksums;
+
+      case "crc32":
+        partitionChecksums = new CRC32[numPartitions];
+        for (int i = 0; i < numPartitions; i++) {
+          partitionChecksums[i] = new CRC32();
+        }
+        return partitionChecksums;
+
+      default:
+        throw new SparkException("Unsupported shuffle checksum algorithm: " + checksumAlgo);
+    }
+  }
+
+  public static long[] getChecksumValues(Checksum[] partitionChecksums) {

Review comment:
   It already returns the empty long array when `partitionChecksums` is empty? (It won't go through the for loop when `partitionChecksums` is empty.)
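
   To make that concrete, a sketch of the equivalent logic (in Scala for brevity; the actual helper is Java, and this is not its verbatim body):

    import java.util.zip.Checksum

    // An empty input trivially yields an empty long array:
    // the loop body never runs when partitionChecksums is empty.
    def getChecksumValues(partitionChecksums: Array[Checksum]): Array[Long] = {
      val values = new Array[Long](partitionChecksums.length)
      for (i <- partitionChecksums.indices) {
        values(i) = partitionChecksums(i).getValue
      }
      values
    }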
   
   

##
File path: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
##
@@ -360,13 +389,41 @@ private[spark] class IndexShuffleBlockResolver(
           if (dataTmp != null && dataTmp.exists() && !dataTmp.renameTo(dataFile)) {
             throw new IOException("fail to rename file " + dataTmp + " to " + dataFile)
           }
+
+          // write the checksum file
+          checksumTmpOpt.zip(checksumFileOpt).foreach { case (checksumTmp, checksumFile) =>
+            val out = new DataOutputStream(
+              new BufferedOutputStream(
+                new FileOutputStream(checksumTmp)
+              )
+            )
+            Utils.tryWithSafeFinally {
+              checksums.foreach(out.writeLong)
+            } {
+              out.close()
+            }
+
+            if (checksumFile.exists()) {
+              checksumFile.delete()
+            }
+            if (!checksumTmp.renameTo(checksumFile)) {
+              // It's not worthwhile to fail here after the index file and data file are already
+              // successfully stored, since the checksum is only used for the corner error case.
+              logWarning("fail to rename file " + checksumTmp + " to " + checksumFile)

Review comment:
   So if you look at the comment, my concern is: it's not worthwhile to fail here after the index file and data file are successfully generated, since the checksum is only a best effort we could do in case of data corruption.
   
   WDYT?







[GitHub] [spark] Ngone51 commented on a change in pull request #32401: [SPARK-35276][CORE] Calculate checksum for shuffle data and write as checksum file

2021-07-05 Thread GitBox


Ngone51 commented on a change in pull request #32401:
URL: https://github.com/apache/spark/pull/32401#discussion_r663799506



##
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##
@@ -1368,6 +1368,14 @@ package object config {
 s"The buffer size must be greater than 0 and less than or equal to 
${Int.MaxValue}.")
   .createWithDefault(4096)
 
+  private[spark] val SHUFFLE_CHECKSUM =
+ConfigBuilder("spark.shuffle.checksum")

Review comment:
   Appended `.enabled` and added `spark.shuffle.checksum.algorithm` as well.
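
   For example, with the rename the two knobs would be set like this (a usage sketch; the algorithm values are the ones `ShuffleChecksumHelper` supports):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.shuffle.checksum.enabled", "true")       // toggle shuffle checksums
      .set("spark.shuffle.checksum.algorithm", "adler32")  // or "crc32"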







[GitHub] [spark] Ngone51 commented on a change in pull request #32401: [SPARK-35276][CORE] Calculate checksum for shuffle data and write as checksum file

2021-07-05 Thread GitBox


Ngone51 commented on a change in pull request #32401:
URL: https://github.com/apache/spark/pull/32401#discussion_r663798968



##
File path: core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
##
@@ -133,6 +144,26 @@
     this.peakMemoryUsedBytes = getMemoryUsage();
     this.diskWriteBufferSize =
       (int) (long) conf.get(package$.MODULE$.SHUFFLE_DISK_WRITE_BUFFER_SIZE());
+    this.checksumEnabled = (boolean) conf.get(package$.MODULE$.SHUFFLE_CHECKSUM());
+    if (this.checksumEnabled) {
+      this.partitionChecksums = new Adler32[numPartitions];

Review comment:
   Added `ShuffleChecksumHelper`
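
   In other words, instead of hard-coding `new Adler32[numPartitions]`, the sorter now delegates to the helper, roughly like this (a sketch in Scala for brevity; the sorter itself is Java):

    import java.util.zip.Checksum

    // Returns one Checksum per partition, or an empty array when
    // spark.shuffle.checksum.enabled is false.
    val partitionChecksums: Array[Checksum] =
      ShuffleChecksumHelper.createPartitionChecksumsIfEnabled(numPartitions, conf)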



