Repository: spark
Updated Branches:
  refs/heads/master 0f1d00a90 -> 7786f9cc0


[SPARK-11419][STREAMING] Parallel recovery for FileBasedWriteAheadLog + minor 
recovery tweaks

The support for closing WriteAheadLog files after writes was just merged in. 
Closing every file after a write is a very expensive operation as it creates 
many small files on S3. It's not necessary to enable it on HDFS anyway.

However, when you have many small files on S3, recovery takes a very long 
time. In addition, files start stacking up pretty quickly and deletes may not 
be able to keep up; therefore, deletes are parallelized as well.

This PR adds support for the two parallelization steps mentioned above, and 
also fixes a couple more failures I encountered during recovery.

Author: Burak Yavuz <brk...@gmail.com>

Closes #9373 from brkyvz/par-recovery.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7786f9cc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7786f9cc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7786f9cc

Branch: refs/heads/master
Commit: 7786f9cc0790d27854a1e184f66a9b4df4d040a2
Parents: 0f1d00a
Author: Burak Yavuz <brk...@gmail.com>
Authored: Thu Nov 12 18:03:23 2015 -0800
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Thu Nov 12 18:03:23 2015 -0800

----------------------------------------------------------------------
 .../streaming/scheduler/JobScheduler.scala      |  6 +-
 .../streaming/util/FileBasedWriteAheadLog.scala | 78 +++++++++++------
 .../FileBasedWriteAheadLogRandomReader.scala    |  2 +-
 .../util/FileBasedWriteAheadLogReader.scala     | 17 +++-
 .../apache/spark/streaming/util/HdfsUtils.scala | 24 +++++-
 .../streaming/ReceivedBlockTrackerSuite.scala   | 91 +++++++++++++++++++-
 .../streaming/util/WriteAheadLogSuite.scala     | 87 +++++++++++++++++--
 7 files changed, 268 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7786f9cc/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 2480b4e..1ed6fb0 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -88,8 +88,10 @@ class JobScheduler(val ssc: StreamingContext) extends 
Logging {
     if (eventLoop == null) return // scheduler has already been stopped
     logDebug("Stopping JobScheduler")
 
-    // First, stop receiving
-    receiverTracker.stop(processAllReceivedData)
+    if (receiverTracker != null) {
+      // First, stop receiving
+      receiverTracker.stop(processAllReceivedData)
+    }
 
     // Second, stop generating jobs. If it has to process all received data,
     // then this will wait for all the processing through JobScheduler to be 
over.

http://git-wip-us.apache.org/repos/asf/spark/blob/7786f9cc/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
index bc3f248..72705f1 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
@@ -17,10 +17,12 @@
 package org.apache.spark.streaming.util
 
 import java.nio.ByteBuffer
+import java.util.concurrent.ThreadPoolExecutor
 import java.util.{Iterator => JIterator}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
+import scala.collection.parallel.ThreadPoolTaskSupport
 import scala.concurrent.{Await, ExecutionContext, Future}
 import scala.language.postfixOps
 
@@ -57,8 +59,8 @@ private[streaming] class FileBasedWriteAheadLog(
   private val callerNameTag = getCallerName.map(c => s" for $c").getOrElse("")
 
   private val threadpoolName = s"WriteAheadLogManager $callerNameTag"
-  implicit private val executionContext = ExecutionContext.fromExecutorService(
-    ThreadUtils.newDaemonSingleThreadExecutor(threadpoolName))
+  private val threadpool = 
ThreadUtils.newDaemonCachedThreadPool(threadpoolName, 20)
+  private val executionContext = 
ExecutionContext.fromExecutorService(threadpool)
   override protected val logName = s"WriteAheadLogManager $callerNameTag"
 
   private var currentLogPath: Option[String] = None
@@ -124,13 +126,19 @@ private[streaming] class FileBasedWriteAheadLog(
    */
   def readAll(): JIterator[ByteBuffer] = synchronized {
     val logFilesToRead = pastLogs.map{ _.path} ++ currentLogPath
-    logInfo("Reading from the logs: " + logFilesToRead.mkString("\n"))
-
-    logFilesToRead.iterator.map { file =>
+    logInfo("Reading from the logs:\n" + logFilesToRead.mkString("\n"))
+    def readFile(file: String): Iterator[ByteBuffer] = {
       logDebug(s"Creating log reader with $file")
       val reader = new FileBasedWriteAheadLogReader(file, hadoopConf)
       CompletionIterator[ByteBuffer, Iterator[ByteBuffer]](reader, 
reader.close _)
-    }.flatten.asJava
+    }
+    if (!closeFileAfterWrite) {
+      logFilesToRead.iterator.map(readFile).flatten.asJava
+    } else {
+      // For performance gains, it makes sense to parallelize the recovery if
+      // closeFileAfterWrite = true
+      seqToParIterator(threadpool, logFilesToRead, readFile).asJava
+    }
   }
 
   /**
@@ -146,30 +154,33 @@ private[streaming] class FileBasedWriteAheadLog(
    * asynchronously.
    */
   def clean(threshTime: Long, waitForCompletion: Boolean): Unit = {
-    val oldLogFiles = synchronized { pastLogs.filter { _.endTime < threshTime 
} }
+    val oldLogFiles = synchronized {
+      val expiredLogs = pastLogs.filter { _.endTime < threshTime }
+      pastLogs --= expiredLogs
+      expiredLogs
+    }
     logInfo(s"Attempting to clear ${oldLogFiles.size} old log files in 
$logDirectory " +
       s"older than $threshTime: ${oldLogFiles.map { _.path }.mkString("\n")}")
 
-    def deleteFiles() {
-      oldLogFiles.foreach { logInfo =>
-        try {
-          val path = new Path(logInfo.path)
-          val fs = HdfsUtils.getFileSystemForPath(path, hadoopConf)
-          fs.delete(path, true)
-          synchronized { pastLogs -= logInfo }
-          logDebug(s"Cleared log file $logInfo")
-        } catch {
-          case ex: Exception =>
-            logWarning(s"Error clearing write ahead log file $logInfo", ex)
-        }
+    def deleteFile(walInfo: LogInfo): Unit = {
+      try {
+        val path = new Path(walInfo.path)
+        val fs = HdfsUtils.getFileSystemForPath(path, hadoopConf)
+        fs.delete(path, true)
+        logDebug(s"Cleared log file $walInfo")
+      } catch {
+        case ex: Exception =>
+          logWarning(s"Error clearing write ahead log file $walInfo", ex)
       }
       logInfo(s"Cleared log files in $logDirectory older than $threshTime")
     }
-    if (!executionContext.isShutdown) {
-      val f = Future { deleteFiles() }
-      if (waitForCompletion) {
-        import scala.concurrent.duration._
-        Await.ready(f, 1 second)
+    oldLogFiles.foreach { logInfo =>
+      if (!executionContext.isShutdown) {
+        val f = Future { deleteFile(logInfo) }(executionContext)
+        if (waitForCompletion) {
+          import scala.concurrent.duration._
+          Await.ready(f, 1 second)
+        }
       }
     }
   }
@@ -251,4 +262,23 @@ private[streaming] object FileBasedWriteAheadLog {
       }
     }.sortBy { _.startTime }
   }
+
+  /**
+   * This creates an iterator from a parallel collection, by keeping at most 
`n` objects in memory
+   * at any given time, where `n` is the size of the thread pool. This is 
crucial for use cases
+   * where we create `FileBasedWriteAheadLogReader`s during parallel recovery. 
We don't want to
+   * open up `k` streams altogether where `k` is the size of the Seq that we 
want to parallelize.
+   */
+  def seqToParIterator[I, O](
+      tpool: ThreadPoolExecutor,
+      source: Seq[I],
+      handler: I => Iterator[O]): Iterator[O] = {
+    val taskSupport = new ThreadPoolTaskSupport(tpool)
+    val groupSize = tpool.getMaximumPoolSize.max(8)
+    source.grouped(groupSize).flatMap { group =>
+      val parallelCollection = group.par
+      parallelCollection.tasksupport = taskSupport
+      parallelCollection.map(handler)
+    }.flatten
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/7786f9cc/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogRandomReader.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogRandomReader.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogRandomReader.scala
index f716822..56d4977 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogRandomReader.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogRandomReader.scala
@@ -30,7 +30,7 @@ private[streaming] class 
FileBasedWriteAheadLogRandomReader(path: String, conf:
   extends Closeable {
 
   private val instream = HdfsUtils.getInputStream(path, conf)
-  private var closed = false
+  private var closed = (instream == null) // the file may be deleted as we're 
opening the stream
 
   def read(segment: FileBasedWriteAheadLogSegment): ByteBuffer = synchronized {
     assertOpen()

http://git-wip-us.apache.org/repos/asf/spark/blob/7786f9cc/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogReader.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogReader.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogReader.scala
index c3bb59f..a375c07 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogReader.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLogReader.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.spark.streaming.util
 
-import java.io.{Closeable, EOFException}
+import java.io.{IOException, Closeable, EOFException}
 import java.nio.ByteBuffer
 
 import org.apache.hadoop.conf.Configuration
@@ -32,7 +32,7 @@ private[streaming] class FileBasedWriteAheadLogReader(path: 
String, conf: Config
   extends Iterator[ByteBuffer] with Closeable with Logging {
 
   private val instream = HdfsUtils.getInputStream(path, conf)
-  private var closed = false
+  private var closed = (instream == null) // the file may be deleted as we're 
opening the stream
   private var nextItem: Option[ByteBuffer] = None
 
   override def hasNext: Boolean = synchronized {
@@ -55,6 +55,19 @@ private[streaming] class FileBasedWriteAheadLogReader(path: 
String, conf: Config
           logDebug("Error reading next item, EOF reached", e)
           close()
           false
+        case e: IOException =>
+          logWarning("Error while trying to read data. If the file was 
deleted, " +
+            "this should be okay.", e)
+          close()
+          if (HdfsUtils.checkFileExists(path, conf)) {
+            // If file exists, this could be a legitimate error
+            throw e
+          } else {
+            // File was deleted. This can occur when the daemon cleanup thread 
takes time to
+            // delete the file during recovery.
+            false
+          }
+
         case e: Exception =>
           logWarning("Error while trying to read data from HDFS.", e)
           close()

http://git-wip-us.apache.org/repos/asf/spark/blob/7786f9cc/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala
index f60688f..13a765d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala
@@ -16,6 +16,8 @@
  */
 package org.apache.spark.streaming.util
 
+import java.io.IOException
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs._
 
@@ -42,8 +44,19 @@ private[streaming] object HdfsUtils {
   def getInputStream(path: String, conf: Configuration): FSDataInputStream = {
     val dfsPath = new Path(path)
     val dfs = getFileSystemForPath(dfsPath, conf)
-    val instream = dfs.open(dfsPath)
-    instream
+    if (dfs.isFile(dfsPath)) {
+      try {
+        dfs.open(dfsPath)
+      } catch {
+        case e: IOException =>
+          // If we are really unlucky, the file may be deleted as we're 
opening the stream.
+          // This can happen as clean up is performed by daemon threads that 
may be left over from
+          // previous runs.
+          if (!dfs.isFile(dfsPath)) null else throw e
+      }
+    } else {
+      null
+    }
   }
 
   def checkState(state: Boolean, errorMsg: => String) {
@@ -71,4 +84,11 @@ private[streaming] object HdfsUtils {
       case _ => fs
     }
   }
+
+  /** Check if the file exists at the given path. */
+  def checkFileExists(path: String, conf: Configuration): Boolean = {
+    val hdpPath = new Path(path)
+    val fs = getFileSystemForPath(hdpPath, conf)
+    fs.isFile(hdpPath)
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/7786f9cc/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
 
b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
index f793a12..7db17ab 100644
--- 
a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
+++ 
b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.streaming
 
 import java.io.File
+import java.nio.ByteBuffer
 
 import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.duration._
@@ -32,7 +33,7 @@ import org.apache.spark.{Logging, SparkConf, SparkException, 
SparkFunSuite}
 import org.apache.spark.storage.StreamBlockId
 import org.apache.spark.streaming.receiver.BlockManagerBasedStoreResult
 import org.apache.spark.streaming.scheduler._
-import org.apache.spark.streaming.util.{WriteAheadLogUtils, 
FileBasedWriteAheadLogReader}
+import org.apache.spark.streaming.util._
 import org.apache.spark.streaming.util.WriteAheadLogSuite._
 import org.apache.spark.util.{Clock, ManualClock, SystemClock, Utils}
 
@@ -207,6 +208,75 @@ class ReceivedBlockTrackerSuite
     tracker1.isWriteAheadLogEnabled should be (false)
   }
 
+  test("parallel file deletion in FileBasedWriteAheadLog is robust to deletion 
error") {
+    conf.set("spark.streaming.driver.writeAheadLog.rollingIntervalSecs", "1")
+    require(WriteAheadLogUtils.getRollingIntervalSecs(conf, isDriver = true) 
=== 1)
+
+    val addBlocks = generateBlockInfos()
+    val batch1 = addBlocks.slice(0, 1)
+    val batch2 = addBlocks.slice(1, 3)
+    val batch3 = addBlocks.slice(3, addBlocks.length)
+
+    assert(getWriteAheadLogFiles().length === 0)
+
+    // list of timestamps for files
+    val t = Seq.tabulate(5)(i => i * 1000)
+
+    writeEventsManually(getLogFileName(t(0)), Seq(createBatchCleanup(t(0))))
+    assert(getWriteAheadLogFiles().length === 1)
+
+    // The goal is to create several log files which should have been cleaned 
up.
+    // If we face any issue during recovery, because these old files exist, 
then we need to make
+    // deletion more robust rather than a parallelized operation where we fire 
and forget
+    val batch1Allocation = createBatchAllocation(t(1), batch1)
+    writeEventsManually(getLogFileName(t(1)), batch1.map(BlockAdditionEvent) 
:+ batch1Allocation)
+
+    writeEventsManually(getLogFileName(t(2)), Seq(createBatchCleanup(t(1))))
+
+    val batch2Allocation = createBatchAllocation(t(3), batch2)
+    writeEventsManually(getLogFileName(t(3)), batch2.map(BlockAdditionEvent) 
:+ batch2Allocation)
+
+    writeEventsManually(getLogFileName(t(4)), batch3.map(BlockAdditionEvent))
+
+    // We should have 5 different log files as we called `writeEventsManually` 
with 5 different
+    // timestamps
+    assert(getWriteAheadLogFiles().length === 5)
+
+    // Create the tracker to recover from the log files. We're going to ask 
the tracker to clean
+    // things up, and then we're going to rewrite that data, and recover using 
a different tracker.
+    // They should have identical data no matter what
+    val tracker = createTracker(recoverFromWriteAheadLog = true, clock = new 
ManualClock(t(4)))
+
+    def compareTrackers(base: ReceivedBlockTracker, subject: 
ReceivedBlockTracker): Unit = {
+      subject.getBlocksOfBatchAndStream(t(3), streamId) should be(
+        base.getBlocksOfBatchAndStream(t(3), streamId))
+      subject.getBlocksOfBatchAndStream(t(1), streamId) should be(
+        base.getBlocksOfBatchAndStream(t(1), streamId))
+      subject.getBlocksOfBatchAndStream(t(0), streamId) should be(Nil)
+    }
+
+    // ask the tracker to clean up some old files
+    tracker.cleanupOldBatches(t(3), waitForCompletion = true)
+    assert(getWriteAheadLogFiles().length === 3)
+
+    val tracker2 = createTracker(recoverFromWriteAheadLog = true, clock = new 
ManualClock(t(4)))
+    compareTrackers(tracker, tracker2)
+
+    // rewrite first file
+    writeEventsManually(getLogFileName(t(0)), Seq(createBatchCleanup(t(0))))
+    assert(getWriteAheadLogFiles().length === 4)
+    // make sure trackers are consistent
+    val tracker3 = createTracker(recoverFromWriteAheadLog = true, clock = new 
ManualClock(t(4)))
+    compareTrackers(tracker, tracker3)
+
+    // rewrite second file
+    writeEventsManually(getLogFileName(t(1)), batch1.map(BlockAdditionEvent) 
:+ batch1Allocation)
+    assert(getWriteAheadLogFiles().length === 5)
+    // make sure trackers are consistent
+    val tracker4 = createTracker(recoverFromWriteAheadLog = true, clock = new 
ManualClock(t(4)))
+    compareTrackers(tracker, tracker4)
+  }
+
   /**
    * Create tracker object with the optional provided clock. Use fake clock if 
you
    * want to control time by manually incrementing it to test log clean.
@@ -228,11 +298,30 @@ class ReceivedBlockTrackerSuite
       BlockManagerBasedStoreResult(StreamBlockId(streamId, 
math.abs(Random.nextInt)), Some(0L))))
   }
 
+  /**
+   * Write received block tracker events to a file manually.
+   */
+  def writeEventsManually(filePath: String, events: 
Seq[ReceivedBlockTrackerLogEvent]): Unit = {
+    val writer = HdfsUtils.getOutputStream(filePath, hadoopConf)
+    events.foreach { event =>
+      val bytes = Utils.serialize(event)
+      writer.writeInt(bytes.size)
+      writer.write(bytes)
+    }
+    writer.close()
+  }
+
   /** Get all the data written in the given write ahead log file. */
   def getWrittenLogData(logFile: String): Seq[ReceivedBlockTrackerLogEvent] = {
     getWrittenLogData(Seq(logFile))
   }
 
+  /** Get the log file name for the given log start time. */
+  def getLogFileName(time: Long, rollingIntervalSecs: Int = 1): String = {
+    checkpointDirectory.toString + File.separator + "receivedBlockMetadata" +
+      File.separator + s"log-$time-${time + rollingIntervalSecs * 1000}"
+  }
+
   /**
    * Get all the data written in the given write ahead log files. By default, 
it will read all
    * files in the test log directory.

http://git-wip-us.apache.org/repos/asf/spark/blob/7786f9cc/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
 
b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
index 9e13f25..4273fd7 100644
--- 
a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
+++ 
b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
@@ -19,7 +19,8 @@ package org.apache.spark.streaming.util
 import java.io._
 import java.nio.ByteBuffer
 import java.util.{Iterator => JIterator}
-import java.util.concurrent.ThreadPoolExecutor
+import java.util.concurrent.atomic.AtomicInteger
+import java.util.concurrent.{TimeUnit, CountDownLatch, ThreadPoolExecutor}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
@@ -32,15 +33,13 @@ import org.apache.hadoop.fs.Path
 import org.mockito.Matchers.{eq => meq}
 import org.mockito.Matchers._
 import org.mockito.Mockito._
-import org.mockito.invocation.InvocationOnMock
-import org.mockito.stubbing.Answer
 import org.scalatest.concurrent.Eventually
 import org.scalatest.concurrent.Eventually._
 import org.scalatest.{PrivateMethodTester, BeforeAndAfterEach, BeforeAndAfter}
 import org.scalatest.mock.MockitoSugar
 
 import org.apache.spark.streaming.scheduler._
-import org.apache.spark.util.{ThreadUtils, ManualClock, Utils}
+import org.apache.spark.util.{CompletionIterator, ThreadUtils, ManualClock, 
Utils}
 import org.apache.spark.{SparkConf, SparkFunSuite}
 
 /** Common tests for WriteAheadLogs that we would like to test with different 
configurations. */
@@ -198,6 +197,64 @@ class FileBasedWriteAheadLogSuite
 
   import WriteAheadLogSuite._
 
+  test("FileBasedWriteAheadLog - seqToParIterator") {
+    /*
+     If the setting `closeFileAfterWrite` is enabled, we start generating a 
very large number of
+     files. This causes recovery to take a very long time. In order to make it 
quicker, we
+     parallelized the reading of these files. This test makes sure that we 
limit the number of
+     open files to the size of the number of threads in our thread pool rather 
than the size of
+     the list of files.
+     */
+    val numThreads = 8
+    val tpool = ThreadUtils.newDaemonFixedThreadPool(numThreads, 
"wal-test-thread-pool")
+    class GetMaxCounter {
+      private val value = new AtomicInteger()
+      @volatile private var max: Int = 0
+      def increment(): Unit = synchronized {
+        val atInstant = value.incrementAndGet()
+        if (atInstant > max) max = atInstant
+      }
+      def decrement(): Unit = synchronized { value.decrementAndGet() }
+      def get(): Int = synchronized { value.get() }
+      def getMax(): Int = synchronized { max }
+    }
+    try {
+      // If Jenkins is slow, we may not have a chance to run many threads 
simultaneously. Having
+      // a latch will make sure that all the threads can be launched 
altogether.
+      val latch = new CountDownLatch(1)
+      val testSeq = 1 to 1000
+      val counter = new GetMaxCounter()
+      def handle(value: Int): Iterator[Int] = {
+        new CompletionIterator[Int, Iterator[Int]](Iterator(value)) {
+          counter.increment()
+          // block so that other threads also launch
+          latch.await(10, TimeUnit.SECONDS)
+          override def completion() { counter.decrement() }
+        }
+      }
+      @volatile var collected: Seq[Int] = Nil
+      val t = new Thread() {
+        override def run() {
+          // run the calculation on a separate thread so that we can release 
the latch
+          val iterator = FileBasedWriteAheadLog.seqToParIterator[Int, 
Int](tpool, testSeq, handle)
+          collected = iterator.toSeq
+        }
+      }
+      t.start()
+      eventually(Eventually.timeout(10.seconds)) {
+        // make sure we are doing a parallel computation!
+        assert(counter.getMax() > 1)
+      }
+      latch.countDown()
+      t.join(10000)
+      assert(collected === testSeq)
+      // make sure we didn't open too many Iterators
+      assert(counter.getMax() <= numThreads)
+    } finally {
+      tpool.shutdownNow()
+    }
+  }
+
   test("FileBasedWriteAheadLogWriter - writing data") {
     val dataToWrite = generateRandomData()
     val segments = writeDataUsingWriter(testFile, dataToWrite)
@@ -259,6 +316,26 @@ class FileBasedWriteAheadLogSuite
     assert(readDataUsingReader(testFile) === (dataToWrite.dropRight(1)))
   }
 
+  test("FileBasedWriteAheadLogReader - handles errors when file doesn't 
exist") {
+    // Write data manually for testing the sequential reader
+    val dataToWrite = generateRandomData()
+    writeDataUsingWriter(testFile, dataToWrite)
+    val tFile = new File(testFile)
+    assert(tFile.exists())
+    // Verify the data can be read and is same as the one correctly written
+    assert(readDataUsingReader(testFile) === dataToWrite)
+
+    tFile.delete()
+    assert(!tFile.exists())
+
+    val reader = new FileBasedWriteAheadLogReader(testFile, hadoopConf)
+    assert(!reader.hasNext)
+    reader.close()
+
+    // Verify that no exception is thrown if file doesn't exist
+    assert(readDataUsingReader(testFile) === Nil)
+  }
+
   test("FileBasedWriteAheadLogRandomReader - reading data using random 
reader") {
     // Write data manually for testing the random reader
     val writtenData = generateRandomData()
@@ -581,7 +658,7 @@ object WriteAheadLogSuite {
       closeFileAfterWrite: Boolean,
       allowBatching: Boolean): Seq[String] = {
     val wal = createWriteAheadLog(logDirectory, closeFileAfterWrite, 
allowBatching)
-    val data = wal.readAll().asScala.map(byteBufferToString).toSeq
+    val data = wal.readAll().asScala.map(byteBufferToString).toArray
     wal.close()
     data
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to