cloud-fan commented on a change in pull request #32401:
URL: https://github.com/apache/spark/pull/32401#discussion_r664290337



##########
File path: 
core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
##########
@@ -133,6 +145,23 @@
     this.peakMemoryUsedBytes = getMemoryUsage();
     this.diskWriteBufferSize =
         (int) (long) 
conf.get(package$.MODULE$.SHUFFLE_DISK_WRITE_BUFFER_SIZE());
+    this.checksumEnabled = 
ShuffleChecksumHelper.isShuffleChecksumEnabled(conf);
+    if (this.checksumEnabled) {
+      this.partitionChecksums = 
ShuffleChecksumHelper.createPartitionChecksums(numPartitions, conf);
+    }
+  }
+
+  public long[] getChecksums() {

Review comment:
       We can move `getChecksums` into `ShuffleChecksumHelper`.

##########
File path: 
core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
##########
@@ -107,20 +112,27 @@
   @Nullable private MemoryBlock currentPage = null;
   private long pageCursor = -1;
 
+  private final boolean checksumEnabled;
+  private Checksum[] partitionChecksums;
+
   ShuffleExternalSorter(
       TaskMemoryManager memoryManager,
       BlockManager blockManager,
       TaskContext taskContext,
       int initialSize,
+      int shuffleId,
+      long mapId,
       int numPartitions,
       SparkConf conf,
-      ShuffleWriteMetricsReporter writeMetrics) {
+      ShuffleWriteMetricsReporter writeMetrics) throws SparkException {
     super(memoryManager,
       (int) Math.min(PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES, 
memoryManager.pageSizeBytes()),
       memoryManager.getTungstenMemoryMode());
     this.taskMemoryManager = memoryManager;
     this.blockManager = blockManager;
     this.taskContext = taskContext;
+    this.shuffleId = shuffleId;
+    this.mapId = mapId;

Review comment:
       These two fields (`shuffleId` and `mapId`) are not used.

##########
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##########
@@ -1368,6 +1368,24 @@ package object config {
         s"The buffer size must be greater than 0 and less than or equal to 
${Int.MaxValue}.")
       .createWithDefault(4096)
 
+  private[spark] val SHUFFLE_CHECKSUM_ENABLED =
+    ConfigBuilder("spark.shuffle.checksum.enabled")
+      .doc("Whether to calculate the checksum of shuffle output. If enabled, 
Spark will try " +
+        "its best to tell if shuffle data corruption is caused by network or 
disk or others.")
+      .version("3.3.0")
+      .booleanConf
+      .createWithDefault(true)
+
+  private[spark] val SHUFFLE_CHECKSUM_ALGORITHM =
+    ConfigBuilder("spark.shuffle.checksum.algorithm")
+      .doc("The algorithm used to calculate the checksum. Currently, it only 
supports" +
+        " built-in algorithms of JDK.")
+      .version("3.3.0")
+      .stringConf
+      .checkValue(Set("Adler32", "CRC32").contains, "Shuffle checksum 
algorithm " +

Review comment:
       The algorithm name check should be case-insensitive.

##########
File path: 
core/src/main/scala/org/apache/spark/shuffle/ShufflePartitionPairsWriter.scala
##########
@@ -45,6 +47,23 @@ private[spark] class ShufflePartitionPairsWriter(
   private var numRecordsWritten = 0
   private var curNumBytesWritten = 0L
 
+  // checksum related
+  private var checksumEnabled = false
+  private var checksumOutputStream: MutableCheckedOutputStream = _
+  private var checksum: Checksum = _
+
+  /**
+   * Set the checksum that the checksumOutputStream should use
+   */
+  def setChecksum(checksum: Checksum): Unit = {
+    if (checksumOutputStream == null) {
+      this.checksumEnabled = true
+      this.checksum = checksum

Review comment:
       We can set the checksum in the constructor.

##########
File path: 
core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
##########
@@ -763,6 +775,7 @@ private[spark] class ExternalSorter[K, V, C](
             serInstance,
             blockId,
             context.taskMetrics().shuffleWriteMetrics)
+          partitionPairsWriter.setChecksum(partitionChecksums(partitionId))

Review comment:
       We need to check whether checksum is enabled before calling `setChecksum`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to