Repository: spark
Updated Branches:
  refs/heads/branch-1.6 1b3db967e -> 72da2a21f


[SPARK-8414] Ensure context cleaner periodic cleanups

Cleanups in the ContextCleaner are triggered by garbage collection. If the driver
JVM is huge and there is little memory pressure, GC may never run, so we may never
clean up shuffle files on executors. This is a problem for long-running
applications (e.g. streaming).
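
The fix adds a `spark.cleaner.periodicGC.interval` setting (visible in the diff
below) that forces a GC on a fixed schedule, 30 minutes by default. As a hedged
illustration of overriding it, here is a minimal sketch; the five-minute value,
app name, and master are placeholders, not part of the patch:

    import org.apache.spark.{SparkConf, SparkContext}

    // Shorten the periodic-GC interval from the patch's 30-minute default.
    // The "5min" string is parsed by SparkConf.getTimeAsSeconds, as the diff shows.
    val conf = new SparkConf()
      .setAppName("periodic-gc-example") // placeholder app name
      .setMaster("local[*]")             // placeholder master
      .set("spark.cleaner.periodicGC.interval", "5min")

    val sc = new SparkContext(conf)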

Author: Andrew Or <and...@databricks.com>

Closes #10070 from andrewor14/periodic-gc.

(cherry picked from commit 1ce4adf55b535518c2e63917a827fac1f2df4e8e)
Signed-off-by: Josh Rosen <joshro...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/72da2a21
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/72da2a21
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/72da2a21

Branch: refs/heads/branch-1.6
Commit: 72da2a21f0940b97757ace5975535e559d627688
Parents: 1b3db96
Author: Andrew Or <and...@databricks.com>
Authored: Tue Dec 1 19:36:34 2015 -0800
Committer: Josh Rosen <joshro...@databricks.com>
Committed: Tue Dec 1 19:36:47 2015 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/ContextCleaner.scala | 21 +++++++++++++++++++-
 1 file changed, 20 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/72da2a21/core/src/main/scala/org/apache/spark/ContextCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
index d23c153..bc73253 100644
--- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -18,12 +18,13 @@
 package org.apache.spark
 
 import java.lang.ref.{ReferenceQueue, WeakReference}
+import java.util.concurrent.{TimeUnit, ScheduledExecutorService}
 
 import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
 
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.{RDD, ReliableRDDCheckpointData}
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ThreadUtils, Utils}
 
 /**
  * Classes that represent cleaning tasks.
@@ -66,6 +67,20 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
 
   private val cleaningThread = new Thread() { override def run() { keepCleaning() }}
 
+  private val periodicGCService: ScheduledExecutorService =
+    ThreadUtils.newDaemonSingleThreadScheduledExecutor("context-cleaner-periodic-gc")
+
+  /**
+   * How often to trigger a garbage collection in this JVM.
+   *
+   * This context cleaner triggers cleanups only when weak references are garbage collected.
+   * In long-running applications with large driver JVMs, where there is little memory pressure
+   * on the driver, this may happen very occasionally or not at all. Not cleaning at all may
+   * lead to executors running out of disk space after a while.
+   */
+  private val periodicGCInterval =
+    sc.conf.getTimeAsSeconds("spark.cleaner.periodicGC.interval", "30min")
+
   /**
   * Whether the cleaning thread will block on cleanup tasks (other than shuffle, which
   * is controlled by the `spark.cleaner.referenceTracking.blocking.shuffle` parameter).
@@ -104,6 +119,9 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
     cleaningThread.setDaemon(true)
     cleaningThread.setName("Spark Context Cleaner")
     cleaningThread.start()
+    periodicGCService.scheduleAtFixedRate(new Runnable {
+      override def run(): Unit = System.gc()
+    }, periodicGCInterval, periodicGCInterval, TimeUnit.SECONDS)
   }
 
   /**
@@ -119,6 +137,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
       cleaningThread.interrupt()
     }
     cleaningThread.join()
+    periodicGCService.shutdown()
   }
 
   /** Register a RDD for cleanup when it is garbage collected. */

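For readers who want the scheduling pattern in isolation: below is a minimal,
self-contained Scala sketch of what the patch wires into ContextCleaner, using
plain java.util.concurrent in place of Spark's internal ThreadUtils helper.
The object name PeriodicGcDemo is illustrative only.

    import java.util.concurrent.{Executors, ScheduledExecutorService, ThreadFactory, TimeUnit}

    object PeriodicGcDemo {

      // A single-threaded scheduler backed by a daemon thread, so it cannot
      // keep the JVM alive on shutdown -- the same property Spark gets from
      // ThreadUtils.newDaemonSingleThreadScheduledExecutor.
      private val periodicGCService: ScheduledExecutorService =
        Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
          override def newThread(r: Runnable): Thread = {
            val t = new Thread(r, "context-cleaner-periodic-gc")
            t.setDaemon(true)
            t
          }
        })

      def main(args: Array[String]): Unit = {
        val periodicGCInterval = 30L * 60L // seconds; the patch's 30min default

        // Trigger a full GC at a fixed rate so weak references get enqueued
        // even when the driver heap is under no memory pressure.
        periodicGCService.scheduleAtFixedRate(new Runnable {
          override def run(): Unit = System.gc()
        }, periodicGCInterval, periodicGCInterval, TimeUnit.SECONDS)

        // ... long-running application work would go here ...

        periodicGCService.shutdown() // mirrors ContextCleaner.stop()
      }
    }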
