Repository: spark
Updated Branches:
  refs/heads/master 6b348d90f -> 26c1c56de


[SPARK-5522] Accelerate the History Server start

When the history server starts, it fetches and parses every log file in 
order to obtain each application's metadata (e.g. App Name, Start Time, 
Duration). In our production cluster there are 2600 log files (160 GB) in 
HDFS, and restarting the history server takes 3 hours, which is too long 
for us.

It would be better if the history server could show logs with missing 
information during start-up and fill in the missing information after fetching 
and parsing each log file.

Author: guliangliang <guliangli...@qiyi.com>

Closes #4525 from marsishandsome/Spark5522 and squashes the following commits:

a865c11 [guliangliang] fix bug2
4340c2b [guliangliang] fix bug
af92a5a [guliangliang] [SPARK-5522] Accelerate the History Server start


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/26c1c56d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/26c1c56d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/26c1c56d

Branch: refs/heads/master
Commit: 26c1c56dea5d4160913bb65bb743aeb63fee3240
Parents: 6b348d9
Author: guliangliang <guliangli...@qiyi.com>
Authored: Mon Mar 2 15:33:23 2015 -0800
Committer: Andrew Or <and...@databricks.com>
Committed: Mon Mar 2 15:33:23 2015 -0800

----------------------------------------------------------------------
 .../deploy/history/FsHistoryProvider.scala      | 115 ++++++++++++-------
 1 file changed, 74 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/26c1c56d/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala 
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 3e3d6ff..c5fab1d 100644
--- 
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ 
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -18,22 +18,23 @@
 package org.apache.spark.deploy.history
 
 import java.io.{IOException, BufferedInputStream, FileNotFoundException, 
InputStream}
-import java.util.concurrent.{Executors, TimeUnit}
+import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
 
 import scala.collection.mutable
 import scala.concurrent.duration.Duration
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder
 
-import org.apache.hadoop.fs.{FileStatus, Path}
+import com.google.common.util.concurrent.MoreExecutors
 import org.apache.hadoop.fs.permission.AccessControlException
-
-import org.apache.spark.{Logging, SecurityManager, SparkConf}
+import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.scheduler._
 import org.apache.spark.ui.SparkUI
 import org.apache.spark.util.Utils
+import org.apache.spark.{Logging, SecurityManager, SparkConf}
+
 
 /**
  * A class that provides application history from event logs stored in the 
file system.
@@ -98,6 +99,17 @@ private[history] class FsHistoryProvider(conf: SparkConf) 
extends ApplicationHis
     }
   }
 
+  /**
+   * An Executor to fetch and parse log files.
+   */
+  private val replayExecutor: ExecutorService = {
+    if (!conf.contains("spark.testing")) {
+      
Executors.newSingleThreadExecutor(Utils.namedThreadFactory("log-replay-executor"))
+    } else {
+      MoreExecutors.sameThreadExecutor()
+    }
+  }
+
   initialize()
 
   private def initialize(): Unit = {
@@ -171,10 +183,10 @@ private[history] class FsHistoryProvider(conf: SparkConf) 
extends ApplicationHis
    */
   private[history] def checkForLogs(): Unit = {
     try {
-      var newLastModifiedTime = lastModifiedTime
       val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq)
         .getOrElse(Seq[FileStatus]())
-      val logInfos = statusList
+      var newLastModifiedTime = lastModifiedTime
+      val logInfos: Seq[FileStatus] = statusList
         .filter { entry =>
           try {
             getModificationTime(entry).map { time =>
@@ -189,48 +201,69 @@ private[history] class FsHistoryProvider(conf: SparkConf) 
extends ApplicationHis
               false
           }
         }
-        .flatMap { entry =>
-          try {
-            Some(replay(entry, new ReplayListenerBus()))
-          } catch {
-            case e: Exception =>
-              logError(s"Failed to load application log data from $entry.", e)
-              None
-          }
-        }
-        .sortWith(compareAppInfo)
+        .flatMap { entry => Some(entry) }
+        .sortWith { case (entry1, entry2) =>
+          val mod1 = getModificationTime(entry1).getOrElse(-1L)
+          val mod2 = getModificationTime(entry2).getOrElse(-1L)
+          mod1 >= mod2
+      }
+
+      logInfos.sliding(20, 20).foreach { batch =>
+        replayExecutor.submit(new Runnable {
+          override def run(): Unit = mergeApplicationListing(batch)
+        })
+      }
 
       lastModifiedTime = newLastModifiedTime
+    } catch {
+      case e: Exception => logError("Exception in checking for event log 
updates", e)
+    }
+  }
 
-      // When there are new logs, merge the new list with the existing one, 
maintaining
-      // the expected ordering (descending end time). Maintaining the order is 
important
-      // to avoid having to sort the list every time there is a request for 
the log list.
-      if (!logInfos.isEmpty) {
-        val newApps = new mutable.LinkedHashMap[String, 
FsApplicationHistoryInfo]()
-        def addIfAbsent(info: FsApplicationHistoryInfo) = {
-          if (!newApps.contains(info.id) ||
-              
newApps(info.id).logPath.endsWith(EventLoggingListener.IN_PROGRESS) &&
-              !info.logPath.endsWith(EventLoggingListener.IN_PROGRESS)) {
-            newApps += (info.id -> info)
-          }
+  /**
+   * Replay the log files in the list and merge the list of old applications 
with new ones
+   */
+  private def mergeApplicationListing(logs: Seq[FileStatus]): Unit = {
+    val bus = new ReplayListenerBus()
+    val newApps = logs.flatMap { fileStatus =>
+      try {
+        val res = replay(fileStatus, bus)
+        logInfo(s"Application log ${res.logPath} loaded successfully.")
+        Some(res)
+      } catch {
+        case e: Exception =>
+          logError(
+            s"Exception encountered when attempting to load application log 
${fileStatus.getPath}")
+          None
+      }
+    }.toSeq.sortWith(compareAppInfo)
+
+    // When there are new logs, merge the new list with the existing one, 
maintaining
+    // the expected ordering (descending end time). Maintaining the order is 
important
+    // to avoid having to sort the list every time there is a request for the 
log list.
+    if (newApps.nonEmpty) {
+      val mergedApps = new mutable.LinkedHashMap[String, 
FsApplicationHistoryInfo]()
+      def addIfAbsent(info: FsApplicationHistoryInfo): Unit = {
+        if (!mergedApps.contains(info.id) ||
+            
mergedApps(info.id).logPath.endsWith(EventLoggingListener.IN_PROGRESS) &&
+            !info.logPath.endsWith(EventLoggingListener.IN_PROGRESS)) {
+          mergedApps += (info.id -> info)
         }
+      }
 
-        val newIterator = logInfos.iterator.buffered
-        val oldIterator = applications.values.iterator.buffered
-        while (newIterator.hasNext && oldIterator.hasNext) {
-          if (compareAppInfo(newIterator.head, oldIterator.head)) {
-            addIfAbsent(newIterator.next)
-          } else {
-            addIfAbsent(oldIterator.next)
-          }
+      val newIterator = newApps.iterator.buffered
+      val oldIterator = applications.values.iterator.buffered
+      while (newIterator.hasNext && oldIterator.hasNext) {
+        if (compareAppInfo(newIterator.head, oldIterator.head)) {
+          addIfAbsent(newIterator.next())
+        } else {
+          addIfAbsent(oldIterator.next())
         }
-        newIterator.foreach(addIfAbsent)
-        oldIterator.foreach(addIfAbsent)
-
-        applications = newApps
       }
-    } catch {
-      case e: Exception => logError("Exception in checking for event log 
updates", e)
+      newIterator.foreach(addIfAbsent)
+      oldIterator.foreach(addIfAbsent)
+
+      applications = mergedApps
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to