Repository: spark
Updated Branches:
  refs/heads/master b9503fcbb -> dd52681bf


[SPARK-23253][CORE][SHUFFLE] Only write shuffle temporary index file when there 
is not an existing one

## What changes were proposed in this pull request?

Shuffle Index temporay file is used for atomic creating shuffle index file, it 
is not needed when the index file already exists after another attempts of same 
task had it done.

## How was this patch tested?

exitsting ut

cc squito

Author: Kent Yao <yaooq...@hotmail.com>

Closes #20422 from yaooqinn/SPARK-23253.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dd52681b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dd52681b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dd52681b

Branch: refs/heads/master
Commit: dd52681bf542386711609cb037a55b3d264eddef
Parents: b9503fc
Author: Kent Yao <yaooq...@hotmail.com>
Authored: Fri Feb 2 09:10:50 2018 -0600
Committer: Imran Rashid <iras...@cloudera.com>
Committed: Fri Feb 2 09:10:50 2018 -0600

----------------------------------------------------------------------
 .../shuffle/IndexShuffleBlockResolver.scala     | 27 +++++----
 .../sort/IndexShuffleBlockResolverSuite.scala   | 59 ++++++++++++++------
 2 files changed, 56 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/dd52681b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala 
b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
index 266ee42..c5f3f6e 100644
--- 
a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
+++ 
b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
@@ -141,19 +141,6 @@ private[spark] class IndexShuffleBlockResolver(
     val indexFile = getIndexFile(shuffleId, mapId)
     val indexTmp = Utils.tempFileWith(indexFile)
     try {
-      val out = new DataOutputStream(new BufferedOutputStream(new 
FileOutputStream(indexTmp)))
-      Utils.tryWithSafeFinally {
-        // We take in lengths of each block, need to convert it to offsets.
-        var offset = 0L
-        out.writeLong(offset)
-        for (length <- lengths) {
-          offset += length
-          out.writeLong(offset)
-        }
-      } {
-        out.close()
-      }
-
       val dataFile = getDataFile(shuffleId, mapId)
       // There is only one IndexShuffleBlockResolver per executor, this 
synchronization make sure
       // the following check and rename are atomic.
@@ -166,10 +153,22 @@ private[spark] class IndexShuffleBlockResolver(
           if (dataTmp != null && dataTmp.exists()) {
             dataTmp.delete()
           }
-          indexTmp.delete()
         } else {
           // This is the first successful attempt in writing the map outputs 
for this task,
           // so override any existing index and data files with the ones we 
wrote.
+          val out = new DataOutputStream(new BufferedOutputStream(new 
FileOutputStream(indexTmp)))
+          Utils.tryWithSafeFinally {
+            // We take in lengths of each block, need to convert it to offsets.
+            var offset = 0L
+            out.writeLong(offset)
+            for (length <- lengths) {
+              offset += length
+              out.writeLong(offset)
+            }
+          } {
+            out.close()
+          }
+
           if (indexFile.exists()) {
             indexFile.delete()
           }

http://git-wip-us.apache.org/repos/asf/spark/blob/dd52681b/core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala
 
b/core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala
index d21ce73..4ce379b 100644
--- 
a/core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.shuffle.sort
 
-import java.io.{File, FileInputStream, FileOutputStream}
+import java.io.{DataInputStream, File, FileInputStream, FileOutputStream}
 
 import org.mockito.{Mock, MockitoAnnotations}
 import org.mockito.Answers.RETURNS_SMART_NULLS
@@ -64,6 +64,9 @@ class IndexShuffleBlockResolverSuite extends SparkFunSuite 
with BeforeAndAfterEa
   }
 
   test("commit shuffle files multiple times") {
+    val shuffleId = 1
+    val mapId = 2
+    val idxName = s"shuffle_${shuffleId}_${mapId}_0.index"
     val resolver = new IndexShuffleBlockResolver(conf, blockManager)
     val lengths = Array[Long](10, 0, 20)
     val dataTmp = File.createTempFile("shuffle", null, tempDir)
@@ -73,9 +76,13 @@ class IndexShuffleBlockResolverSuite extends SparkFunSuite 
with BeforeAndAfterEa
     } {
       out.close()
     }
-    resolver.writeIndexFileAndCommit(1, 2, lengths, dataTmp)
+    resolver.writeIndexFileAndCommit(shuffleId, mapId, lengths, dataTmp)
 
-    val dataFile = resolver.getDataFile(1, 2)
+    val indexFile = new File(tempDir.getAbsolutePath, idxName)
+    val dataFile = resolver.getDataFile(shuffleId, mapId)
+
+    assert(indexFile.exists())
+    assert(indexFile.length() === (lengths.length + 1) * 8)
     assert(dataFile.exists())
     assert(dataFile.length() === 30)
     assert(!dataTmp.exists())
@@ -89,7 +96,9 @@ class IndexShuffleBlockResolverSuite extends SparkFunSuite 
with BeforeAndAfterEa
     } {
       out2.close()
     }
-    resolver.writeIndexFileAndCommit(1, 2, lengths2, dataTmp2)
+    resolver.writeIndexFileAndCommit(shuffleId, mapId, lengths2, dataTmp2)
+
+    assert(indexFile.length() === (lengths.length + 1) * 8)
     assert(lengths2.toSeq === lengths.toSeq)
     assert(dataFile.exists())
     assert(dataFile.length() === 30)
@@ -97,18 +106,27 @@ class IndexShuffleBlockResolverSuite extends SparkFunSuite 
with BeforeAndAfterEa
 
     // The dataFile should be the previous one
     val firstByte = new Array[Byte](1)
-    val in = new FileInputStream(dataFile)
+    val dataIn = new FileInputStream(dataFile)
     Utils.tryWithSafeFinally {
-      in.read(firstByte)
+      dataIn.read(firstByte)
     } {
-      in.close()
+      dataIn.close()
     }
     assert(firstByte(0) === 0)
 
+    // The index file should not change
+    val indexIn = new DataInputStream(new FileInputStream(indexFile))
+    Utils.tryWithSafeFinally {
+      indexIn.readLong() // the first offset is always 0
+      assert(indexIn.readLong() === 10, "The index file should not change")
+    } {
+      indexIn.close()
+    }
+
     // remove data file
     dataFile.delete()
 
-    val lengths3 = Array[Long](10, 10, 15)
+    val lengths3 = Array[Long](7, 10, 15, 3)
     val dataTmp3 = File.createTempFile("shuffle", null, tempDir)
     val out3 = new FileOutputStream(dataTmp3)
     Utils.tryWithSafeFinally {
@@ -117,20 +135,29 @@ class IndexShuffleBlockResolverSuite extends 
SparkFunSuite with BeforeAndAfterEa
     } {
       out3.close()
     }
-    resolver.writeIndexFileAndCommit(1, 2, lengths3, dataTmp3)
+    resolver.writeIndexFileAndCommit(shuffleId, mapId, lengths3, dataTmp3)
+    assert(indexFile.length() === (lengths3.length + 1) * 8)
     assert(lengths3.toSeq != lengths.toSeq)
     assert(dataFile.exists())
     assert(dataFile.length() === 35)
-    assert(!dataTmp2.exists())
+    assert(!dataTmp3.exists())
 
-    // The dataFile should be the previous one
-    val firstByte2 = new Array[Byte](1)
-    val in2 = new FileInputStream(dataFile)
+    // The dataFile should be the new one, since we deleted the dataFile from 
the first attempt
+    val dataIn2 = new FileInputStream(dataFile)
+    Utils.tryWithSafeFinally {
+      dataIn2.read(firstByte)
+    } {
+      dataIn2.close()
+    }
+    assert(firstByte(0) === 2)
+
+    // The index file should be updated, since we deleted the dataFile from 
the first attempt
+    val indexIn2 = new DataInputStream(new FileInputStream(indexFile))
     Utils.tryWithSafeFinally {
-      in2.read(firstByte2)
+      indexIn2.readLong() // the first offset is always 0
+      assert(indexIn2.readLong() === 7, "The index file should be updated")
     } {
-      in2.close()
+      indexIn2.close()
     }
-    assert(firstByte2(0) === 2)
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to