This is an automated email from the ASF dual-hosted git repository.

irashid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 67751e2  [SPARK-29072][CORE] Put back usage of TimeTrackingOutputStream for UnsafeShuffleWriter and SortShuffleWriter
67751e2 is described below

commit 67751e26940a16ab6f9950ae66a46b7cb901c102
Author: mcheah <mch...@palantir.com>
AuthorDate: Mon Sep 16 09:08:25 2019 -0500

    [SPARK-29072][CORE] Put back usage of TimeTrackingOutputStream for UnsafeShuffleWriter and SortShuffleWriter
    
    ### What changes were proposed in this pull request?
    The previous refactors of the shuffle writers to use the shuffle writer plugin caused some shuffle write metric updates - particularly write times - to be lost in certain situations. This patch restores the lost metric updates.
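    
    For context, TimeTrackingOutputStream is a thin OutputStream wrapper that charges the elapsed time of each write to the shuffle write metrics. A minimal Scala sketch of the idea follows (the real class is Java, lives in org.apache.spark.storage, and reports to a ShuffleWriteMetricsReporter; the WriteMetrics trait here is a simplified stand-in):
    
        import java.io.OutputStream
    
        // Simplified stand-in for Spark's shuffle write metrics reporter.
        trait WriteMetrics { def incWriteTime(nanos: Long): Unit }
    
        // Wraps an OutputStream and charges the time spent in each
        // write, flush, and close call to the write metrics.
        class TimeTrackingOutputStream(metrics: WriteMetrics, out: OutputStream)
            extends OutputStream {
          private def timed[T](f: => T): T = {
            val start = System.nanoTime()
            try f finally metrics.incWriteTime(System.nanoTime() - start)
          }
          override def write(b: Int): Unit = timed(out.write(b))
          override def write(b: Array[Byte], off: Int, len: Int): Unit =
            timed(out.write(b, off, len))
          override def flush(): Unit = timed(out.flush())
          override def close(): Unit = timed(out.close())
        }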
    
    ### Why are the changes needed?
    This fixes a regression: without this change, the Spark UI loses shuffle write time information.
    
    ### Does this PR introduce any user-facing change?
    No change from Spark 2.4. Without this, there would be a user-facing bug in Spark 3.0.
    
    ### How was this patch tested?
    Existing unit tests.
    
    Closes #25780 from mccheah/fix-write-metrics.
    
    Authored-by: mcheah <mch...@palantir.com>
    Signed-off-by: Imran Rashid <iras...@cloudera.com>
---
 .../apache/spark/shuffle/sort/UnsafeShuffleWriter.java    |  2 ++
 .../spark/shuffle/ShufflePartitionPairsWriter.scala       | 15 ++++++++++++---
 2 files changed, 14 insertions(+), 3 deletions(-)

diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
index f59bddc..4d11abd 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
@@ -57,6 +57,7 @@ import org.apache.spark.shuffle.api.ShufflePartitionWriter;
 import org.apache.spark.shuffle.api.SingleSpillShuffleMapOutputWriter;
 import org.apache.spark.shuffle.api.WritableByteChannelWrapper;
 import org.apache.spark.storage.BlockManager;
+import org.apache.spark.storage.TimeTrackingOutputStream;
 import org.apache.spark.unsafe.Platform;
 import org.apache.spark.util.Utils;
 
@@ -382,6 +383,7 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
         ShufflePartitionWriter writer = mapWriter.getPartitionWriter(partition);
         OutputStream partitionOutput = writer.openStream();
         try {
+          partitionOutput = new TimeTrackingOutputStream(writeMetrics, partitionOutput);
           partitionOutput = blockManager.serializerManager().wrapForEncryption(partitionOutput);
           if (compressionCodec != null) {
             partitionOutput = compressionCodec.compressedOutputStream(partitionOutput);
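
Note the layering in this hunk: the time tracker wraps the stream returned by writer.openStream() directly, with encryption and compression layered on top, so the recorded time covers writes of the final encrypted, compressed bytes to the underlying stream rather than time spent inside the compression or encryption layers. A rough Scala sketch of that ordering, reusing the TimeTrackingOutputStream sketch above (the encryption and compression helpers here are identity placeholders, not Spark's real APIs):

    import java.io.{ByteArrayOutputStream, OutputStream}

    object WrapOrderSketch {
      // Identity placeholders for Spark's encryption and compression wrappers.
      def wrapForEncryption(out: OutputStream): OutputStream = out
      def compressedOutputStream(out: OutputStream): OutputStream = out

      def main(args: Array[String]): Unit = {
        val metrics = new WriteMetrics {
          def incWriteTime(nanos: Long): Unit = println(s"tracked $nanos ns")
        }
        // Stands in for the raw stream from writer.openStream().
        var out: OutputStream = new ByteArrayOutputStream()
        out = new TimeTrackingOutputStream(metrics, out) // innermost: times raw writes
        out = wrapForEncryption(out)                     // then encryption
        out = compressedOutputStream(out)                // outermost: compression
        out.write("hello".getBytes("UTF-8"))             // compress -> encrypt -> time -> raw
        out.close()
      }
    }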
diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShufflePartitionPairsWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/ShufflePartitionPairsWriter.scala
index a988c5e..e0affb8 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/ShufflePartitionPairsWriter.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/ShufflePartitionPairsWriter.scala
@@ -21,7 +21,7 @@ import java.io.{Closeable, IOException, OutputStream}
 
 import org.apache.spark.serializer.{SerializationStream, SerializerInstance, SerializerManager}
 import org.apache.spark.shuffle.api.ShufflePartitionWriter
-import org.apache.spark.storage.BlockId
+import org.apache.spark.storage.{BlockId, TimeTrackingOutputStream}
 import org.apache.spark.util.Utils
 import org.apache.spark.util.collection.PairsWriter
 
@@ -39,6 +39,7 @@ private[spark] class ShufflePartitionPairsWriter(
 
   private var isClosed = false
   private var partitionStream: OutputStream = _
+  private var timeTrackingStream: OutputStream = _
   private var wrappedStream: OutputStream = _
   private var objOut: SerializationStream = _
   private var numRecordsWritten = 0
@@ -59,7 +60,8 @@ private[spark] class ShufflePartitionPairsWriter(
   private def open(): Unit = {
     try {
       partitionStream = partitionWriter.openStream
-      wrappedStream = serializerManager.wrapStream(blockId, partitionStream)
+      timeTrackingStream = new TimeTrackingOutputStream(writeMetrics, partitionStream)
+      wrappedStream = serializerManager.wrapStream(blockId, timeTrackingStream)
       objOut = serializerInstance.serializeStream(wrappedStream)
     } catch {
       case e: Exception =>
@@ -78,6 +80,7 @@ private[spark] class ShufflePartitionPairsWriter(
           // Setting these to null will prevent the underlying streams from being closed twice
           // just in case any stream's close() implementation is not idempotent.
           wrappedStream = null
+          timeTrackingStream = null
           partitionStream = null
         } {
           // Normally closing objOut would close the inner streams as well, but just in case there
@@ -86,9 +89,15 @@ private[spark] class ShufflePartitionPairsWriter(
             wrappedStream = closeIfNonNull(wrappedStream)
             // Same as above - if wrappedStream closes then assume it closes underlying
             // partitionStream and don't close again in the finally
+            timeTrackingStream = null
             partitionStream = null
           } {
-            partitionStream = closeIfNonNull(partitionStream)
+            Utils.tryWithSafeFinally {
+              timeTrackingStream = closeIfNonNull(timeTrackingStream)
+              partitionStream = null
+            } {
+              partitionStream = closeIfNonNull(partitionStream)
+            }
           }
         }
         updateBytesWritten()

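The nested Utils.tryWithSafeFinally blocks in this hunk implement "close the outermost stream first, then fall back inward": each level assumes a successful outer close also closed the streams beneath it and nulls those references, while the finally arms close only the streams whose references survived. A self-contained Scala sketch of the same pattern, with plain try/finally standing in for Utils.tryWithSafeFinally (which additionally handles exception suppression) and illustrative field names:

    import java.io.Closeable

    class NestedCloseSketch(
        private var objOut: Closeable,
        private var wrappedStream: Closeable,
        private var timeTrackingStream: Closeable,
        private var partitionStream: Closeable) {

      // Close the stream if non-null; always return null so a
      // reference is never closed twice.
      private def closeIfNonNull[T <: Closeable](c: T): T = {
        if (c != null) c.close()
        null.asInstanceOf[T]
      }

      def close(): Unit = {
        try {
          objOut = closeIfNonNull(objOut)
          // Closing objOut normally closes the streams beneath it,
          // so null the inner references to avoid closing them twice.
          wrappedStream = null
          timeTrackingStream = null
          partitionStream = null
        } finally {
          try {
            wrappedStream = closeIfNonNull(wrappedStream)
            timeTrackingStream = null
            partitionStream = null
          } finally {
            try {
              timeTrackingStream = closeIfNonNull(timeTrackingStream)
              partitionStream = null
            } finally {
              partitionStream = closeIfNonNull(partitionStream)
            }
          }
        }
      }
    }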
