Repository: spark
Updated Branches:
  refs/heads/master 8310c0741 -> 6c4b9f4be


[SPARK-16395][STREAMING] Fail if too many CheckpointWriteHandlers are queued up 
in the fixed thread pool

## What changes were proposed in this pull request?

Begin failing when checkpoint writes are unlikely to keep up with the storage's 
ability to write them, so that the job fails fast instead of slowly filling memory.
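
The fail-fast behaviour described above can be illustrated with a minimal sketch, which is not part of the patch. It assumes the default rejection policy of `ThreadPoolExecutor` (which throws `RejectedExecutionException`), and the names `BoundedExecutorSketch` and `countRejections`, as well as the small queue capacity, are hypothetical and chosen only for illustration. The patch itself uses a capacity of 1000.

```scala
import java.util.concurrent.{ArrayBlockingQueue, CountDownLatch, RejectedExecutionException,
  ThreadPoolExecutor, TimeUnit}

// Hypothetical demo object; not code from Checkpoint.scala.
object BoundedExecutorSketch {

  // Submits `tasks` blocking jobs to a single-thread executor whose queue holds
  // at most `capacity` waiting jobs, and returns how many submissions were rejected.
  def countRejections(capacity: Int, tasks: Int): Int = {
    val executor = new ThreadPoolExecutor(
      1, 1,
      0L, TimeUnit.MILLISECONDS,
      new ArrayBlockingQueue[Runnable](capacity))
    val release = new CountDownLatch(1)
    var rejected = 0
    try {
      for (_ <- 1 to tasks) {
        try {
          executor.execute(new Runnable {
            // Stand-in for a slow checkpoint write: block until released.
            override def run(): Unit = release.await()
          })
        } catch {
          // Thrown by the default AbortPolicy once the queue is full.
          case _: RejectedExecutionException => rejected += 1
        }
      }
    } finally {
      // Unblock the queued jobs and let the executor drain and exit.
      release.countDown()
      executor.shutdown()
      executor.awaitTermination(10, TimeUnit.SECONDS)
    }
    rejected
  }

  def main(args: Array[String]): Unit = {
    println(countRejections(capacity = 2, tasks = 5))
  }
}
```

With a capacity of 2 and 5 submissions, one job runs, two wait in the queue, and the remaining two are rejected, so `main` prints 2. An unbounded queue, as created by `Executors.newFixedThreadPool(1)`, would instead accept all five and keep their payloads in memory.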

## How was this patch tested?

Jenkins tests

Author: Sean Owen <so...@cloudera.com>

Closes #14152 from srowen/SPARK-16395.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6c4b9f4b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6c4b9f4b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6c4b9f4b

Branch: refs/heads/master
Commit: 6c4b9f4be6b429197c6a53f937a82c2ac5866d65
Parents: 8310c07
Author: Sean Owen <so...@cloudera.com>
Authored: Tue Jul 19 12:10:24 2016 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Tue Jul 19 12:10:24 2016 +0100

----------------------------------------------------------------------
 .../scala/org/apache/spark/streaming/Checkpoint.scala  | 13 ++++++++++---
 1 file changed, 10 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6c4b9f4b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 0b11026..398fa65 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -18,8 +18,8 @@
 package org.apache.spark.streaming
 
 import java.io._
-import java.util.concurrent.Executors
-import java.util.concurrent.RejectedExecutionException
+import java.util.concurrent.{ArrayBlockingQueue, RejectedExecutionException,
+  ThreadPoolExecutor, TimeUnit}
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
@@ -184,7 +184,14 @@ class CheckpointWriter(
     hadoopConf: Configuration
   ) extends Logging {
   val MAX_ATTEMPTS = 3
-  val executor = Executors.newFixedThreadPool(1)
+
+  // Single-thread executor which rejects executions when a large amount have queued up.
+  // This fails fast since this typically means the checkpoint store will never keep up, and
+  // will otherwise lead to filling memory with waiting payloads of byte[] to write.
+  val executor = new ThreadPoolExecutor(
+    1, 1,
+    0L, TimeUnit.MILLISECONDS,
+    new ArrayBlockingQueue[Runnable](1000))
   val compressionCodec = CompressionCodec.createCodec(conf)
   private var stopped = false
   @volatile private[this] var fs: FileSystem = null

