Repository: spark Updated Branches: refs/heads/master 9f0995bb0 -> 7e15044d9
[SPARK-12582][TEST] IndexShuffleBlockResolverSuite fails in windows [SPARK-12582][Test] IndexShuffleBlockResolverSuite fails in windows * IndexShuffleBlockResolverSuite fails in windows due to file is not closed. * mv IndexShuffleBlockResolverSuite.scala from "test/java" to "test/scala". https://issues.apache.org/jira/browse/SPARK-12582 Author: Yucai Yu <yucai...@intel.com> Closes #10526 from yucai/master. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7e15044d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7e15044d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7e15044d Branch: refs/heads/master Commit: 7e15044d9d9f9839c8d422bae71f27e855d559b4 Parents: 9f0995b Author: Yucai Yu <yucai...@intel.com> Authored: Tue Jan 12 13:23:23 2016 +0000 Committer: Sean Owen <so...@cloudera.com> Committed: Tue Jan 12 13:23:23 2016 +0000 ---------------------------------------------------------------------- .../sort/IndexShuffleBlockResolverSuite.scala | 119 ---------------- .../sort/IndexShuffleBlockResolverSuite.scala | 136 +++++++++++++++++++ 2 files changed, 136 insertions(+), 119 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/7e15044d/core/src/test/java/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala b/core/src/test/java/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala deleted file mode 100644 index f200ff3..0000000 --- a/core/src/test/java/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.shuffle.sort - -import java.io.{File, FileInputStream, FileOutputStream} - -import org.mockito.Answers.RETURNS_SMART_NULLS -import org.mockito.Matchers._ -import org.mockito.Mockito._ -import org.mockito.invocation.InvocationOnMock -import org.mockito.stubbing.Answer -import org.mockito.{Mock, MockitoAnnotations} -import org.scalatest.BeforeAndAfterEach - -import org.apache.spark.shuffle.IndexShuffleBlockResolver -import org.apache.spark.storage._ -import org.apache.spark.util.Utils -import org.apache.spark.{SparkConf, SparkFunSuite} - - -class IndexShuffleBlockResolverSuite extends SparkFunSuite with BeforeAndAfterEach { - - @Mock(answer = RETURNS_SMART_NULLS) private var blockManager: BlockManager = _ - @Mock(answer = RETURNS_SMART_NULLS) private var diskBlockManager: DiskBlockManager = _ - - private var tempDir: File = _ - private val conf: SparkConf = new SparkConf(loadDefaults = false) - - override def beforeEach(): Unit = { - super.beforeEach() - tempDir = Utils.createTempDir() - MockitoAnnotations.initMocks(this) - - when(blockManager.diskBlockManager).thenReturn(diskBlockManager) - when(diskBlockManager.getFile(any[BlockId])).thenAnswer( - new Answer[File] { - override def answer(invocation: InvocationOnMock): File = { - new File(tempDir, invocation.getArguments.head.toString) - } - }) - } - - override def afterEach(): Unit = { - try { - Utils.deleteRecursively(tempDir) - } finally { - super.afterEach() - } - } - - test("commit shuffle files multiple times") { - val lengths = Array[Long](10, 0, 20) - val resolver = new IndexShuffleBlockResolver(conf, blockManager) - val dataTmp = File.createTempFile("shuffle", null, tempDir) - val out = new FileOutputStream(dataTmp) - out.write(new Array[Byte](30)) - out.close() - resolver.writeIndexFileAndCommit(1, 2, lengths, dataTmp) - - val dataFile = resolver.getDataFile(1, 2) - assert(dataFile.exists()) - assert(dataFile.length() === 30) - assert(!dataTmp.exists()) - - val dataTmp2 = File.createTempFile("shuffle", null, tempDir) - val out2 = new FileOutputStream(dataTmp2) - val lengths2 = new Array[Long](3) - out2.write(Array[Byte](1)) - out2.write(new Array[Byte](29)) - out2.close() - resolver.writeIndexFileAndCommit(1, 2, lengths2, dataTmp2) - assert(lengths2.toSeq === lengths.toSeq) - assert(dataFile.exists()) - assert(dataFile.length() === 30) - assert(!dataTmp2.exists()) - - // The dataFile should be the previous one - val in = new FileInputStream(dataFile) - val firstByte = new Array[Byte](1) - in.read(firstByte) - assert(firstByte(0) === 0) - - // remove data file - dataFile.delete() - - val dataTmp3 = File.createTempFile("shuffle", null, tempDir) - val out3 = new FileOutputStream(dataTmp3) - val lengths3 = Array[Long](10, 10, 15) - out3.write(Array[Byte](2)) - out3.write(new Array[Byte](34)) - out3.close() - resolver.writeIndexFileAndCommit(1, 2, lengths3, dataTmp3) - assert(lengths3.toSeq != lengths.toSeq) - assert(dataFile.exists()) - assert(dataFile.length() === 35) - assert(!dataTmp2.exists()) - - // The dataFile should be the previous one - val in2 = new FileInputStream(dataFile) - val firstByte2 = new Array[Byte](1) - in2.read(firstByte2) - assert(firstByte2(0) === 2) - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/7e15044d/core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala new file mode 100644 index 0000000..d21ce73 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.shuffle.sort + +import java.io.{File, FileInputStream, FileOutputStream} + +import org.mockito.{Mock, MockitoAnnotations} +import org.mockito.Answers.RETURNS_SMART_NULLS +import org.mockito.Matchers._ +import org.mockito.Mockito._ +import org.mockito.invocation.InvocationOnMock +import org.mockito.stubbing.Answer +import org.scalatest.BeforeAndAfterEach + +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.shuffle.IndexShuffleBlockResolver +import org.apache.spark.storage._ +import org.apache.spark.util.Utils + + +class IndexShuffleBlockResolverSuite extends SparkFunSuite with BeforeAndAfterEach { + + @Mock(answer = RETURNS_SMART_NULLS) private var blockManager: BlockManager = _ + @Mock(answer = RETURNS_SMART_NULLS) private var diskBlockManager: DiskBlockManager = _ + + private var tempDir: File = _ + private val conf: SparkConf = new SparkConf(loadDefaults = false) + + override def beforeEach(): Unit = { + super.beforeEach() + tempDir = Utils.createTempDir() + MockitoAnnotations.initMocks(this) + + when(blockManager.diskBlockManager).thenReturn(diskBlockManager) + when(diskBlockManager.getFile(any[BlockId])).thenAnswer( + new Answer[File] { + override def answer(invocation: InvocationOnMock): File = { + new File(tempDir, invocation.getArguments.head.toString) + } + }) + } + + override def afterEach(): Unit = { + try { + Utils.deleteRecursively(tempDir) + } finally { + super.afterEach() + } + } + + test("commit shuffle files multiple times") { + val resolver = new IndexShuffleBlockResolver(conf, blockManager) + val lengths = Array[Long](10, 0, 20) + val dataTmp = File.createTempFile("shuffle", null, tempDir) + val out = new FileOutputStream(dataTmp) + Utils.tryWithSafeFinally { + out.write(new Array[Byte](30)) + } { + out.close() + } + resolver.writeIndexFileAndCommit(1, 2, lengths, dataTmp) + + val dataFile = resolver.getDataFile(1, 2) + assert(dataFile.exists()) + assert(dataFile.length() === 30) + assert(!dataTmp.exists()) + + val lengths2 = new Array[Long](3) + val dataTmp2 = File.createTempFile("shuffle", null, tempDir) + val out2 = new FileOutputStream(dataTmp2) + Utils.tryWithSafeFinally { + out2.write(Array[Byte](1)) + out2.write(new Array[Byte](29)) + } { + out2.close() + } + resolver.writeIndexFileAndCommit(1, 2, lengths2, dataTmp2) + assert(lengths2.toSeq === lengths.toSeq) + assert(dataFile.exists()) + assert(dataFile.length() === 30) + assert(!dataTmp2.exists()) + + // The dataFile should be the previous one + val firstByte = new Array[Byte](1) + val in = new FileInputStream(dataFile) + Utils.tryWithSafeFinally { + in.read(firstByte) + } { + in.close() + } + assert(firstByte(0) === 0) + + // remove data file + dataFile.delete() + + val lengths3 = Array[Long](10, 10, 15) + val dataTmp3 = File.createTempFile("shuffle", null, tempDir) + val out3 = new FileOutputStream(dataTmp3) + Utils.tryWithSafeFinally { + out3.write(Array[Byte](2)) + out3.write(new Array[Byte](34)) + } { + out3.close() + } + resolver.writeIndexFileAndCommit(1, 2, lengths3, dataTmp3) + assert(lengths3.toSeq != lengths.toSeq) + assert(dataFile.exists()) + assert(dataFile.length() === 35) + assert(!dataTmp2.exists()) + + // The dataFile should be the previous one + val firstByte2 = new Array[Byte](1) + val in2 = new FileInputStream(dataFile) + Utils.tryWithSafeFinally { + in2.read(firstByte2) + } { + in2.close() + } + assert(firstByte2(0) === 2) + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org