Repository: spark
Updated Branches:
  refs/heads/master b0c3fd34e -> de5e531d3


[SPARK-11731][STREAMING] Enable batching on Driver WriteAheadLog by default

Using batching on the driver for the WriteAheadLog should be an improvement for 
all environments and use cases. With the BatchedWriteAheadLog, users will be 
able to scale to a much higher number of receivers. Therefore we should turn it 
on by default and exercise it during the QA period.

I've also added tests to verify that the defaults for recently added 
configurations are correct (see the sketch after this list):
 - batching on by default
 - closeFileAfterWrite off by default
 - parallelRecovery off by default
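
For illustration, a minimal sketch of how these defaults surface through
WriteAheadLogUtils, mirroring the new tests in the diff below (note that
WriteAheadLogUtils is private[streaming], so this only compiles from code
inside that package):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.util.WriteAheadLogUtils

    val conf = new SparkConf()  // no WAL settings
    // Batching now defaults to on, but only applies on the driver.
    assert(WriteAheadLogUtils.isBatchingEnabled(conf, isDriver = true))
    assert(!WriteAheadLogUtils.isBatchingEnabled(conf, isDriver = false))
    // closeFileAfterWrite stays off by default for driver and receivers.
    assert(!WriteAheadLogUtils.shouldCloseFileAfterWrite(conf, isDriver = true))
    assert(!WriteAheadLogUtils.shouldCloseFileAfterWrite(conf, isDriver = false))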

Author: Burak Yavuz <brk...@gmail.com>

Closes #9695 from brkyvz/enable-batch-wal.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/de5e531d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/de5e531d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/de5e531d

Branch: refs/heads/master
Commit: de5e531d337075fd849437e88846873bca8685e6
Parents: b0c3fd3
Author: Burak Yavuz <brk...@gmail.com>
Authored: Mon Nov 16 11:21:17 2015 -0800
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Mon Nov 16 11:21:17 2015 -0800

----------------------------------------------------------------------
 .../streaming/util/WriteAheadLogUtils.scala     |  2 +-
 .../spark/streaming/JavaWriteAheadLogSuite.java |  1 +
 .../streaming/ReceivedBlockTrackerSuite.scala   |  9 ++++++--
 .../streaming/util/WriteAheadLogSuite.scala     | 24 +++++++++++++++++++-
 .../util/WriteAheadLogUtilsSuite.scala          | 19 +++++++++++++---
 5 files changed, 48 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/de5e531d/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala
index 731a369..7f9e2c9 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala
@@ -67,7 +67,7 @@ private[streaming] object WriteAheadLogUtils extends Logging {
   }
 
   def isBatchingEnabled(conf: SparkConf, isDriver: Boolean): Boolean = {
-    isDriver && conf.getBoolean(DRIVER_WAL_BATCHING_CONF_KEY, defaultValue = false)
+    isDriver && conf.getBoolean(DRIVER_WAL_BATCHING_CONF_KEY, defaultValue = true)
   }
 
   /**
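
Since the default has flipped, batching is now opt-out. A minimal sketch of
restoring the previous unbatched behavior, using the allowBatching key that
the Java test below exercises (presumably the key behind
DRIVER_WAL_BATCHING_CONF_KEY):

    import org.apache.spark.SparkConf

    // Explicitly disable driver-side WAL batching to get the old behavior.
    val conf = new SparkConf()
      .set("spark.streaming.driver.writeAheadLog.allowBatching", "false")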

http://git-wip-us.apache.org/repos/asf/spark/blob/de5e531d/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java
index 175b8a4..09b5f8e 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java
@@ -108,6 +108,7 @@ public class JavaWriteAheadLogSuite extends WriteAheadLog {
   public void testCustomWAL() {
     SparkConf conf = new SparkConf();
     conf.set("spark.streaming.driver.writeAheadLog.class", JavaWriteAheadLogSuite.class.getName());
+    conf.set("spark.streaming.driver.writeAheadLog.allowBatching", "false");
     WriteAheadLog wal = WriteAheadLogUtils.createLogForDriver(conf, null, null);
 
     String data1 = "data1";

http://git-wip-us.apache.org/repos/asf/spark/blob/de5e531d/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
index 7db17ab..081f5a1 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
@@ -330,8 +330,13 @@ class ReceivedBlockTrackerSuite
     : Seq[ReceivedBlockTrackerLogEvent] = {
     logFiles.flatMap {
       file => new FileBasedWriteAheadLogReader(file, hadoopConf).toSeq
-    }.map { byteBuffer =>
-      Utils.deserialize[ReceivedBlockTrackerLogEvent](byteBuffer.array)
+    }.flatMap { byteBuffer =>
+      val validBuffer = if (WriteAheadLogUtils.isBatchingEnabled(conf, isDriver = true)) {
+        Utils.deserialize[Array[Array[Byte]]](byteBuffer.array()).map(ByteBuffer.wrap)
+      } else {
+        Array(byteBuffer)
+      }
+      validBuffer.map(b => Utils.deserialize[ReceivedBlockTrackerLogEvent](b.array()))
     }.toList
   }
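
The unwrapping above reflects how a batched WAL entry is laid out per this
test: a single record whose payload is a serialized Array[Array[Byte]]. A
self-contained sketch of that round-trip using plain Java serialization (an
illustration only, not Spark's internal Utils helpers):

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
    import java.nio.ByteBuffer

    // Pack several records into one batched entry: a serialized Array[Array[Byte]].
    def serializeBatch(records: Seq[Array[Byte]]): ByteBuffer = {
      val bos = new ByteArrayOutputStream()
      val oos = new ObjectOutputStream(bos)
      oos.writeObject(records.toArray)
      oos.close()
      ByteBuffer.wrap(bos.toByteArray)
    }

    // Unwrap a batched entry back into per-record buffers, mirroring the
    // Utils.deserialize[Array[Array[Byte]]] call in the test above.
    def deserializeBatch(entry: ByteBuffer): Seq[ByteBuffer] = {
      val ois = new ObjectInputStream(new ByteArrayInputStream(entry.array()))
      ois.readObject().asInstanceOf[Array[Array[Byte]]].map(ByteBuffer.wrap).toSeq
    }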
 

http://git-wip-us.apache.org/repos/asf/spark/blob/de5e531d/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
index 4273fd7..7f80d6e 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
@@ -20,7 +20,7 @@ import java.io._
 import java.nio.ByteBuffer
 import java.util.{Iterator => JIterator}
 import java.util.concurrent.atomic.AtomicInteger
-import java.util.concurrent.{TimeUnit, CountDownLatch, ThreadPoolExecutor}
+import java.util.concurrent.{RejectedExecutionException, TimeUnit, CountDownLatch, ThreadPoolExecutor}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
@@ -190,6 +190,28 @@ abstract class CommonWriteAheadLogTests(
     }
     assert(!nonexistentTempPath.exists(), "Directory created just by attempting to read segment")
   }
+
+  test(testPrefix + "parallel recovery not enabled if closeFileAfterWrite = false") {
+    // write some data
+    val writtenData = (1 to 10).map { i =>
+      val data = generateRandomData()
+      val file = testDir + s"/log-$i-$i"
+      writeDataManually(data, file, allowBatching)
+      data
+    }.flatten
+
+    val wal = createWriteAheadLog(testDir, closeFileAfterWrite, allowBatching)
+    // create iterator but don't materialize it
+    val readData = wal.readAll().asScala.map(byteBufferToString)
+    wal.close()
+    if (closeFileAfterWrite) {
+      // the threadpool is shutdown by the wal.close call above, therefore we shouldn't be able
+      // to materialize the iterator with parallel recovery
+      intercept[RejectedExecutionException](readData.toArray)
+    } else {
+      assert(readData.toSeq === writtenData)
+    }
+  }
 }
 
 class FileBasedWriteAheadLogSuite
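
The RejectedExecutionException the new test expects is standard
java.util.concurrent behavior rather than anything WAL-specific: once an
executor is shut down, further task submissions are rejected. A standalone
illustration:

    import java.util.concurrent.{Executors, RejectedExecutionException}

    val pool = Executors.newFixedThreadPool(2)
    pool.shutdown()
    try {
      // Submitting after shutdown() is rejected immediately.
      pool.submit(new Runnable { def run(): Unit = println("never runs") })
    } catch {
      case _: RejectedExecutionException => println("submission rejected")
    }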

http://git-wip-us.apache.org/repos/asf/spark/blob/de5e531d/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogUtilsSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogUtilsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogUtilsSuite.scala
index 9152728..bfc5b0c 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogUtilsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogUtilsSuite.scala
@@ -56,19 +56,19 @@ class WriteAheadLogUtilsSuite extends SparkFunSuite {
   test("log selection and creation") {
 
     val emptyConf = new SparkConf()  // no log configuration
-    assertDriverLogClass[FileBasedWriteAheadLog](emptyConf)
+    assertDriverLogClass[FileBasedWriteAheadLog](emptyConf, isBatched = true)
     assertReceiverLogClass[FileBasedWriteAheadLog](emptyConf)
 
     // Verify setting driver WAL class
     val driverWALConf = new SparkConf().set("spark.streaming.driver.writeAheadLog.class",
       classOf[MockWriteAheadLog0].getName())
-    assertDriverLogClass[MockWriteAheadLog0](driverWALConf)
+    assertDriverLogClass[MockWriteAheadLog0](driverWALConf, isBatched = true)
     assertReceiverLogClass[FileBasedWriteAheadLog](driverWALConf)
 
     // Verify setting receiver WAL class
     val receiverWALConf = new SparkConf().set("spark.streaming.receiver.writeAheadLog.class",
       classOf[MockWriteAheadLog0].getName())
-    assertDriverLogClass[FileBasedWriteAheadLog](receiverWALConf)
+    assertDriverLogClass[FileBasedWriteAheadLog](receiverWALConf, isBatched = true)
     assertReceiverLogClass[MockWriteAheadLog0](receiverWALConf)
 
     // Verify setting receiver WAL class with 1-arg constructor
@@ -104,6 +104,19 @@ class WriteAheadLogUtilsSuite extends SparkFunSuite {
     assertDriverLogClass[FileBasedWriteAheadLog](receiverWALConf, isBatched = true)
     assertReceiverLogClass[MockWriteAheadLog0](receiverWALConf)
   }
+
+  test("batching is enabled by default in WriteAheadLog") {
+    val conf = new SparkConf()
+    assert(WriteAheadLogUtils.isBatchingEnabled(conf, isDriver = true))
+    // batching is not valid for receiver WALs
+    assert(!WriteAheadLogUtils.isBatchingEnabled(conf, isDriver = false))
+  }
+
+  test("closeFileAfterWrite is disabled by default in WriteAheadLog") {
+    val conf = new SparkConf()
+    assert(!WriteAheadLogUtils.shouldCloseFileAfterWrite(conf, isDriver = true))
+    assert(!WriteAheadLogUtils.shouldCloseFileAfterWrite(conf, isDriver = false))
+  }
 }
 
 object WriteAheadLogUtilsSuite {

