Repository: spark
Updated Branches:
  refs/heads/branch-1.6 46fc7a12a -> 3221a7d91


[SPARK-12582][TEST] IndexShuffleBlockResolverSuite fails in windows

[SPARK-12582][Test] IndexShuffleBlockResolverSuite fails in windows

* IndexShuffleBlockResolverSuite fails on Windows because the test leaves files open (its input streams are never closed).
* Move IndexShuffleBlockResolverSuite.scala from "test/java" to "test/scala".

https://issues.apache.org/jira/browse/SPARK-12582

Author: Yucai Yu <yucai...@intel.com>

Closes #10526 from yucai/master.

(cherry picked from commit 7e15044d9d9f9839c8d422bae71f27e855d559b4)
Signed-off-by: Sean Owen <so...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3221a7d9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3221a7d9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3221a7d9

Branch: refs/heads/branch-1.6
Commit: 3221a7d912bdc5a1ce5992501e1a2e6a8248c668
Parents: 46fc7a1
Author: Yucai Yu <yucai...@intel.com>
Authored: Tue Jan 12 13:23:23 2016 +0000
Committer: Sean Owen <so...@cloudera.com>
Committed: Tue Jan 12 13:23:34 2016 +0000

----------------------------------------------------------------------
 .../sort/IndexShuffleBlockResolverSuite.scala   | 114 ----------------
 .../sort/IndexShuffleBlockResolverSuite.scala   | 131 +++++++++++++++++++
 2 files changed, 131 insertions(+), 114 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3221a7d9/core/src/test/java/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala
 
b/core/src/test/java/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala
deleted file mode 100644
index 0b19861..0000000
--- 
a/core/src/test/java/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.shuffle.sort
-
-import java.io.{File, FileInputStream, FileOutputStream}
-
-import org.mockito.Answers.RETURNS_SMART_NULLS
-import org.mockito.Matchers._
-import org.mockito.Mockito._
-import org.mockito.invocation.InvocationOnMock
-import org.mockito.stubbing.Answer
-import org.mockito.{Mock, MockitoAnnotations}
-import org.scalatest.BeforeAndAfterEach
-
-import org.apache.spark.shuffle.IndexShuffleBlockResolver
-import org.apache.spark.storage._
-import org.apache.spark.util.Utils
-import org.apache.spark.{SparkConf, SparkFunSuite}
-
-
-class IndexShuffleBlockResolverSuite extends SparkFunSuite with 
BeforeAndAfterEach {
-
-  @Mock(answer = RETURNS_SMART_NULLS) private var blockManager: BlockManager = 
_
-  @Mock(answer = RETURNS_SMART_NULLS) private var diskBlockManager: 
DiskBlockManager = _
-
-  private var tempDir: File = _
-  private val conf: SparkConf = new SparkConf(loadDefaults = false)
-
-  override def beforeEach(): Unit = {
-    tempDir = Utils.createTempDir()
-    MockitoAnnotations.initMocks(this)
-
-    when(blockManager.diskBlockManager).thenReturn(diskBlockManager)
-    when(diskBlockManager.getFile(any[BlockId])).thenAnswer(
-      new Answer[File] {
-        override def answer(invocation: InvocationOnMock): File = {
-          new File(tempDir, invocation.getArguments.head.toString)
-        }
-      })
-  }
-
-  override def afterEach(): Unit = {
-    Utils.deleteRecursively(tempDir)
-  }
-
-  test("commit shuffle files multiple times") {
-    val lengths = Array[Long](10, 0, 20)
-    val resolver = new IndexShuffleBlockResolver(conf, blockManager)
-    val dataTmp = File.createTempFile("shuffle", null, tempDir)
-    val out = new FileOutputStream(dataTmp)
-    out.write(new Array[Byte](30))
-    out.close()
-    resolver.writeIndexFileAndCommit(1, 2, lengths, dataTmp)
-
-    val dataFile = resolver.getDataFile(1, 2)
-    assert(dataFile.exists())
-    assert(dataFile.length() === 30)
-    assert(!dataTmp.exists())
-
-    val dataTmp2 = File.createTempFile("shuffle", null, tempDir)
-    val out2 = new FileOutputStream(dataTmp2)
-    val lengths2 = new Array[Long](3)
-    out2.write(Array[Byte](1))
-    out2.write(new Array[Byte](29))
-    out2.close()
-    resolver.writeIndexFileAndCommit(1, 2, lengths2, dataTmp2)
-    assert(lengths2.toSeq === lengths.toSeq)
-    assert(dataFile.exists())
-    assert(dataFile.length() === 30)
-    assert(!dataTmp2.exists())
-
-    // The dataFile should be the previous one
-    val in = new FileInputStream(dataFile)
-    val firstByte = new Array[Byte](1)
-    in.read(firstByte)
-    assert(firstByte(0) === 0)
-
-    // remove data file
-    dataFile.delete()
-
-    val dataTmp3 = File.createTempFile("shuffle", null, tempDir)
-    val out3 = new FileOutputStream(dataTmp3)
-    val lengths3 = Array[Long](10, 10, 15)
-    out3.write(Array[Byte](2))
-    out3.write(new Array[Byte](34))
-    out3.close()
-    resolver.writeIndexFileAndCommit(1, 2, lengths3, dataTmp3)
-    assert(lengths3.toSeq != lengths.toSeq)
-    assert(dataFile.exists())
-    assert(dataFile.length() === 35)
-    assert(!dataTmp2.exists())
-
-    // The dataFile should be the previous one
-    val in2 = new FileInputStream(dataFile)
-    val firstByte2 = new Array[Byte](1)
-    in2.read(firstByte2)
-    assert(firstByte2(0) === 2)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/3221a7d9/core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala
 
b/core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala
new file mode 100644
index 0000000..591edbe
--- /dev/null
+++ 
b/core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.sort
+
+import java.io.{File, FileInputStream, FileOutputStream}
+
+import org.mockito.{Mock, MockitoAnnotations}
+import org.mockito.Answers.RETURNS_SMART_NULLS
+import org.mockito.Matchers._
+import org.mockito.Mockito._
+import org.mockito.invocation.InvocationOnMock
+import org.mockito.stubbing.Answer
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.shuffle.IndexShuffleBlockResolver
+import org.apache.spark.storage._
+import org.apache.spark.util.Utils
+
+
+class IndexShuffleBlockResolverSuite extends SparkFunSuite with 
BeforeAndAfterEach {
+
+  @Mock(answer = RETURNS_SMART_NULLS) private var blockManager: BlockManager = 
_
+  @Mock(answer = RETURNS_SMART_NULLS) private var diskBlockManager: 
DiskBlockManager = _
+
+  private var tempDir: File = _
+  private val conf: SparkConf = new SparkConf(loadDefaults = false)
+
+  override def beforeEach(): Unit = {
+    tempDir = Utils.createTempDir()
+    MockitoAnnotations.initMocks(this)
+
+    when(blockManager.diskBlockManager).thenReturn(diskBlockManager)
+    when(diskBlockManager.getFile(any[BlockId])).thenAnswer(
+      new Answer[File] {
+        override def answer(invocation: InvocationOnMock): File = {
+          new File(tempDir, invocation.getArguments.head.toString)
+        }
+      })
+  }
+
+  override def afterEach(): Unit = {
+    Utils.deleteRecursively(tempDir)
+  }
+
+  test("commit shuffle files multiple times") {
+    val resolver = new IndexShuffleBlockResolver(conf, blockManager)
+    val lengths = Array[Long](10, 0, 20)
+    val dataTmp = File.createTempFile("shuffle", null, tempDir)
+    val out = new FileOutputStream(dataTmp)
+    Utils.tryWithSafeFinally {
+      out.write(new Array[Byte](30))
+    } {
+      out.close()
+    }
+    resolver.writeIndexFileAndCommit(1, 2, lengths, dataTmp)
+
+    val dataFile = resolver.getDataFile(1, 2)
+    assert(dataFile.exists())
+    assert(dataFile.length() === 30)
+    assert(!dataTmp.exists())
+
+    val lengths2 = new Array[Long](3)
+    val dataTmp2 = File.createTempFile("shuffle", null, tempDir)
+    val out2 = new FileOutputStream(dataTmp2)
+    Utils.tryWithSafeFinally {
+      out2.write(Array[Byte](1))
+      out2.write(new Array[Byte](29))
+    } {
+      out2.close()
+    }
+    resolver.writeIndexFileAndCommit(1, 2, lengths2, dataTmp2)
+    assert(lengths2.toSeq === lengths.toSeq)
+    assert(dataFile.exists())
+    assert(dataFile.length() === 30)
+    assert(!dataTmp2.exists())
+
+    // The dataFile should be the previous one
+    val firstByte = new Array[Byte](1)
+    val in = new FileInputStream(dataFile)
+    Utils.tryWithSafeFinally {
+      in.read(firstByte)
+    } {
+      in.close()
+    }
+    assert(firstByte(0) === 0)
+
+    // remove data file
+    dataFile.delete()
+
+    val lengths3 = Array[Long](10, 10, 15)
+    val dataTmp3 = File.createTempFile("shuffle", null, tempDir)
+    val out3 = new FileOutputStream(dataTmp3)
+    Utils.tryWithSafeFinally {
+      out3.write(Array[Byte](2))
+      out3.write(new Array[Byte](34))
+    } {
+      out3.close()
+    }
+    resolver.writeIndexFileAndCommit(1, 2, lengths3, dataTmp3)
+    assert(lengths3.toSeq != lengths.toSeq)
+    assert(dataFile.exists())
+    assert(dataFile.length() === 35)
+    assert(!dataTmp2.exists())
+
+    // The dataFile should be the previous one
+    val firstByte2 = new Array[Byte](1)
+    val in2 = new FileInputStream(dataFile)
+    Utils.tryWithSafeFinally {
+      in2.read(firstByte2)
+    } {
+      in2.close()
+    }
+    assert(firstByte2(0) === 2)
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to