[GitHub] [spark] cloud-fan commented on a change in pull request #32401: [SPARK-35276][CORE] Calculate checksum for shuffle data and write as checksum file

2021-07-11 GitBox


cloud-fan commented on a change in pull request #32401:
URL: https://github.com/apache/spark/pull/32401#discussion_r667672577



##
File path: core/src/test/scala/org/apache/spark/shuffle/ShuffleChecksumTester.scala
##
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle
+
+import java.io.{DataInputStream, File, FileInputStream}
+import java.util.zip.{Adler32, CheckedInputStream}
+
+import org.apache.spark.network.util.LimitedInputStream
+
+trait ShuffleChecksumTester {
+
+  /**
+   * Ensure that the checksum values are consistent between write and read side.
+   */
+  def compareChecksums(numPartition: Int, checksum: File, data: File, index: File): Unit = {
+    assert(checksum.exists(), "Checksum file doesn't exist")
+    assert(data.exists(), "Data file doesn't exist")
+    assert(index.exists(), "Index file doesn't exist")
+
+    var checksumIn: DataInputStream = null
+    val expectChecksums = Array.ofDim[Long](numPartition)
+    try {
+      checksumIn = new DataInputStream(new FileInputStream(checksum))
+      (0 until numPartition).foreach(i => expectChecksums(i) = checksumIn.readLong())
+    } finally {
+      if (checksumIn != null) {
+        checksumIn.close()
+      }
+    }
+
+    var dataIn: FileInputStream = null
+    var indexIn: DataInputStream = null
+    var checkedIn: CheckedInputStream = null
+    try {
+      dataIn = new FileInputStream(data)
+      indexIn = new DataInputStream(new FileInputStream(index))
+      var prevOffset = indexIn.readLong
+      (0 until numPartition).foreach { i =>
+        val curOffset = indexIn.readLong
+        val limit = (curOffset - prevOffset).toInt
+        val bytes = new Array[Byte](limit)
+        checkedIn = new CheckedInputStream(
+          new LimitedInputStream(dataIn, curOffset - prevOffset), new Adler32)

Review comment:
   Shall we pick the algorithm from the conf (`spark.shuffle.checksum.algorithm`) instead of hardcoding `Adler32`?
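A minimal Java sketch of the per-partition comparison that `compareChecksums` performs, with the checksum algorithm passed in by the caller so it could be derived from the conf rather than hardcoded. It assumes the file layouts implied by the test code above: the checksum file holds one big-endian `long` per partition, and the index file holds `numPartitions + 1` offsets, the first of which is assumed to be 0. `ChecksumVerifier` and `firstMismatch` are hypothetical names, not Spark APIs, and plain `InputStream`s stand in for the files.

```java
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.function.Supplier;
import java.util.zip.Checksum;

// Hypothetical helper for illustration; not part of Spark.
final class ChecksumVerifier {
  private ChecksumVerifier() {}

  /**
   * Recomputes each partition's checksum from the data stream and compares it with
   * the stored value. Returns the first mismatching partition id, or -1 if all match.
   *
   * Assumed layouts: checksumIn holds numPartitions longs; indexIn holds
   * numPartitions + 1 offsets, the first being 0; dataIn holds the partitions back to back.
   */
  static int firstMismatch(
      int numPartitions,
      InputStream checksumIn,
      InputStream indexIn,
      InputStream dataIn,
      Supplier<Checksum> newChecksum) throws IOException {
    DataInputStream checksums = new DataInputStream(checksumIn);
    DataInputStream index = new DataInputStream(indexIn);
    DataInputStream data = new DataInputStream(dataIn);
    long prevOffset = index.readLong();
    for (int i = 0; i < numPartitions; i++) {
      long expected = checksums.readLong();
      long curOffset = index.readLong();
      // Read exactly this partition's bytes; readFully throws if the data is truncated.
      byte[] bytes = new byte[Math.toIntExact(curOffset - prevOffset)];
      data.readFully(bytes);
      // A fresh Checksum per partition, using whichever algorithm the caller chose.
      Checksum checksum = newChecksum.get();
      checksum.update(bytes, 0, bytes.length);
      if (checksum.getValue() != expected) {
        return i;
      }
      prevOffset = curOffset;
    }
    return -1;
  }
}
```

Switching algorithms then only means passing `CRC32::new` instead of `Adler32::new`, and returning the first mismatching partition id (rather than a boolean) keeps a test failure message specific.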


[GitHub] [spark] cloud-fan commented on a change in pull request #32401: [SPARK-35276][CORE] Calculate checksum for shuffle data and write as checksum file

2021-07-11 GitBox


cloud-fan commented on a change in pull request #32401:
URL: https://github.com/apache/spark/pull/32401#discussion_r66767



##
File path: core/src/test/scala/org/apache/spark/shuffle/ShuffleChecksumTester.scala
##
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle
+
+import java.io.{DataInputStream, File, FileInputStream}
+import java.util.zip.{Adler32, CheckedInputStream}
+
+import org.apache.spark.network.util.LimitedInputStream
+
+trait ShuffleChecksumTester {

Review comment:
   Shall we rename it to `ShuffleChecksumTestHelper`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a change in pull request #32401: [SPARK-35276][CORE] Calculate checksum for shuffle data and write as checksum file

2021-07-11 GitBox


cloud-fan commented on a change in pull request #32401:
URL: https://github.com/apache/spark/pull/32401#discussion_r667669668



##
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##
@@ -1368,6 +1368,25 @@ package object config {
         s"The buffer size must be greater than 0 and less than or equal to ${Int.MaxValue}.")
       .createWithDefault(4096)
 
+  private[spark] val SHUFFLE_CHECKSUM_ENABLED =
+    ConfigBuilder("spark.shuffle.checksum.enabled")
+      .doc("Whether to calculate the checksum of shuffle output. If enabled, Spark will try " +
+        "its best to tell if shuffle data corruption is caused by network or disk or others.")
+      .version("3.3.0")
+      .booleanConf
+      .createWithDefault(true)
+
+  private[spark] val SHUFFLE_CHECKSUM_ALGORITHM =
+    ConfigBuilder("spark.shuffle.checksum.algorithm")
+      .doc("The algorithm used to calculate the checksum. Currently, it only supports" +
+        " built-in algorithms of JDK.")
+      .version("3.3.0")
+      .stringConf
+      .transform(_.toLowerCase(Locale.ROOT))
+      .checkValue(Set("adler32", "crc32").contains, "Shuffle checksum algorithm " +
+        "should be either Adler32 or CRC32.")
+      .createWithDefault("Adler32")

Review comment:
   Let's normalize the conf value to upper case.
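One way to read this suggestion, sketched in Java under the assumption that the config value only needs to select between the two JDK checksums: upper-case the value with `Locale.ROOT` (in the Scala diff this would mean swapping `toLowerCase` for `toUpperCase` in `.transform`), validate it against upper-case names, and map it to a `java.util.zip.Checksum`. `ChecksumAlgorithms` is a hypothetical helper, not Spark's `ConfigBuilder` API.

```java
import java.util.Locale;
import java.util.Set;
import java.util.zip.Adler32;
import java.util.zip.CRC32;
import java.util.zip.Checksum;

// Hypothetical helper illustrating upper-case normalization; not Spark's ConfigBuilder API.
final class ChecksumAlgorithms {
  static final Set<String> SUPPORTED = Set.of("ADLER32", "CRC32");

  private ChecksumAlgorithms() {}

  /** Normalizes a user-supplied value to upper case and rejects unsupported algorithms. */
  static String normalize(String value) {
    String upper = value.toUpperCase(Locale.ROOT);
    if (!SUPPORTED.contains(upper)) {
      throw new IllegalArgumentException(
          "Shuffle checksum algorithm should be either ADLER32 or CRC32, but got " + value);
    }
    return upper;
  }

  /** Creates a fresh JDK Checksum for a case-insensitive algorithm name. */
  static Checksum create(String value) {
    switch (normalize(value)) {
      case "ADLER32":
        return new Adler32();
      case "CRC32":
        return new CRC32();
      default:
        throw new IllegalStateException("unreachable: " + value);
    }
  }
}
```

With this normalization, `adler32`, `Adler32`, and `ADLER32` all resolve to the same value, so downstream code only ever compares against `ADLER32` and `CRC32`.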







[GitHub] [spark] cloud-fan commented on a change in pull request #32401: [SPARK-35276][CORE] Calculate checksum for shuffle data and write as checksum file

2021-07-11 GitBox


cloud-fan commented on a change in pull request #32401:
URL: https://github.com/apache/spark/pull/32401#discussion_r667668816



##
File path: core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
##
@@ -107,14 +110,19 @@
   @Nullable private MemoryBlock currentPage = null;
   private long pageCursor = -1;
 
+  // Checksum calculator for each partition. Empty when shuffle checksum disabled.
+  private final Checksum[] partitionChecksums;
+
   ShuffleExternalSorter(
       TaskMemoryManager memoryManager,
       BlockManager blockManager,
       TaskContext taskContext,
       int initialSize,
+      int shuffleId,
+      long mapId,

Review comment:
   These two (`shuffleId` and `mapId`) are not used.
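A sketch of why the array does not need `shuffleId` or `mapId`: the per-partition checksums depend only on the partition count and the checksum settings. This is a hypothetical Java stand-in for the `ShuffleChecksumHelper.createPartitionChecksums(numPartitions, conf)` call that appears elsewhere in this thread; it replaces the `SparkConf` with an explicit flag and a `Checksum` factory, and `PartitionChecksums` is an illustrative name.

```java
import java.util.function.Supplier;
import java.util.zip.Checksum;

// Hypothetical stand-in for a helper like ShuffleChecksumHelper.createPartitionChecksums.
final class PartitionChecksums {
  static final Checksum[] EMPTY = new Checksum[0];

  private PartitionChecksums() {}

  /** One independent Checksum per partition, or an empty array when checksum is disabled. */
  static Checksum[] create(int numPartitions, boolean enabled, Supplier<Checksum> factory) {
    if (!enabled) {
      return EMPTY;
    }
    Checksum[] checksums = new Checksum[numPartitions];
    for (int i = 0; i < numPartitions; i++) {
      // Each partition needs its own instance because Checksum objects are stateful.
      checksums[i] = factory.get();
    }
    return checksums;
  }
}
```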







[GitHub] [spark] cloud-fan commented on a change in pull request #32401: [SPARK-35276][CORE] Calculate checksum for shuffle data and write as checksum file

2021-07-06 GitBox


cloud-fan commented on a change in pull request #32401:
URL: https://github.com/apache/spark/pull/32401#discussion_r664290337



##
File path: core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
##
@@ -133,6 +145,23 @@
     this.peakMemoryUsedBytes = getMemoryUsage();
     this.diskWriteBufferSize =
         (int) (long) conf.get(package$.MODULE$.SHUFFLE_DISK_WRITE_BUFFER_SIZE());
+    this.checksumEnabled = ShuffleChecksumHelper.isShuffleChecksumEnabled(conf);
+    if (this.checksumEnabled) {
+      this.partitionChecksums = ShuffleChecksumHelper.createPartitionChecksums(numPartitions, conf);
+    }
+  }
+
+  public long[] getChecksums() {

Review comment:
   We can move this to the helper (`ShuffleChecksumHelper`).
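A sketch of what a shared helper method could look like, assuming it only needs to snapshot each partition's current `Checksum` value. The same loop appears inline in the `BypassMergeSortShuffleWriter` diff in this thread, so one helper could serve both writers. `ChecksumValues.getChecksumValues` is a hypothetical name.

```java
import java.util.zip.Checksum;

// Hypothetical helper; the names are illustrative, not Spark's.
final class ChecksumValues {
  private ChecksumValues() {}

  /** Snapshots the current value of each partition's Checksum (empty in, empty out). */
  static long[] getChecksumValues(Checksum[] partitionChecksums) {
    long[] values = new long[partitionChecksums.length];
    for (int i = 0; i < partitionChecksums.length; i++) {
      values[i] = partitionChecksums[i].getValue();
    }
    return values;
  }
}
```

Because an empty input yields an empty array, callers would not need a separate branch when checksum is disabled, provided the disabled case is represented by an empty `Checksum[]` as the `ShuffleExternalSorter` field comment in this thread describes.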

##
File path: core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
##
@@ -107,20 +112,27 @@
   @Nullable private MemoryBlock currentPage = null;
   private long pageCursor = -1;
 
+  private final boolean checksumEnabled;
+  private Checksum[] partitionChecksums;
+
   ShuffleExternalSorter(
       TaskMemoryManager memoryManager,
       BlockManager blockManager,
       TaskContext taskContext,
       int initialSize,
+      int shuffleId,
+      long mapId,
       int numPartitions,
       SparkConf conf,
-      ShuffleWriteMetricsReporter writeMetrics) {
+      ShuffleWriteMetricsReporter writeMetrics) throws SparkException {
     super(memoryManager,
       (int) Math.min(PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES, memoryManager.pageSizeBytes()),
       memoryManager.getTungstenMemoryMode());
     this.taskMemoryManager = memoryManager;
     this.blockManager = blockManager;
     this.taskContext = taskContext;
+    this.shuffleId = shuffleId;
+    this.mapId = mapId;

Review comment:
   These two are not used.

##
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##
@@ -1368,6 +1368,24 @@ package object config {
         s"The buffer size must be greater than 0 and less than or equal to ${Int.MaxValue}.")
       .createWithDefault(4096)
 
+  private[spark] val SHUFFLE_CHECKSUM_ENABLED =
+    ConfigBuilder("spark.shuffle.checksum.enabled")
+      .doc("Whether to calculate the checksum of shuffle output. If enabled, Spark will try " +
+        "its best to tell if shuffle data corruption is caused by network or disk or others.")
+      .version("3.3.0")
+      .booleanConf
+      .createWithDefault(true)
+
+  private[spark] val SHUFFLE_CHECKSUM_ALGORITHM =
+    ConfigBuilder("spark.shuffle.checksum.algorithm")
+      .doc("The algorithm used to calculate the checksum. Currently, it only supports" +
+        " built-in algorithms of JDK.")
+      .version("3.3.0")
+      .stringConf
+      .checkValue(Set("Adler32", "CRC32").contains, "Shuffle checksum algorithm " +

Review comment:
   This should be case-insensitive.

##
File path: core/src/main/scala/org/apache/spark/shuffle/ShufflePartitionPairsWriter.scala
##
@@ -45,6 +47,23 @@ private[spark] class ShufflePartitionPairsWriter(
   private var numRecordsWritten = 0
   private var curNumBytesWritten = 0L
 
+  // checksum related
+  private var checksumEnabled = false
+  private var checksumOutputStream: MutableCheckedOutputStream = _
+  private var checksum: Checksum = _
+
+  /**
+   * Set the checksum that the checksumOutputStream should use
+   */
+  def setChecksum(checksum: Checksum): Unit = {
+    if (checksumOutputStream == null) {
+      this.checksumEnabled = true
+      this.checksum = checksum

Review comment:
   We can set this in the constructor.
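A sketch of the constructor-based alternative in Java, assuming the writer only needs a `Checksum` (or `null` when checksum is disabled) at construction time. The JDK's `java.util.zip.CheckedOutputStream` stands in for Spark's `MutableCheckedOutputStream`, whose API is not shown in this diff, and `ChecksummedPartitionWriter` is a hypothetical name.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.util.zip.CheckedOutputStream;
import java.util.zip.Checksum;

// Hypothetical writer: the checksum is fixed at construction, so there is no
// setChecksum() call to forget and no mutable checksumEnabled flag.
final class ChecksummedPartitionWriter {
  private final OutputStream out;
  private final Checksum checksum; // null when shuffle checksum is disabled

  ChecksummedPartitionWriter(OutputStream sink, Checksum checksum) {
    this.checksum = checksum;
    // Only wrap the sink when a checksum was supplied.
    this.out = (checksum == null) ? sink : new CheckedOutputStream(sink, checksum);
  }

  boolean checksumEnabled() {
    return checksum != null;
  }

  void write(byte[] bytes) throws IOException {
    // CheckedOutputStream updates the checksum with exactly the bytes it forwards.
    out.write(bytes, 0, bytes.length);
  }

  void flush() throws IOException {
    out.flush();
  }
}
```

Fixing the checksum at construction also removes the ordering constraint in the diff, where `setChecksum` only takes effect if it runs before `checksumOutputStream` is created.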

##
File path: core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
##
@@ -763,6 +775,7 @@ private[spark] class ExternalSorter[K, V, C](
           serInstance,
           blockId,
           context.taskMetrics().shuffleWriteMetrics)
+        partitionPairsWriter.setChecksum(partitionChecksums(partitionId))

Review comment:
   We need to check whether checksum is enabled first.
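A minimal Java sketch of the guard, assuming (as the `ShuffleExternalSorter` field comment elsewhere in this thread describes) that the per-partition array is empty when checksum is disabled, so indexing it unconditionally would throw `ArrayIndexOutOfBoundsException`. `ChecksumGuard.checksumFor` is a hypothetical name, not `ExternalSorter`'s code.

```java
import java.util.zip.Checksum;

// Hypothetical illustration of the guard; names are illustrative.
final class ChecksumGuard {
  private ChecksumGuard() {}

  /**
   * Returns the checksum for partitionId, or null when checksum is disabled.
   * Assumes partitionChecksums is empty when disabled, so the flag must be
   * checked before indexing.
   */
  static Checksum checksumFor(boolean checksumEnabled, Checksum[] partitionChecksums, int partitionId) {
    return checksumEnabled ? partitionChecksums[partitionId] : null;
  }
}
```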







[GitHub] [spark] cloud-fan commented on a change in pull request #32401: [SPARK-35276][CORE] Calculate checksum for shuffle data and write as checksum file

2021-07-06 GitBox


cloud-fan commented on a change in pull request #32401:
URL: https://github.com/apache/spark/pull/32401#discussion_r664307797



##
File path: core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
##
@@ -763,6 +775,7 @@ private[spark] class ExternalSorter[K, V, C](
           serInstance,
           blockId,
           context.taskMetrics().shuffleWriteMetrics)
+        partitionPairsWriter.setChecksum(partitionChecksums(partitionId))

Review comment:
   We need to check whether checksum is enabled first.







[GitHub] [spark] cloud-fan commented on a change in pull request #32401: [SPARK-35276][CORE] Calculate checksum for shuffle data and write as checksum file

2021-07-06 GitBox


cloud-fan commented on a change in pull request #32401:
URL: https://github.com/apache/spark/pull/32401#discussion_r664306639



##
File path: core/src/main/scala/org/apache/spark/shuffle/ShufflePartitionPairsWriter.scala
##
@@ -45,6 +47,23 @@ private[spark] class ShufflePartitionPairsWriter(
   private var numRecordsWritten = 0
   private var curNumBytesWritten = 0L
 
+  // checksum related
+  private var checksumEnabled = false
+  private var checksumOutputStream: MutableCheckedOutputStream = _
+  private var checksum: Checksum = _
+
+  /**
+   * Set the checksum that the checksumOutputStream should use
+   */
+  def setChecksum(checksum: Checksum): Unit = {
+    if (checksumOutputStream == null) {
+      this.checksumEnabled = true
+      this.checksum = checksum

Review comment:
   We can set this in the constructor.







[GitHub] [spark] cloud-fan commented on a change in pull request #32401: [SPARK-35276][CORE] Calculate checksum for shuffle data and write as checksum file

2021-07-06 GitBox


cloud-fan commented on a change in pull request #32401:
URL: https://github.com/apache/spark/pull/32401#discussion_r664293983



##
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##
@@ -1368,6 +1368,24 @@ package object config {
         s"The buffer size must be greater than 0 and less than or equal to ${Int.MaxValue}.")
       .createWithDefault(4096)
 
+  private[spark] val SHUFFLE_CHECKSUM_ENABLED =
+    ConfigBuilder("spark.shuffle.checksum.enabled")
+      .doc("Whether to calculate the checksum of shuffle output. If enabled, Spark will try " +
+        "its best to tell if shuffle data corruption is caused by network or disk or others.")
+      .version("3.3.0")
+      .booleanConf
+      .createWithDefault(true)
+
+  private[spark] val SHUFFLE_CHECKSUM_ALGORITHM =
+    ConfigBuilder("spark.shuffle.checksum.algorithm")
+      .doc("The algorithm used to calculate the checksum. Currently, it only supports" +
+        " built-in algorithms of JDK.")
+      .version("3.3.0")
+      .stringConf
+      .checkValue(Set("Adler32", "CRC32").contains, "Shuffle checksum algorithm " +

Review comment:
   This should be case-insensitive.







[GitHub] [spark] cloud-fan commented on a change in pull request #32401: [SPARK-35276][CORE] Calculate checksum for shuffle data and write as checksum file

2021-07-06 GitBox


cloud-fan commented on a change in pull request #32401:
URL: https://github.com/apache/spark/pull/32401#discussion_r664290804



##
File path: core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
##
@@ -107,20 +112,27 @@
   @Nullable private MemoryBlock currentPage = null;
   private long pageCursor = -1;
 
+  private final boolean checksumEnabled;
+  private Checksum[] partitionChecksums;
+
   ShuffleExternalSorter(
       TaskMemoryManager memoryManager,
       BlockManager blockManager,
       TaskContext taskContext,
       int initialSize,
+      int shuffleId,
+      long mapId,
       int numPartitions,
       SparkConf conf,
-      ShuffleWriteMetricsReporter writeMetrics) {
+      ShuffleWriteMetricsReporter writeMetrics) throws SparkException {
     super(memoryManager,
       (int) Math.min(PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES, memoryManager.pageSizeBytes()),
       memoryManager.getTungstenMemoryMode());
     this.taskMemoryManager = memoryManager;
     this.blockManager = blockManager;
     this.taskContext = taskContext;
+    this.shuffleId = shuffleId;
+    this.mapId = mapId;

Review comment:
   These two are not used.








[GitHub] [spark] cloud-fan commented on a change in pull request #32401: [SPARK-35276][CORE] Calculate checksum for shuffle data and write as checksum file

2021-06-15 GitBox


cloud-fan commented on a change in pull request #32401:
URL: https://github.com/apache/spark/pull/32401#discussion_r652345736



##
File path: core/src/main/java/org/apache/spark/shuffle/api/ShuffleMapOutputWriter.java
##
@@ -68,8 +72,11 @@
    *    for that partition id.
    * 
    * 2) An optional metadata blob that can be used by shuffle readers.
+   *
+   * @param checksums The checksum values for each partition if shuffle checksum enabled.
+   *                  Otherwise, it's empty.
    */
-  MapOutputCommitMessage commitAllPartitions() throws IOException;
+  MapOutputCommitMessage commitAllPartitions(long[] checksums) throws IOException;

Review comment:
   TBH I don't think the current shuffle API provides enough abstraction to support checksums. I'm OK with this change since the shuffle API is still private, but we should revisit it later so that checksums can be computed on the shuffle implementation side.
   
   The issue I currently see is that Spark writes local spill files and then asks the shuffle implementation to "transfer" them. As a result, Spark has to compute the checksum itself while writing the spill files, to reduce the performance overhead.
   
   We can discuss it later.







[GitHub] [spark] cloud-fan commented on a change in pull request #32401: [SPARK-35276][CORE] Calculate checksum for shuffle data and write as checksum file

2021-06-09 GitBox


cloud-fan commented on a change in pull request #32401:
URL: https://github.com/apache/spark/pull/32401#discussion_r648092216



##
File path: core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
##
@@ -164,6 +180,13 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
       }
 
       partitionLengths = writePartitionedData(mapOutputWriter);
+      if (checksumEnabled) {
+        long[] checksums = new long[numPartitions];
+        for (int i = 0; i < numPartitions; i ++) {
+          checksums[i] = partitionChecksums[i].getValue();
+        }
+        IndexShuffleBlockResolver.get().writeChecksumFile(shuffleId, mapId, checksums);

Review comment:
   Oh, actually we do write the index file, but it's done by `ShuffleMapOutputWriter.commitAllPartitions`. Does this checksum file work with custom shuffle extensions?
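Whichever component ends up writing the file, readers need an agreed layout. A minimal Java sketch of the layout that the `ShuffleChecksumTester` code in this thread reads back: one big-endian `long` per partition, in partition order, with no header. `ChecksumFileFormat` is a hypothetical name, not `IndexShuffleBlockResolver`'s actual implementation.

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

// Hypothetical sketch of a checksum file layout: numPartitions big-endian longs, no header.
final class ChecksumFileFormat {
  private ChecksumFileFormat() {}

  /** Writes one long per partition; the caller owns (and closes) the sink. */
  static void write(OutputStream sink, long[] checksums) throws IOException {
    DataOutputStream out = new DataOutputStream(sink);
    for (long value : checksums) {
      out.writeLong(value);
    }
    out.flush();
  }

  /** Reads back numPartitions longs in partition order. */
  static long[] read(InputStream source, int numPartitions) throws IOException {
    DataInputStream in = new DataInputStream(source);
    long[] checksums = new long[numPartitions];
    for (int i = 0; i < numPartitions; i++) {
      checksums[i] = in.readLong();
    }
    return checksums;
  }
}
```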







[GitHub] [spark] cloud-fan commented on a change in pull request #32401: [SPARK-35276][CORE] Calculate checksum for shuffle data and write as checksum file

2021-06-09 GitBox


cloud-fan commented on a change in pull request #32401:
URL: https://github.com/apache/spark/pull/32401#discussion_r648090701



##
File path: core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
##
@@ -164,6 +180,13 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
       }
 
       partitionLengths = writePartitionedData(mapOutputWriter);
+      if (checksumEnabled) {
+        long[] checksums = new long[numPartitions];
+        for (int i = 0; i < numPartitions; i ++) {
+          checksums[i] = partitionChecksums[i].getValue();
+        }
+        IndexShuffleBlockResolver.get().writeChecksumFile(shuffleId, mapId, checksums);

Review comment:
   How do we write the shuffle index file right now? In this method, we also calculate the partition lengths, but we don't write the index file immediately.
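For reference, a Java sketch of how an index can be derived from the partition lengths, assuming the layout that the `ShuffleChecksumTester` code in this thread reads: `numPartitions + 1` big-endian offsets starting at 0, each a running sum of the lengths. Because it only needs `partitionLengths`, the index can be written later (for example at commit time) rather than inside `write`. `IndexFileFormat` is a hypothetical name.

```java
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Hypothetical sketch of an index layout: numPartitions + 1 offsets, starting at 0.
final class IndexFileFormat {
  private IndexFileFormat() {}

  /** Prefix sums of the partition lengths, with a leading 0. */
  static long[] offsets(long[] partitionLengths) {
    long[] offsets = new long[partitionLengths.length + 1];
    for (int i = 0; i < partitionLengths.length; i++) {
      offsets[i + 1] = offsets[i] + partitionLengths[i];
    }
    return offsets;
  }

  /** Writes the offsets as big-endian longs; the caller owns (and closes) the sink. */
  static void write(OutputStream sink, long[] partitionLengths) throws IOException {
    DataOutputStream out = new DataOutputStream(sink);
    for (long offset : offsets(partitionLengths)) {
      out.writeLong(offset);
    }
    out.flush();
  }
}
```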



