Repository: spark
Updated Branches:
  refs/heads/branch-2.1 4f6fccf15 -> 6e6adccea


[SPARK-20868][CORE] UnsafeShuffleWriter should verify the position after 
FileChannel.transferTo

## What changes were proposed in this pull request?

A long time ago we fixed a 
[bug](https://issues.apache.org/jira/browse/SPARK-3948) in the shuffle writer 
related to `FileChannel.transferTo`. We were not very confident about that fix, 
so we added a position check after the write to try to detect the bug earlier.

However, this check is missing in the new `UnsafeShuffleWriter`; this PR 
adds it.

https://issues.apache.org/jira/browse/SPARK-18105 may be related to that 
`FileChannel.transferTo` bug; hopefully we can find the root cause after 
adding this position check.

## How was this patch tested?

N/A

Author: Wenchen Fan <wenc...@databricks.com>

Closes #18091 from cloud-fan/shuffle.

(cherry picked from commit d9ad78908f6189719cec69d34557f1a750d2e6af)
Signed-off-by: Wenchen Fan <wenc...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6e6adcce
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6e6adcce
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6e6adcce

Branch: refs/heads/branch-2.1
Commit: 6e6adcceab9218463d8c7637350d7f5229cf648d
Parents: 4f6fccf
Author: Wenchen Fan <wenc...@databricks.com>
Authored: Fri May 26 15:01:28 2017 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Fri May 26 15:02:04 2017 +0800

----------------------------------------------------------------------
 .../spark/shuffle/sort/UnsafeShuffleWriter.java | 15 ++---
 .../scala/org/apache/spark/util/Utils.scala     | 71 +++++++++++---------
 2 files changed, 47 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6e6adcce/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java 
b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
index 8a17718..2fde5c3 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
@@ -422,17 +422,14 @@ public class UnsafeShuffleWriter<K, V> extends 
ShuffleWriter<K, V> {
       for (int partition = 0; partition < numPartitions; partition++) {
         for (int i = 0; i < spills.length; i++) {
           final long partitionLengthInSpill = 
spills[i].partitionLengths[partition];
-          long bytesToTransfer = partitionLengthInSpill;
           final FileChannel spillInputChannel = spillInputChannels[i];
           final long writeStartTime = System.nanoTime();
-          while (bytesToTransfer > 0) {
-            final long actualBytesTransferred = spillInputChannel.transferTo(
-              spillInputChannelPositions[i],
-              bytesToTransfer,
-              mergedFileOutputChannel);
-            spillInputChannelPositions[i] += actualBytesTransferred;
-            bytesToTransfer -= actualBytesTransferred;
-          }
+          Utils.copyFileStreamNIO(
+            spillInputChannel,
+            mergedFileOutputChannel,
+            spillInputChannelPositions[i],
+            partitionLengthInSpill);
+          spillInputChannelPositions[i] += partitionLengthInSpill;
           writeMetrics.incWriteTime(System.nanoTime() - writeStartTime);
           bytesWrittenToMergedFile += partitionLengthInSpill;
           partitionLengths[partition] += partitionLengthInSpill;

http://git-wip-us.apache.org/repos/asf/spark/blob/6e6adcce/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala 
b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 4cdfb9c..56de802 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -21,7 +21,7 @@ import java.io._
 import java.lang.management.{LockInfo, ManagementFactory, MonitorInfo, 
ThreadInfo}
 import java.net._
 import java.nio.ByteBuffer
-import java.nio.channels.Channels
+import java.nio.channels.{Channels, FileChannel}
 import java.nio.charset.StandardCharsets
 import java.nio.file.{Files, Paths}
 import java.util.{Locale, Properties, Random, UUID}
@@ -58,7 +58,6 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.{DYN_ALLOCATION_INITIAL_EXECUTORS, 
DYN_ALLOCATION_MIN_EXECUTORS, EXECUTOR_INSTANCES}
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.serializer.{DeserializationStream, 
SerializationStream, SerializerInstance}
-import org.apache.spark.util.logging.RollingFileAppender
 
 /** CallSite represents a place in user code. It can have a short and a long 
form. */
 private[spark] case class CallSite(shortForm: String, longForm: String)
@@ -313,41 +312,22 @@ private[spark] object Utils extends Logging {
    * copying is disabled by default unless explicitly set transferToEnabled as 
true,
    * the parameter transferToEnabled should be configured by 
spark.file.transferTo = [true|false].
    */
-  def copyStream(in: InputStream,
-                 out: OutputStream,
-                 closeStreams: Boolean = false,
-                 transferToEnabled: Boolean = false): Long =
-  {
-    var count = 0L
+  def copyStream(
+      in: InputStream,
+      out: OutputStream,
+      closeStreams: Boolean = false,
+      transferToEnabled: Boolean = false): Long = {
     tryWithSafeFinally {
       if (in.isInstanceOf[FileInputStream] && 
out.isInstanceOf[FileOutputStream]
         && transferToEnabled) {
         // When both streams are File stream, use transferTo to improve copy 
performance.
         val inChannel = in.asInstanceOf[FileInputStream].getChannel()
         val outChannel = out.asInstanceOf[FileOutputStream].getChannel()
-        val initialPos = outChannel.position()
         val size = inChannel.size()
-
-        // In case transferTo method transferred less data than we have 
required.
-        while (count < size) {
-          count += inChannel.transferTo(count, size - count, outChannel)
-        }
-
-        // Check the position after transferTo loop to see if it is in the 
right position and
-        // give user information if not.
-        // Position will not be increased to the expected length after calling 
transferTo in
-        // kernel version 2.6.32, this issue can be seen in
-        // https://bugs.openjdk.java.net/browse/JDK-7052359
-        // This will lead to stream corruption issue when using sort-based 
shuffle (SPARK-3948).
-        val finalPos = outChannel.position()
-        assert(finalPos == initialPos + size,
-          s"""
-             |Current position $finalPos do not equal to expected position 
${initialPos + size}
-             |after transferTo, please check your kernel version to see if it 
is 2.6.32,
-             |this is a kernel bug which will lead to unexpected behavior when 
using transferTo.
-             |You can set spark.file.transferTo = false to disable this NIO 
feature.
-           """.stripMargin)
+        copyFileStreamNIO(inChannel, outChannel, 0, size)
+        size
       } else {
+        var count = 0L
         val buf = new Array[Byte](8192)
         var n = 0
         while (n != -1) {
@@ -357,8 +337,8 @@ private[spark] object Utils extends Logging {
             count += n
           }
         }
+        count
       }
-      count
     } {
       if (closeStreams) {
         try {
@@ -370,6 +350,37 @@ private[spark] object Utils extends Logging {
     }
   }
 
+  def copyFileStreamNIO(
+      input: FileChannel,
+      output: FileChannel,
+      startPosition: Long,
+      bytesToCopy: Long): Unit = {
+    val initialPos = output.position()
+    var count = 0L
+    // In case transferTo method transferred less data than we have required.
+    while (count < bytesToCopy) {
+      count += input.transferTo(count + startPosition, bytesToCopy - count, 
output)
+    }
+    assert(count == bytesToCopy,
+      s"request to copy $bytesToCopy bytes, but actually copied $count bytes.")
+
+    // Check the position after transferTo loop to see if it is in the right 
position and
+    // give user information if not.
+    // Position will not be increased to the expected length after calling 
transferTo in
+    // kernel version 2.6.32, this issue can be seen in
+    // https://bugs.openjdk.java.net/browse/JDK-7052359
+    // This will lead to stream corruption issue when using sort-based shuffle 
(SPARK-3948).
+    val finalPos = output.position()
+    val expectedPos = initialPos + bytesToCopy
+    assert(finalPos == expectedPos,
+      s"""
+         |Current position $finalPos do not equal to expected position 
$expectedPos
+         |after transferTo, please check your kernel version to see if it is 
2.6.32,
+         |this is a kernel bug which will lead to unexpected behavior when 
using transferTo.
+         |You can set spark.file.transferTo = false to disable this NIO 
feature.
+           """.stripMargin)
+  }
+
   /**
    * Construct a URI container information used for authentication.
    * This also sets the default authenticator to properly negotiation the


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to