Repository: spark
Updated Branches:
  refs/heads/master 9dba5fb2b -> 4f030b9e8


[SPARK-11324][STREAMING] Flag for closing Write Ahead Logs after a write

Currently the Write Ahead Log in Spark Streaming flushes data as writes are
made. S3 does not support flushing; data only becomes visible once the stream
is actually closed.
In case of failure, the data for the last minute (the default rolling interval)
would not be properly written. Therefore we need a flag to close the stream
after each write, so that we achieve read-after-write consistency.
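
As a rough, illustrative sketch (not part of this patch), the new flag would be
enabled through SparkConf using the closeFileAfterWrite keys added below; the
receiver-side WAL itself is still turned on via the existing
spark.streaming.receiver.writeAheadLog.enable setting:

    import org.apache.spark.SparkConf

    // Illustrative only: enable close-after-write for both the receiver and
    // driver write ahead logs. The closeFileAfterWrite keys are the ones
    // introduced in WriteAheadLogUtils in this patch; the rest of the
    // streaming setup is assumed to be configured elsewhere.
    val conf = new SparkConf()
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
      .set("spark.streaming.receiver.writeAheadLog.closeFileAfterWrite", "true")
      .set("spark.streaming.driver.writeAheadLog.closeFileAfterWrite", "true")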

cc tdas zsxwing

Author: Burak Yavuz <brk...@gmail.com>

Closes #9285 from brkyvz/caw-wal.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4f030b9e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4f030b9e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4f030b9e

Branch: refs/heads/master
Commit: 4f030b9e82172659d250281782ac573cbd1438fc
Parents: 9dba5fb
Author: Burak Yavuz <brk...@gmail.com>
Authored: Tue Oct 27 16:01:26 2015 -0700
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Tue Oct 27 16:01:26 2015 -0700

----------------------------------------------------------------------
 .../streaming/util/FileBasedWriteAheadLog.scala |  6 +++-
 .../streaming/util/WriteAheadLogUtils.scala     | 15 ++++++++-
 .../streaming/util/WriteAheadLogSuite.scala     | 32 +++++++++++++++-----
 3 files changed, 44 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4f030b9e/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
index 9f4a4d6..bc3f248 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
@@ -47,7 +47,8 @@ private[streaming] class FileBasedWriteAheadLog(
     logDirectory: String,
     hadoopConf: Configuration,
     rollingIntervalSecs: Int,
-    maxFailures: Int
+    maxFailures: Int,
+    closeFileAfterWrite: Boolean
   ) extends WriteAheadLog with Logging {
 
   import FileBasedWriteAheadLog._
@@ -80,6 +81,9 @@ private[streaming] class FileBasedWriteAheadLog(
     while (!succeeded && failures < maxFailures) {
       try {
         fileSegment = getLogWriter(time).write(byteBuffer)
+        if (closeFileAfterWrite) {
+          resetWriter()
+        }
         succeeded = true
       } catch {
         case ex: Exception =>

http://git-wip-us.apache.org/repos/asf/spark/blob/4f030b9e/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala
index 7f6ff12..0ea970e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala
@@ -31,11 +31,15 @@ private[streaming] object WriteAheadLogUtils extends Logging {
   val RECEIVER_WAL_ROLLING_INTERVAL_CONF_KEY =
     "spark.streaming.receiver.writeAheadLog.rollingIntervalSecs"
   val RECEIVER_WAL_MAX_FAILURES_CONF_KEY = "spark.streaming.receiver.writeAheadLog.maxFailures"
+  val RECEIVER_WAL_CLOSE_AFTER_WRITE_CONF_KEY =
+    "spark.streaming.receiver.writeAheadLog.closeFileAfterWrite"
 
   val DRIVER_WAL_CLASS_CONF_KEY = "spark.streaming.driver.writeAheadLog.class"
   val DRIVER_WAL_ROLLING_INTERVAL_CONF_KEY =
     "spark.streaming.driver.writeAheadLog.rollingIntervalSecs"
   val DRIVER_WAL_MAX_FAILURES_CONF_KEY = "spark.streaming.driver.writeAheadLog.maxFailures"
+  val DRIVER_WAL_CLOSE_AFTER_WRITE_CONF_KEY =
+    "spark.streaming.driver.writeAheadLog.closeFileAfterWrite"
 
   val DEFAULT_ROLLING_INTERVAL_SECS = 60
   val DEFAULT_MAX_FAILURES = 3
@@ -60,6 +64,14 @@ private[streaming] object WriteAheadLogUtils extends Logging {
     }
   }
 
+  def shouldCloseFileAfterWrite(conf: SparkConf, isDriver: Boolean): Boolean = {
+    if (isDriver) {
+      conf.getBoolean(DRIVER_WAL_CLOSE_AFTER_WRITE_CONF_KEY, defaultValue = false)
+    } else {
+      conf.getBoolean(RECEIVER_WAL_CLOSE_AFTER_WRITE_CONF_KEY, defaultValue = false)
+    }
+  }
+
   /**
    * Create a WriteAheadLog for the driver. If configured with custom WAL class, it will try
    * to create instance of that class, otherwise it will create the default FileBasedWriteAheadLog.
@@ -113,7 +125,8 @@ private[streaming] object WriteAheadLogUtils extends Logging {
       }
     }.getOrElse {
       new FileBasedWriteAheadLog(sparkConf, fileWalLogDirectory, fileWalHadoopConf,
-        getRollingIntervalSecs(sparkConf, isDriver), getMaxFailures(sparkConf, isDriver))
+        getRollingIntervalSecs(sparkConf, isDriver), getMaxFailures(sparkConf, isDriver),
+        shouldCloseFileAfterWrite(sparkConf, isDriver))
     }
   }
 

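For illustration only (not part of the patch), a standalone sketch of how the
new flag resolves from SparkConf, mirroring shouldCloseFileAfterWrite above;
the flag defaults to false and is read independently for the driver and the
receiver side:

    import org.apache.spark.SparkConf

    // Standalone sketch mirroring WriteAheadLogUtils.shouldCloseFileAfterWrite.
    def shouldCloseFileAfterWrite(conf: SparkConf, isDriver: Boolean): Boolean = {
      val key =
        if (isDriver) "spark.streaming.driver.writeAheadLog.closeFileAfterWrite"
        else "spark.streaming.receiver.writeAheadLog.closeFileAfterWrite"
      conf.getBoolean(key, defaultValue = false)
    }

    // Off unless explicitly enabled; driver and receiver flags are independent.
    val conf = new SparkConf()
      .set("spark.streaming.receiver.writeAheadLog.closeFileAfterWrite", "true")
    assert(shouldCloseFileAfterWrite(conf, isDriver = false))
    assert(!shouldCloseFileAfterWrite(conf, isDriver = true))
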
http://git-wip-us.apache.org/repos/asf/spark/blob/4f030b9e/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
index 5e49fd0..93ae41a 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
@@ -203,6 +203,21 @@ class WriteAheadLogSuite extends SparkFunSuite with BeforeAndAfter {
     assert(writtenData === dataToWrite)
   }
 
+  test("FileBasedWriteAheadLog - close after write flag") {
+    // Write data with rotation using WriteAheadLog class
+    val numFiles = 3
+    val dataToWrite = Seq.tabulate(numFiles)(_.toString)
+    // total advance time is less than 1000, therefore log shouldn't be rolled, but manually closed
+    writeDataUsingWriteAheadLog(testDir, dataToWrite, closeLog = false, clockAdvanceTime = 100,
+      closeFileAfterWrite = true)
+
+    // Read data manually to verify the written data
+    val logFiles = getLogFilesInDirectory(testDir)
+    assert(logFiles.size === numFiles)
+    val writtenData = logFiles.flatMap { file => readDataManually(file)}
+    assert(writtenData === dataToWrite)
+  }
+
   test("FileBasedWriteAheadLog - read rotating logs") {
     // Write data manually for testing reading through WriteAheadLog
     val writtenData = (1 to 10).map { i =>
@@ -296,8 +311,8 @@ class WriteAheadLogSuite extends SparkFunSuite with BeforeAndAfter {
     assert(!nonexistentTempPath.exists())
 
     val writtenSegment = writeDataManually(generateRandomData(), testFile)
-    val wal = new FileBasedWriteAheadLog(
-      new SparkConf(), tempDir.getAbsolutePath, new Configuration(), 1, 1)
+    val wal = new FileBasedWriteAheadLog(new SparkConf(), tempDir.getAbsolutePath,
+      new Configuration(), 1, 1, closeFileAfterWrite = false)
     assert(!nonexistentTempPath.exists(), "Directory created just by creating log object")
     wal.read(writtenSegment.head)
     assert(!nonexistentTempPath.exists(), "Directory created just by attempting to read segment")
@@ -356,14 +371,16 @@ object WriteAheadLogSuite {
       logDirectory: String,
       data: Seq[String],
       manualClock: ManualClock = new ManualClock,
-      closeLog: Boolean = true
-    ): FileBasedWriteAheadLog = {
+      closeLog: Boolean = true,
+      clockAdvanceTime: Int = 500,
+      closeFileAfterWrite: Boolean = false): FileBasedWriteAheadLog = {
     if (manualClock.getTimeMillis() < 100000) manualClock.setTime(10000)
-    val wal = new FileBasedWriteAheadLog(new SparkConf(), logDirectory, hadoopConf, 1, 1)
+    val wal = new FileBasedWriteAheadLog(new SparkConf(), logDirectory, hadoopConf, 1, 1,
+      closeFileAfterWrite)
 
     // Ensure that 500 does not get sorted after 2000, so put a high base value.
     data.foreach { item =>
-      manualClock.advance(500)
+      manualClock.advance(clockAdvanceTime)
       wal.write(item, manualClock.getTimeMillis())
     }
     if (closeLog) wal.close()
@@ -418,7 +435,8 @@ object WriteAheadLogSuite {
 
   /** Read all the data in the log file in a directory using the WriteAheadLog class. */
   def readDataUsingWriteAheadLog(logDirectory: String): Seq[String] = {
-    val wal = new FileBasedWriteAheadLog(new SparkConf(), logDirectory, hadoopConf, 1, 1)
+    val wal = new FileBasedWriteAheadLog(new SparkConf(), logDirectory, hadoopConf, 1, 1,
+      closeFileAfterWrite = false)
     val data = wal.readAll().asScala.map(byteBufferToString).toSeq
     wal.close()
     data

