Repository: spark Updated Branches: refs/heads/branch-2.1 4f6fccf15 -> 6e6adccea
[SPARK-20868][CORE] UnsafeShuffleWriter should verify the position after FileChannel.transferTo ## What changes were proposed in this pull request? Long time ago we fixed a [bug](https://issues.apache.org/jira/browse/SPARK-3948) in shuffle writer about `FileChannel.transferTo`. We were not very confident about that fix, so we added a position check after the writing, try to discover the bug earlier. However this checking is missing in the new `UnsafeShuffleWriter`, this PR adds it. https://issues.apache.org/jira/browse/SPARK-18105 maybe related to that `FileChannel.transferTo` bug, hopefully we can find out the root cause after adding this position check. ## How was this patch tested? N/A Author: Wenchen Fan <wenc...@databricks.com> Closes #18091 from cloud-fan/shuffle. (cherry picked from commit d9ad78908f6189719cec69d34557f1a750d2e6af) Signed-off-by: Wenchen Fan <wenc...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6e6adcce Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6e6adcce Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6e6adcce Branch: refs/heads/branch-2.1 Commit: 6e6adcceab9218463d8c7637350d7f5229cf648d Parents: 4f6fccf Author: Wenchen Fan <wenc...@databricks.com> Authored: Fri May 26 15:01:28 2017 +0800 Committer: Wenchen Fan <wenc...@databricks.com> Committed: Fri May 26 15:02:04 2017 +0800 ---------------------------------------------------------------------- .../spark/shuffle/sort/UnsafeShuffleWriter.java | 15 ++--- .../scala/org/apache/spark/util/Utils.scala | 71 +++++++++++--------- 2 files changed, 47 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/6e6adcce/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java index 8a17718..2fde5c3 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java @@ -422,17 +422,14 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> { for (int partition = 0; partition < numPartitions; partition++) { for (int i = 0; i < spills.length; i++) { final long partitionLengthInSpill = spills[i].partitionLengths[partition]; - long bytesToTransfer = partitionLengthInSpill; final FileChannel spillInputChannel = spillInputChannels[i]; final long writeStartTime = System.nanoTime(); - while (bytesToTransfer > 0) { - final long actualBytesTransferred = spillInputChannel.transferTo( - spillInputChannelPositions[i], - bytesToTransfer, - mergedFileOutputChannel); - spillInputChannelPositions[i] += actualBytesTransferred; - bytesToTransfer -= actualBytesTransferred; - } + Utils.copyFileStreamNIO( + spillInputChannel, + mergedFileOutputChannel, + spillInputChannelPositions[i], + partitionLengthInSpill); + spillInputChannelPositions[i] += partitionLengthInSpill; writeMetrics.incWriteTime(System.nanoTime() - writeStartTime); bytesWrittenToMergedFile += partitionLengthInSpill; partitionLengths[partition] += partitionLengthInSpill; http://git-wip-us.apache.org/repos/asf/spark/blob/6e6adcce/core/src/main/scala/org/apache/spark/util/Utils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 4cdfb9c..56de802 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -21,7 +21,7 @@ import java.io._ import java.lang.management.{LockInfo, ManagementFactory, MonitorInfo, ThreadInfo} import java.net._ import java.nio.ByteBuffer -import java.nio.channels.Channels +import java.nio.channels.{Channels, FileChannel} import java.nio.charset.StandardCharsets import java.nio.file.{Files, Paths} import java.util.{Locale, Properties, Random, UUID} @@ -58,7 +58,6 @@ import org.apache.spark.internal.Logging import org.apache.spark.internal.config.{DYN_ALLOCATION_INITIAL_EXECUTORS, DYN_ALLOCATION_MIN_EXECUTORS, EXECUTOR_INSTANCES} import org.apache.spark.network.util.JavaUtils import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance} -import org.apache.spark.util.logging.RollingFileAppender /** CallSite represents a place in user code. It can have a short and a long form. */ private[spark] case class CallSite(shortForm: String, longForm: String) @@ -313,41 +312,22 @@ private[spark] object Utils extends Logging { * copying is disabled by default unless explicitly set transferToEnabled as true, * the parameter transferToEnabled should be configured by spark.file.transferTo = [true|false]. */ - def copyStream(in: InputStream, - out: OutputStream, - closeStreams: Boolean = false, - transferToEnabled: Boolean = false): Long = - { - var count = 0L + def copyStream( + in: InputStream, + out: OutputStream, + closeStreams: Boolean = false, + transferToEnabled: Boolean = false): Long = { tryWithSafeFinally { if (in.isInstanceOf[FileInputStream] && out.isInstanceOf[FileOutputStream] && transferToEnabled) { // When both streams are File stream, use transferTo to improve copy performance. val inChannel = in.asInstanceOf[FileInputStream].getChannel() val outChannel = out.asInstanceOf[FileOutputStream].getChannel() - val initialPos = outChannel.position() val size = inChannel.size() - - // In case transferTo method transferred less data than we have required. - while (count < size) { - count += inChannel.transferTo(count, size - count, outChannel) - } - - // Check the position after transferTo loop to see if it is in the right position and - // give user information if not. - // Position will not be increased to the expected length after calling transferTo in - // kernel version 2.6.32, this issue can be seen in - // https://bugs.openjdk.java.net/browse/JDK-7052359 - // This will lead to stream corruption issue when using sort-based shuffle (SPARK-3948). - val finalPos = outChannel.position() - assert(finalPos == initialPos + size, - s""" - |Current position $finalPos do not equal to expected position ${initialPos + size} - |after transferTo, please check your kernel version to see if it is 2.6.32, - |this is a kernel bug which will lead to unexpected behavior when using transferTo. - |You can set spark.file.transferTo = false to disable this NIO feature. - """.stripMargin) + copyFileStreamNIO(inChannel, outChannel, 0, size) + size } else { + var count = 0L val buf = new Array[Byte](8192) var n = 0 while (n != -1) { @@ -357,8 +337,8 @@ private[spark] object Utils extends Logging { count += n } } + count } - count } { if (closeStreams) { try { @@ -370,6 +350,37 @@ private[spark] object Utils extends Logging { } } + def copyFileStreamNIO( + input: FileChannel, + output: FileChannel, + startPosition: Long, + bytesToCopy: Long): Unit = { + val initialPos = output.position() + var count = 0L + // In case transferTo method transferred less data than we have required. + while (count < bytesToCopy) { + count += input.transferTo(count + startPosition, bytesToCopy - count, output) + } + assert(count == bytesToCopy, + s"request to copy $bytesToCopy bytes, but actually copied $count bytes.") + + // Check the position after transferTo loop to see if it is in the right position and + // give user information if not. + // Position will not be increased to the expected length after calling transferTo in + // kernel version 2.6.32, this issue can be seen in + // https://bugs.openjdk.java.net/browse/JDK-7052359 + // This will lead to stream corruption issue when using sort-based shuffle (SPARK-3948). + val finalPos = output.position() + val expectedPos = initialPos + bytesToCopy + assert(finalPos == expectedPos, + s""" + |Current position $finalPos do not equal to expected position $expectedPos + |after transferTo, please check your kernel version to see if it is 2.6.32, + |this is a kernel bug which will lead to unexpected behavior when using transferTo. + |You can set spark.file.transferTo = false to disable this NIO feature. + """.stripMargin) + } + /** * Construct a URI container information used for authentication. * This also sets the default authenticator to properly negotiation the --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org