Repository: spark
Updated Branches:
  refs/heads/master 7394e7ade -> ecc6eb50a


[SPARK-7315] [STREAMING] [TEST] Fix flaky WALBackedBlockRDDSuite

`FileUtils.getTempDirectoryPath()` path may or may not exist. We want to make 
sure that it does not exist.

Author: Tathagata Das <tathagata.das1...@gmail.com>

Closes #5853 from tdas/SPARK-7315 and squashes the following commits:

141afd5 [Tathagata Das] Removed use of FileUtils
b08d4f1 [Tathagata Das] Fix flaky WALBackedBlockRDDSuite


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ecc6eb50
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ecc6eb50
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ecc6eb50

Branch: refs/heads/master
Commit: ecc6eb50a59172dc132bd8f97957734f6f009024
Parents: 7394e7a
Author: Tathagata Das <tathagata.das1...@gmail.com>
Authored: Sat May 2 01:53:14 2015 -0700
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Sat May 2 01:53:14 2015 -0700

----------------------------------------------------------------------
 .../streaming/rdd/WriteAheadLogBackedBlockRDD.scala     | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ecc6eb50/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
index ebdf418..f4c8046 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
@@ -16,13 +16,13 @@
  */
 package org.apache.spark.streaming.rdd
 
+import java.io.File
 import java.nio.ByteBuffer
+import java.util.UUID
 
 import scala.reflect.ClassTag
 import scala.util.control.NonFatal
 
-import org.apache.commons.io.FileUtils
-
 import org.apache.spark._
 import org.apache.spark.rdd.BlockRDD
 import org.apache.spark.storage.{BlockId, StorageLevel}
@@ -108,9 +108,13 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
           // writing log data. However, the directory is not needed if data 
needs to be read, hence
           // a dummy path is provided to satisfy the method parameter 
requirements.
           // FileBasedWriteAheadLog will not create any file or directory at 
that path.
-          val dummyDirectory = FileUtils.getTempDirectoryPath()
+          // FileBasedWriteAheadLog will not create any file or directory at 
that path. Also,
+          // this dummy directory should not already exist otherwise the WAL 
will try to recover
+          // past events from the directory and throw errors.
+          val nonExistentDirectory = new File(
+            System.getProperty("java.io.tmpdir"), 
UUID.randomUUID().toString).getAbsolutePath
           writeAheadLog = WriteAheadLogUtils.createLogForReceiver(
-            SparkEnv.get.conf, dummyDirectory, hadoopConf)
+            SparkEnv.get.conf, nonExistentDirectory, hadoopConf)
           dataRead = writeAheadLog.read(partition.walRecordHandle)
         } catch {
           case NonFatal(e) =>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to