Repository: spark
Updated Branches:
  refs/heads/master 10094a523 -> 8942b522d


[SPARK-3562]Periodic cleanup event logs

Author: xukun 00228947 <xukun...@huawei.com>

Closes #4214 from viper-kun/cleaneventlog and squashes the following commits:

7a5b9c5 [xukun 00228947] fix issue
31674ee [xukun 00228947] fix issue
6e3d06b [xukun 00228947] fix issue
373f3b9 [xukun 00228947] fix issue
71782b5 [xukun 00228947] fix issue
5b45035 [xukun 00228947] fix issue
70c28d6 [xukun 00228947] fix issues
adcfe86 [xukun 00228947] Periodic cleanup event logs


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8942b522
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8942b522
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8942b522

Branch: refs/heads/master
Commit: 8942b522d8a3269a2a357e3a274ed4b3e66ebdde
Parents: 10094a5
Author: xukun 00228947 <xukun...@huawei.com>
Authored: Thu Feb 26 13:24:00 2015 -0800
Committer: Andrew Or <and...@databricks.com>
Committed: Thu Feb 26 13:24:00 2015 -0800

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/SparkConf.scala |   8 +-
 .../deploy/history/FsHistoryProvider.scala      | 112 +++++++++++++------
 docs/monitoring.md                              |  25 ++++-
 3 files changed, 110 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8942b522/core/src/main/scala/org/apache/spark/SparkConf.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala 
b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 0dbd261..0f4922a 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -362,7 +362,13 @@ private[spark] object SparkConf extends Logging {
       DeprecatedConfig("spark.files.userClassPathFirst", 
"spark.executor.userClassPathFirst",
         "1.3"),
       DeprecatedConfig("spark.yarn.user.classpath.first", null, "1.3",
-        "Use spark.{driver,executor}.userClassPathFirst instead."))
+        "Use spark.{driver,executor}.userClassPathFirst instead."),
+      DeprecatedConfig("spark.history.fs.updateInterval",
+        "spark.history.fs.update.interval.seconds",
+        "1.3", "Use spark.history.fs.update.interval.seconds instead"),
+      DeprecatedConfig("spark.history.updateInterval",
+        "spark.history.fs.update.interval.seconds",
+        "1.3", "Use spark.history.fs.update.interval.seconds instead"))
     configs.map { x => (x.oldName, x) }.toMap
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/8942b522/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala 
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 885fa0f..1aaa7b7 100644
--- 
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ 
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -17,9 +17,13 @@
 
 package org.apache.spark.deploy.history
 
-import java.io.{BufferedInputStream, FileNotFoundException, InputStream}
+import java.io.{IOException, BufferedInputStream, FileNotFoundException, 
InputStream}
+import java.util.concurrent.{Executors, TimeUnit}
 
 import scala.collection.mutable
+import scala.concurrent.duration.Duration
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder
 
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.fs.permission.AccessControlException
@@ -44,8 +48,15 @@ private[history] class FsHistoryProvider(conf: SparkConf) 
extends ApplicationHis
   private val NOT_STARTED = "<Not Started>"
 
   // Interval between each check for event log updates
-  private val UPDATE_INTERVAL_MS = 
conf.getInt("spark.history.fs.updateInterval",
-    conf.getInt("spark.history.updateInterval", 10)) * 1000
+  private val UPDATE_INTERVAL_MS = 
conf.getOption("spark.history.fs.update.interval.seconds")
+    
.orElse(conf.getOption(SparkConf.translateConfKey("spark.history.fs.updateInterval",
 true)))
+    
.orElse(conf.getOption(SparkConf.translateConfKey("spark.history.updateInterval",
 true)))
+    .map(_.toInt)
+    .getOrElse(10) * 1000
+
+  // Interval between each cleaner checks for event logs to delete
+  private val CLEAN_INTERVAL_MS = 
conf.getLong("spark.history.fs.cleaner.interval.seconds",
+    DEFAULT_SPARK_HISTORY_FS_CLEANER_INTERVAL_S) * 1000
 
   private val logDir = conf.getOption("spark.history.fs.logDirectory")
     .map { d => Utils.resolveURI(d).toString }
@@ -53,8 +64,11 @@ private[history] class FsHistoryProvider(conf: SparkConf) 
extends ApplicationHis
 
   private val fs = Utils.getHadoopFileSystem(logDir, 
SparkHadoopUtil.get.newConfiguration(conf))
 
-  // A timestamp of when the disk was last accessed to check for log updates
-  private var lastLogCheckTimeMs = -1L
+  // Used by check event thread and clean log thread.
+  // Scheduled thread pool size must be one, otherwise it will have concurrent 
issues about fs
+  // and applications between check task and clean task.
+  private val pool = Executors.newScheduledThreadPool(1, new 
ThreadFactoryBuilder()
+    .setNameFormat("spark-history-task-%d").setDaemon(true).build())
 
   // The modification time of the newest log detected during the last scan. 
This is used
   // to ignore logs that are older during subsequent scans, to avoid 
processing data that
@@ -73,25 +87,13 @@ private[history] class FsHistoryProvider(conf: SparkConf) 
extends ApplicationHis
   private[history] val APPLICATION_COMPLETE = "APPLICATION_COMPLETE"
 
   /**
-   * A background thread that periodically checks for event log updates on 
disk.
-   *
-   * If a log check is invoked manually in the middle of a period, this thread 
re-adjusts the
-   * time at which it performs the next log check to maintain the same period 
as before.
-   *
-   * TODO: Add a mechanism to update manually.
+   * Return a runnable that performs the given operation on the event logs.
+   * This operation is expected to be executed periodically.
    */
-  private val logCheckingThread = new Thread("LogCheckingThread") {
-    override def run() = Utils.logUncaughtExceptions {
-      while (true) {
-        val now = getMonotonicTimeMs()
-        if (now - lastLogCheckTimeMs > UPDATE_INTERVAL_MS) {
-          Thread.sleep(UPDATE_INTERVAL_MS)
-        } else {
-          // If the user has manually checked for logs recently, wait until
-          // UPDATE_INTERVAL_MS after the last check time
-          Thread.sleep(lastLogCheckTimeMs + UPDATE_INTERVAL_MS - now)
-        }
-        checkForLogs()
+  private def getRunner(operateFun: () => Unit): Runnable = {
+    new Runnable() {
+      override def run() = Utils.logUncaughtExceptions {
+        operateFun()
       }
     }
   }
@@ -113,12 +115,17 @@ private[history] class FsHistoryProvider(conf: SparkConf) 
extends ApplicationHis
         "Logging directory specified is not a directory: %s".format(logDir))
     }
 
-    checkForLogs()
-
     // Disable the background thread during tests.
     if (!conf.contains("spark.testing")) {
-      logCheckingThread.setDaemon(true)
-      logCheckingThread.start()
+      // A task that periodically checks for event log updates on disk.
+      pool.scheduleAtFixedRate(getRunner(checkForLogs), 0, UPDATE_INTERVAL_MS,
+        TimeUnit.MILLISECONDS)
+
+      if (conf.getBoolean("spark.history.fs.cleaner.enabled", false)) {
+        // A task that periodically cleans event logs on disk.
+        pool.scheduleAtFixedRate(getRunner(cleanLogs), 0, CLEAN_INTERVAL_MS,
+          TimeUnit.MILLISECONDS)
+      }
     }
   }
 
@@ -163,9 +170,6 @@ private[history] class FsHistoryProvider(conf: SparkConf) 
extends ApplicationHis
    * applications that haven't been updated since last time the logs were 
checked.
    */
   private[history] def checkForLogs(): Unit = {
-    lastLogCheckTimeMs = getMonotonicTimeMs()
-    logDebug("Checking for logs. Time is now %d.".format(lastLogCheckTimeMs))
-
     try {
       var newLastModifiedTime = lastModifiedTime
       val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq)
@@ -231,6 +235,45 @@ private[history] class FsHistoryProvider(conf: SparkConf) 
extends ApplicationHis
   }
 
   /**
+   * Delete event logs from the log directory according to the clean policy 
defined by the user.
+   */
+  private def cleanLogs(): Unit = {
+    try {
+      val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq)
+        .getOrElse(Seq[FileStatus]())
+      val maxAge = conf.getLong("spark.history.fs.cleaner.maxAge.seconds",
+        DEFAULT_SPARK_HISTORY_FS_MAXAGE_S) * 1000
+
+      val now = System.currentTimeMillis()
+      val appsToRetain = new mutable.LinkedHashMap[String, 
FsApplicationHistoryInfo]()
+
+      applications.values.foreach { info =>
+        if (now - info.lastUpdated <= maxAge) {
+          appsToRetain += (info.id -> info)
+        }
+      }
+
+      applications = appsToRetain
+
+      // Scan all logs from the log directory.
+      // Only directories older than the specified max age will be deleted
+      statusList.foreach { dir =>
+        try {
+          if (now - dir.getModificationTime() > maxAge) {
+            // if path is a directory and set to  true,
+            // the directory is deleted else throws an exception
+            fs.delete(dir.getPath, true)
+          }
+        } catch {
+          case t: IOException => logError(s"IOException in cleaning logs of 
$dir", t)
+        }
+      }
+    } catch {
+      case t: Exception => logError("Exception in cleaning logs", t)
+    }
+  }
+
+  /**
    * Comparison function that defines the sort order for the application 
listing.
    *
    * @return Whether `i1` should precede `i2`.
@@ -336,9 +379,6 @@ private[history] class FsHistoryProvider(conf: SparkConf) 
extends ApplicationHis
     }
   }
 
-  /** Returns the system's mononotically increasing time. */
-  private def getMonotonicTimeMs(): Long = System.nanoTime() / (1000 * 1000)
-
   /**
    * Return true when the application has completed.
    */
@@ -354,6 +394,12 @@ private[history] class FsHistoryProvider(conf: SparkConf) 
extends ApplicationHis
 
 private object FsHistoryProvider {
   val DEFAULT_LOG_DIR = "file:/tmp/spark-events"
+
+  // One day
+  val DEFAULT_SPARK_HISTORY_FS_CLEANER_INTERVAL_S = Duration(1, 
TimeUnit.DAYS).toSeconds
+
+  // One week
+  val DEFAULT_SPARK_HISTORY_FS_MAXAGE_S = Duration(7, TimeUnit.DAYS).toSeconds
 }
 
 private class FsApplicationHistoryInfo(

http://git-wip-us.apache.org/repos/asf/spark/blob/8942b522/docs/monitoring.md
----------------------------------------------------------------------
diff --git a/docs/monitoring.md b/docs/monitoring.md
index 009a344..37ede47 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -86,7 +86,7 @@ follows:
     </td>
   </tr>
   <tr>
-    <td>spark.history.fs.updateInterval</td>
+    <td>spark.history.fs.update.interval.seconds</td>
     <td>10</td>
     <td>
       The period, in seconds, at which information displayed by this history 
server is updated.
@@ -145,6 +145,29 @@ follows:
       If disabled, no access control checks are made. 
     </td>
   </tr>
+  <tr>
+    <td>spark.history.fs.cleaner.enabled</td>
+    <td>false</td>
+    <td>
+      Specifies whether the History Server should periodically clean up event 
logs from storage.
+    </td>
+  </tr>
+  <tr>
+    <td>spark.history.fs.cleaner.interval.seconds</td>
+    <td>86400</td>
+    <td>
+      How often the job history cleaner checks for files to delete, in 
seconds. Defaults to 86400 (one day).
+      Files are only deleted if they are older than 
spark.history.fs.cleaner.maxAge.seconds.
+    </td>
+  </tr>
+  <tr>
+    <td>spark.history.fs.cleaner.maxAge.seconds</td>
+    <td>3600 * 24 * 7</td>
+    <td>
+      Job history files older than this many seconds will be deleted when the 
history cleaner runs.
+      Defaults to 3600 * 24 * 7 (1 week).
+    </td>
+  </tr>
 </table>
 
 Note that in all of these UIs, the tables are sortable by clicking their 
headers,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to