spark git commit: [SPARK-11731][STREAMING] Enable batching on Driver WriteAheadLog by default
Repository: spark Updated Branches: refs/heads/master b0c3fd34e -> de5e531d3 [SPARK-11731][STREAMING] Enable batching on Driver WriteAheadLog by default Using batching on the driver for the WriteAheadLog should be an improvement for all environments and use cases. Users will be able to scale to a much higher number of receivers with the BatchedWriteAheadLog. Therefore we should turn it on by default, and QA it in the QA period. I've also added some tests to make sure the default configurations are correct regarding recent additions: - batching on by default - closeFileAfterWrite off by default - parallelRecovery off by default Author: Burak Yavuz Closes #9695 from brkyvz/enable-batch-wal. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/de5e531d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/de5e531d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/de5e531d Branch: refs/heads/master Commit: de5e531d337075fd849437e88846873bca8685e6 Parents: b0c3fd3 Author: Burak Yavuz Authored: Mon Nov 16 11:21:17 2015 -0800 Committer: Tathagata Das Committed: Mon Nov 16 11:21:17 2015 -0800 -- .../streaming/util/WriteAheadLogUtils.scala | 2 +- .../spark/streaming/JavaWriteAheadLogSuite.java | 1 + .../streaming/ReceivedBlockTrackerSuite.scala | 9 ++-- .../streaming/util/WriteAheadLogSuite.scala | 24 +++- .../util/WriteAheadLogUtilsSuite.scala | 19 +--- 5 files changed, 48 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/de5e531d/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala -- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala index 731a369..7f9e2c9 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala +++ 
b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala @@ -67,7 +67,7 @@ private[streaming] object WriteAheadLogUtils extends Logging { } def isBatchingEnabled(conf: SparkConf, isDriver: Boolean): Boolean = { -isDriver && conf.getBoolean(DRIVER_WAL_BATCHING_CONF_KEY, defaultValue = false) +isDriver && conf.getBoolean(DRIVER_WAL_BATCHING_CONF_KEY, defaultValue = true) } /** http://git-wip-us.apache.org/repos/asf/spark/blob/de5e531d/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java -- diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java index 175b8a4..09b5f8e 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java @@ -108,6 +108,7 @@ public class JavaWriteAheadLogSuite extends WriteAheadLog { public void testCustomWAL() { SparkConf conf = new SparkConf(); conf.set("spark.streaming.driver.writeAheadLog.class", JavaWriteAheadLogSuite.class.getName()); +conf.set("spark.streaming.driver.writeAheadLog.allowBatching", "false"); WriteAheadLog wal = WriteAheadLogUtils.createLogForDriver(conf, null, null); String data1 = "data1"; http://git-wip-us.apache.org/repos/asf/spark/blob/de5e531d/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala -- diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala index 7db17ab..081f5a1 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala @@ -330,8 +330,13 @@ class ReceivedBlockTrackerSuite : Seq[ReceivedBlockTrackerLogEvent] = { logFiles.flatMap { file => new 
FileBasedWriteAheadLogReader(file, hadoopConf).toSeq -}.map { byteBuffer => - Utils.deserialize[ReceivedBlockTrackerLogEvent](byteBuffer.array) +}.flatMap { byteBuffer => + val validBuffer = if (WriteAheadLogUtils.isBatchingEnabled(conf, isDriver = true)) { + Utils.deserialize[Array[Array[Byte]]](byteBuffer.array()).map(ByteBuffer.wrap) + } else { +Array(byteBuffer) + } + validBuffer.map(b => Utils.deserialize[ReceivedBlockTrackerLogEvent](b.array())) }.toList } http://git-wip-us.ap
spark git commit: [SPARK-11731][STREAMING] Enable batching on Driver WriteAheadLog by default
Repository: spark Updated Branches: refs/heads/branch-1.6 f14fb291d -> 38673d7e6 [SPARK-11731][STREAMING] Enable batching on Driver WriteAheadLog by default Using batching on the driver for the WriteAheadLog should be an improvement for all environments and use cases. Users will be able to scale to a much higher number of receivers with the BatchedWriteAheadLog. Therefore we should turn it on by default, and QA it in the QA period. I've also added some tests to make sure the default configurations are correct regarding recent additions: - batching on by default - closeFileAfterWrite off by default - parallelRecovery off by default Author: Burak Yavuz Closes #9695 from brkyvz/enable-batch-wal. (cherry picked from commit de5e531d337075fd849437e88846873bca8685e6) Signed-off-by: Tathagata Das Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/38673d7e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/38673d7e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/38673d7e Branch: refs/heads/branch-1.6 Commit: 38673d7e6358622f240d7331b061cadb96f8409f Parents: f14fb29 Author: Burak Yavuz Authored: Mon Nov 16 11:21:17 2015 -0800 Committer: Tathagata Das Committed: Mon Nov 16 11:21:27 2015 -0800 -- .../streaming/util/WriteAheadLogUtils.scala | 2 +- .../spark/streaming/JavaWriteAheadLogSuite.java | 1 + .../streaming/ReceivedBlockTrackerSuite.scala | 9 ++-- .../streaming/util/WriteAheadLogSuite.scala | 24 +++- .../util/WriteAheadLogUtilsSuite.scala | 19 +--- 5 files changed, 48 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/38673d7e/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala -- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala index 731a369..7f9e2c9 100644 --- 
a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala @@ -67,7 +67,7 @@ private[streaming] object WriteAheadLogUtils extends Logging { } def isBatchingEnabled(conf: SparkConf, isDriver: Boolean): Boolean = { -isDriver && conf.getBoolean(DRIVER_WAL_BATCHING_CONF_KEY, defaultValue = false) +isDriver && conf.getBoolean(DRIVER_WAL_BATCHING_CONF_KEY, defaultValue = true) } /** http://git-wip-us.apache.org/repos/asf/spark/blob/38673d7e/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java -- diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java index 175b8a4..09b5f8e 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java @@ -108,6 +108,7 @@ public class JavaWriteAheadLogSuite extends WriteAheadLog { public void testCustomWAL() { SparkConf conf = new SparkConf(); conf.set("spark.streaming.driver.writeAheadLog.class", JavaWriteAheadLogSuite.class.getName()); +conf.set("spark.streaming.driver.writeAheadLog.allowBatching", "false"); WriteAheadLog wal = WriteAheadLogUtils.createLogForDriver(conf, null, null); String data1 = "data1"; http://git-wip-us.apache.org/repos/asf/spark/blob/38673d7e/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala -- diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala index 7db17ab..081f5a1 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala @@ -330,8 +330,13 @@ class 
ReceivedBlockTrackerSuite : Seq[ReceivedBlockTrackerLogEvent] = { logFiles.flatMap { file => new FileBasedWriteAheadLogReader(file, hadoopConf).toSeq -}.map { byteBuffer => - Utils.deserialize[ReceivedBlockTrackerLogEvent](byteBuffer.array) +}.flatMap { byteBuffer => + val validBuffer = if (WriteAheadLogUtils.isBatchingEnabled(conf, isDriver = true)) { + Utils.deserialize[Array[Array[Byte]]](byteBuffer.array()).map(ByteBuffer.wrap) + } else { +Array(byteBuffer) + } + validBuffer.map(