Repository: spark
Updated Branches:
  refs/heads/master 9c5935d00 -> 74daf622d


[SPARK-20642][CORE] Store FsHistoryProvider listing data in a KVStore.

The application listing is still generated from event logs, but is now stored
in a KVStore instance. By default an in-memory store is used, but a new config
allows setting a local disk path to store the data, in which case a LevelDB
store will be created.
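
For illustration, a deployment could opt into the disk-backed store with something
like the following (a minimal sketch; the config keys are the ones touched by this
change, the path values are placeholders):

    import org.apache.spark.SparkConf

    // Event log location plus the new on-disk listing cache; without
    // "spark.history.store.path" the listing stays in memory.
    val conf = new SparkConf()
      .set("spark.history.fs.logDirectory", "hdfs://namenode/shared/spark-logs")
      .set("spark.history.store.path", "/var/lib/spark/history-store")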

The provider stores things internally using the public REST API types; I believe
this is better going forward since it will make it easier to get rid of the
internal history server API, which is mostly redundant at this point.
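
As a rough sketch of how the stored data is consumed (assuming a `listing` KVStore
already populated by the provider, as in getListing() in the diff below), the
application list is read back through the store's "endTime" index rather than from
an in-memory map:

    import scala.collection.JavaConverters._

    // Applications come back ordered by end time, most recently finished first.
    val apps = listing.view(classOf[ApplicationInfoWrapper])
      .index("endTime")
      .reverse()
      .iterator()
      .asScala
      .map(_.toAppHistoryInfo())
      .toList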

I also added a finalizer to LevelDBIterator, to make sure that resources are
eventually released. This helps when code iterates but does not exhaust the
iterator, thus not triggering the auto-close code.
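
The finalizer is only a backstop; callers that can manage the resource explicitly
should still prefer the pattern used by cleanLogs() in this change (sketch below,
again assuming a `listing` KVStore):

    import scala.collection.JavaConverters._

    // Take a closeable iterator and release it deterministically instead of
    // relying on finalization.
    val it = listing.view(classOf[ApplicationInfoWrapper]).closeableIterator()
    try {
      it.asScala.foreach { app => println(app.id) }
    } finally {
      it.close()
    }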

HistoryServerSuite was modified to not restart the history server unnecessarily;
this makes the JSON validation tests run more quickly.

Author: Marcelo Vanzin <van...@cloudera.com>

Closes #18887 from vanzin/SPARK-20642.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/74daf622
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/74daf622
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/74daf622

Branch: refs/heads/master
Commit: 74daf622de4e534d5a5929b424a6e836850eefad
Parents: 9c5935d
Author: Marcelo Vanzin <van...@cloudera.com>
Authored: Wed Sep 27 20:33:41 2017 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Wed Sep 27 20:33:41 2017 +0800

----------------------------------------------------------------------
 .../org/apache/spark/util/kvstore/LevelDB.java  |  33 +-
 .../spark/util/kvstore/LevelDBIterator.java     |  10 +
 core/pom.xml                                    |   5 +
 .../deploy/history/FsHistoryProvider.scala      | 659 ++++++++++---------
 .../apache/spark/deploy/history/config.scala    |  48 ++
 .../org/apache/spark/status/api/v1/api.scala    |  15 +-
 .../deploy/history/FsHistoryProviderSuite.scala |  71 +-
 .../deploy/history/HistoryServerSuite.scala     |  17 +-
 docs/monitoring.md                              |   9 +
 scalastyle-config.xml                           |   2 +-
 10 files changed, 536 insertions(+), 333 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/74daf622/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java
----------------------------------------------------------------------
diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java
index 310febc..ff48b15 100644
--- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java
+++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java
@@ -213,17 +213,32 @@ public class LevelDB implements KVStore {
 
   @Override
   public void close() throws IOException {
-    DB _db = this._db.getAndSet(null);
-    if (_db == null) {
-      return;
+    synchronized (this._db) {
+      DB _db = this._db.getAndSet(null);
+      if (_db == null) {
+        return;
+      }
+
+      try {
+        _db.close();
+      } catch (IOException ioe) {
+        throw ioe;
+      } catch (Exception e) {
+        throw new IOException(e.getMessage(), e);
+      }
     }
+  }
 
-    try {
-      _db.close();
-    } catch (IOException ioe) {
-      throw ioe;
-    } catch (Exception e) {
-      throw new IOException(e.getMessage(), e);
+  /**
+   * Closes the given iterator if the DB is still open. Trying to close a JNI LevelDB handle
+   * with a closed DB can cause JVM crashes, so this ensures that situation does not happen.
+   */
+  void closeIterator(LevelDBIterator it) throws IOException {
+    synchronized (this._db) {
+      DB _db = this._db.get();
+      if (_db != null) {
+        it.close();
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/74daf622/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBIterator.java
----------------------------------------------------------------------
diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBIterator.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBIterator.java
index a2181f3..b3ba76b 100644
--- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBIterator.java
+++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBIterator.java
@@ -191,6 +191,16 @@ class LevelDBIterator<T> implements KVStoreIterator<T> {
     }
   }
 
+  /**
+   * Because it's tricky to expose closeable iterators through many internal APIs, especially
+   * when Scala wrappers are used, this makes sure that, hopefully, the JNI resources held by
+   * the iterator will eventually be released.
+   */
+  @Override
+  protected void finalize() throws Throwable {
+    db.closeIterator(this);
+  }
+
   private byte[] loadNext() {
     if (count >= max) {
       return null;

http://git-wip-us.apache.org/repos/asf/spark/blob/74daf622/core/pom.xml
----------------------------------------------------------------------
diff --git a/core/pom.xml b/core/pom.xml
index da68abd..0966914 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -69,6 +69,11 @@
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
+      <artifactId>spark-kvstore_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
       <artifactId>spark-network-common_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
     </dependency>

http://git-wip-us.apache.org/repos/asf/spark/blob/74daf622/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 910121e..3889dd0 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -17,14 +17,17 @@
 
 package org.apache.spark.deploy.history
 
-import java.io.{FileNotFoundException, IOException, OutputStream}
-import java.util.UUID
-import java.util.concurrent.{ConcurrentHashMap, Executors, ExecutorService, Future, TimeUnit}
+import java.io.{File, FileNotFoundException, IOException}
+import java.util.{Date, UUID}
+import java.util.concurrent.{Executors, ExecutorService, Future, TimeUnit}
 import java.util.zip.{ZipEntry, ZipOutputStream}
 
+import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.xml.Node
 
+import com.fasterxml.jackson.annotation.{JsonIgnore, JsonInclude}
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
 import com.google.common.io.ByteStreams
 import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder}
 import org.apache.hadoop.fs.{FileStatus, Path}
@@ -35,11 +38,14 @@ import org.apache.hadoop.security.AccessControlException
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.history.config._
 import org.apache.spark.internal.Logging
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.ReplayListenerBus._
+import org.apache.spark.status.api.v1
 import org.apache.spark.ui.SparkUI
 import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
+import org.apache.spark.util.kvstore._
 
 /**
 * A class that provides application history from event logs stored in the file system.
@@ -50,11 +56,10 @@ import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
  *
 * - New attempts are detected in [[checkForLogs]]: the log dir is scanned, and any
 * entries in the log dir whose modification time is greater than the last scan time
- * are considered new or updated. These are replayed to create a new [[FsApplicationAttemptInfo]]
- * entry and update or create a matching [[FsApplicationHistoryInfo]] element in the list
- * of applications.
+ * are considered new or updated. These are replayed to create a new attempt info entry
+ * and update or create a matching application info element in the list of applications.
 * - Updated attempts are also found in [[checkForLogs]] -- if the attempt's log file has grown, the
- * [[FsApplicationAttemptInfo]] is replaced by another one with a larger log size.
+ * attempt is replaced by another one with a larger log size.
 * - When [[updateProbe()]] is invoked to check if a loaded [[SparkUI]]
 * instance is out of date, the log size of the cached instance is checked against the app last
 * loaded by [[checkForLogs]].
@@ -78,6 +83,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     this(conf, new SystemClock())
   }
 
+  import config._
   import FsHistoryProvider._
 
   // Interval between safemode checks.
@@ -94,8 +100,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   private val NUM_PROCESSING_THREADS = conf.getInt(SPARK_HISTORY_FS_NUM_REPLAY_THREADS,
     Math.ceil(Runtime.getRuntime.availableProcessors() / 4f).toInt)
 
-  private val logDir = conf.getOption("spark.history.fs.logDirectory")
-    .getOrElse(DEFAULT_LOG_DIR)
+  private val logDir = conf.get(EVENT_LOG_DIR)
 
   private val HISTORY_UI_ACLS_ENABLE = conf.getBoolean("spark.history.ui.acls.enable", false)
   private val HISTORY_UI_ADMIN_ACLS = conf.get("spark.history.ui.admin.acls", "")
@@ -117,17 +122,38 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   // used for logging msgs (logs are re-scanned based on file size, rather than modtime)
   private val lastScanTime = new java.util.concurrent.atomic.AtomicLong(-1)
 
-  // Mapping of application IDs to their metadata, in descending end time order. Apps are inserted
-  // into the map in order, so the LinkedHashMap maintains the correct ordering.
-  @volatile private var applications: mutable.LinkedHashMap[String, FsApplicationHistoryInfo]
-    = new mutable.LinkedHashMap()
+  private val pendingReplayTasksCount = new java.util.concurrent.atomic.AtomicInteger(0)
 
-  val fileToAppInfo = new ConcurrentHashMap[Path, FsApplicationAttemptInfo]()
+  private val storePath = conf.get(LOCAL_STORE_DIR)
 
-  // List of application logs to be deleted by event log cleaner.
-  private var attemptsToClean = new mutable.ListBuffer[FsApplicationAttemptInfo]
+  // Visible for testing.
+  private[history] val listing: KVStore = storePath.map { path =>
+    val dbPath = new File(path, "listing.ldb")
 
-  private val pendingReplayTasksCount = new java.util.concurrent.atomic.AtomicInteger(0)
+    def openDB(): LevelDB = new LevelDB(dbPath, new KVStoreScalaSerializer())
+
+    try {
+      val db = openDB()
+      val meta = db.getMetadata(classOf[KVStoreMetadata])
+
+      if (meta == null) {
+        db.setMetadata(new KVStoreMetadata(CURRENT_LISTING_VERSION, logDir))
+        db
+      } else if (meta.version != CURRENT_LISTING_VERSION || !logDir.equals(meta.logDir)) {
+        logInfo("Detected mismatched config in existing DB, deleting...")
+        db.close()
+        Utils.deleteRecursively(dbPath)
+        openDB()
+      } else {
+        db
+      }
+    } catch {
+      case _: UnsupportedStoreVersionException =>
+        logInfo("Detected incompatible DB versions, deleting...")
+        Utils.deleteRecursively(dbPath)
+        openDB()
+    }
+  }.getOrElse(new InMemoryStore())
 
   /**
    * Return a runnable that performs the given operation on the event logs.
@@ -231,10 +257,23 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     }
   }
 
-  override def getListing(): Iterator[FsApplicationHistoryInfo] = applications.values.iterator
+  override def getListing(): Iterator[ApplicationHistoryInfo] = {
+    // Return the listing in end time descending order.
+    listing.view(classOf[ApplicationInfoWrapper])
+      .index("endTime")
+      .reverse()
+      .iterator()
+      .asScala
+      .map(_.toAppHistoryInfo())
+  }
 
-  override def getApplicationInfo(appId: String): Option[FsApplicationHistoryInfo] = {
-    applications.get(appId)
+  override def getApplicationInfo(appId: String): Option[ApplicationHistoryInfo] = {
+    try {
+      Some(load(appId).toAppHistoryInfo())
+    } catch {
+      case e: NoSuchElementException =>
+        None
+    }
   }
 
   override def getEventLogsUnderProcess(): Int = pendingReplayTasksCount.get()
@@ -243,42 +282,40 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
 
   override def getAppUI(appId: String, attemptId: Option[String]): Option[LoadedAppUI] = {
     try {
-      applications.get(appId).flatMap { appInfo =>
-        appInfo.attempts.find(_.attemptId == attemptId).flatMap { attempt =>
+      val appInfo = load(appId)
+      appInfo.attempts
+        .find(_.info.attemptId == attemptId)
+        .map { attempt =>
           val replayBus = new ReplayListenerBus()
           val ui = {
             val conf = this.conf.clone()
             val appSecManager = new SecurityManager(conf)
-            SparkUI.createHistoryUI(conf, replayBus, appSecManager, appInfo.name,
-              HistoryServer.getAttemptURI(appId, attempt.attemptId),
-              Some(attempt.lastUpdated), attempt.startTime)
+            SparkUI.createHistoryUI(conf, replayBus, appSecManager, appInfo.info.name,
+              HistoryServer.getAttemptURI(appId, attempt.info.attemptId),
+              Some(attempt.info.lastUpdated.getTime()), attempt.info.startTime.getTime())
             // Do not call ui.bind() to avoid creating a new server for each application
           }
 
           val fileStatus = fs.getFileStatus(new Path(logDir, attempt.logPath))
 
          val appListener = replay(fileStatus, isApplicationCompleted(fileStatus), replayBus)
-
-          if (appListener.appId.isDefined) {
-            ui.appSparkVersion = appListener.appSparkVersion.getOrElse("")
-            ui.getSecurityManager.setAcls(HISTORY_UI_ACLS_ENABLE)
-            // make sure to set admin acls before view acls so they are properly picked up
-            val adminAcls = HISTORY_UI_ADMIN_ACLS + "," + appListener.adminAcls.getOrElse("")
-            ui.getSecurityManager.setAdminAcls(adminAcls)
-            ui.getSecurityManager.setViewAcls(attempt.sparkUser, appListener.viewAcls.getOrElse(""))
-            val adminAclsGroups = HISTORY_UI_ADMIN_ACLS_GROUPS + "," +
-              appListener.adminAclsGroups.getOrElse("")
-            ui.getSecurityManager.setAdminAclsGroups(adminAclsGroups)
-            ui.getSecurityManager.setViewAclsGroups(appListener.viewAclsGroups.getOrElse(""))
-            Some(LoadedAppUI(ui, () => updateProbe(appId, attemptId, attempt.fileSize)))
-          } else {
-            None
-          }
-
+          assert(appListener.appId.isDefined)
+          ui.appSparkVersion = appListener.appSparkVersion.getOrElse("")
+          ui.getSecurityManager.setAcls(HISTORY_UI_ACLS_ENABLE)
+          // make sure to set admin acls before view acls so they are properly picked up
+          val adminAcls = HISTORY_UI_ADMIN_ACLS + "," + appListener.adminAcls.getOrElse("")
+          ui.getSecurityManager.setAdminAcls(adminAcls)
+          ui.getSecurityManager.setViewAcls(attempt.info.sparkUser,
+            appListener.viewAcls.getOrElse(""))
+          val adminAclsGroups = HISTORY_UI_ADMIN_ACLS_GROUPS + "," +
+            appListener.adminAclsGroups.getOrElse("")
+          ui.getSecurityManager.setAdminAclsGroups(adminAclsGroups)
+          ui.getSecurityManager.setViewAclsGroups(appListener.viewAclsGroups.getOrElse(""))
+          LoadedAppUI(ui, () => updateProbe(appId, attemptId, attempt.fileSize))
         }
-      }
     } catch {
-      case e: FileNotFoundException => None
+      case _: FileNotFoundException => None
+      case _: NoSuchElementException => None
     }
   }
 
@@ -303,9 +340,13 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   }
 
   override def stop(): Unit = {
-    if (initThread != null && initThread.isAlive()) {
-      initThread.interrupt()
-      initThread.join()
+    try {
+      if (initThread != null && initThread.isAlive()) {
+        initThread.interrupt()
+        initThread.join()
+      }
+    } finally {
+      listing.close()
     }
   }
 
@@ -318,25 +359,20 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     try {
       val newLastScanTime = getNewLastScanTime()
       logDebug(s"Scanning $logDir with lastScanTime==$lastScanTime")
-      val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq)
-        .getOrElse(Seq.empty[FileStatus])
       // scan for modified applications, replay and merge them
-      val logInfos: Seq[FileStatus] = statusList
+      val logInfos = Option(fs.listStatus(new Path(logDir))).map(_.toSeq).getOrElse(Nil)
         .filter { entry =>
-          val fileInfo = fileToAppInfo.get(entry.getPath())
-          val prevFileSize = if (fileInfo != null) fileInfo.fileSize else 0L
           !entry.isDirectory() &&
             // FsHistoryProvider generates a hidden file which can't be read.  Accidentally
             // reading a garbage file is safe, but we would log an error which can be scary to
             // the end-user.
             !entry.getPath().getName().startsWith(".") &&
-            prevFileSize < entry.getLen() &&
-            SparkHadoopUtil.get.checkAccessPermission(entry, FsAction.READ)
+            SparkHadoopUtil.get.checkAccessPermission(entry, FsAction.READ) &&
+            recordedFileSize(entry.getPath()) < entry.getLen()
         }
-        .flatMap { entry => Some(entry) }
         .sortWith { case (entry1, entry2) =>
-          entry1.getModificationTime() >= entry2.getModificationTime()
-      }
+          entry1.getModificationTime() > entry2.getModificationTime()
+        }
 
       if (logInfos.nonEmpty) {
         logDebug(s"New/updated attempts found: ${logInfos.size} 
${logInfos.map(_.getPath)}")
@@ -424,208 +460,105 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       }
     }
 
-    applications.get(appId) match {
-      case Some(appInfo) =>
-        try {
-          // If no attempt is specified, or there is no attemptId for attempts, return all attempts
-          appInfo.attempts.filter { attempt =>
-            attempt.attemptId.isEmpty || attemptId.isEmpty || attempt.attemptId.get == attemptId.get
-          }.foreach { attempt =>
-            val logPath = new Path(logDir, attempt.logPath)
-            zipFileToStream(logPath, attempt.logPath, zipStream)
-          }
-        } finally {
-          zipStream.close()
+    val app = try {
+      load(appId)
+    } catch {
+      case _: NoSuchElementException =>
+        throw new SparkException(s"Logs for $appId not found.")
+    }
+
+    try {
+      // If no attempt is specified, or there is no attemptId for attempts, return all attempts
+      attemptId
+        .map { id => app.attempts.filter(_.info.attemptId == Some(id)) }
+        .getOrElse(app.attempts)
+        .map(_.logPath)
+        .foreach { log =>
+          zipFileToStream(new Path(logDir, log), log, zipStream)
         }
-      case None => throw new SparkException(s"Logs for $appId not found.")
+    } finally {
+      zipStream.close()
     }
   }
 
   /**
-   * Replay the log files in the list and merge the list of old applications with new ones
+   * Replay the given log file, saving the application in the listing db.
    */
   protected def mergeApplicationListing(fileStatus: FileStatus): Unit = {
-    val newAttempts = try {
-      val eventsFilter: ReplayEventsFilter = { eventString =>
-        eventString.startsWith(APPL_START_EVENT_PREFIX) ||
-          eventString.startsWith(APPL_END_EVENT_PREFIX) ||
-          eventString.startsWith(LOG_START_EVENT_PREFIX)
-      }
-
-      val logPath = fileStatus.getPath()
-      val appCompleted = isApplicationCompleted(fileStatus)
-
-      // Use loading time as lastUpdated since some filesystems don't update modifiedTime
-      // each time file is updated. However use modifiedTime for completed jobs so lastUpdated
-      // won't change whenever HistoryServer restarts and reloads the file.
-      val lastUpdated = if (appCompleted) fileStatus.getModificationTime else clock.getTimeMillis()
-
-      val appListener = replay(fileStatus, appCompleted, new ReplayListenerBus(), eventsFilter)
-
-      // Without an app ID, new logs will render incorrectly in the listing page, so do not list or
-      // try to show their UI.
-      if (appListener.appId.isDefined) {
-        val attemptInfo = new FsApplicationAttemptInfo(
-          logPath.getName(),
-          appListener.appName.getOrElse(NOT_STARTED),
-          appListener.appId.getOrElse(logPath.getName()),
-          appListener.appAttemptId,
-          appListener.startTime.getOrElse(-1L),
-          appListener.endTime.getOrElse(-1L),
-          lastUpdated,
-          appListener.sparkUser.getOrElse(NOT_STARTED),
-          appCompleted,
-          fileStatus.getLen(),
-          appListener.appSparkVersion.getOrElse("")
-        )
-        fileToAppInfo.put(logPath, attemptInfo)
-        logDebug(s"Application log ${attemptInfo.logPath} loaded successfully: 
$attemptInfo")
-        Some(attemptInfo)
-      } else {
-        logWarning(s"Failed to load application log ${fileStatus.getPath}. " +
-          "The application may have not started.")
-        None
-      }
-
-    } catch {
-      case e: Exception =>
-        logError(
-          s"Exception encountered when attempting to load application log 
${fileStatus.getPath}",
-          e)
-        None
-    }
-
-    if (newAttempts.isEmpty) {
-      return
+    val eventsFilter: ReplayEventsFilter = { eventString =>
+      eventString.startsWith(APPL_START_EVENT_PREFIX) ||
+        eventString.startsWith(APPL_END_EVENT_PREFIX) ||
+        eventString.startsWith(LOG_START_EVENT_PREFIX)
     }
 
-    // Build a map containing all apps that contain new attempts. The app information in this map
-    // contains both the new app attempt, and those that were already loaded in the existing apps
-    // map. If an attempt has been updated, it replaces the old attempt in the list.
-    val newAppMap = new mutable.HashMap[String, FsApplicationHistoryInfo]()
-
-    applications.synchronized {
-      newAttempts.foreach { attempt =>
-        val appInfo = newAppMap.get(attempt.appId)
-          .orElse(applications.get(attempt.appId))
-          .map { app =>
-            val attempts =
-              app.attempts.filter(_.attemptId != attempt.attemptId) ++ List(attempt)
-            new FsApplicationHistoryInfo(attempt.appId, attempt.name,
-              attempts.sortWith(compareAttemptInfo))
-          }
-          .getOrElse(new FsApplicationHistoryInfo(attempt.appId, attempt.name, List(attempt)))
-        newAppMap(attempt.appId) = appInfo
-      }
+    val logPath = fileStatus.getPath()
+    logInfo(s"Replaying log path: $logPath")
 
-      // Merge the new app list with the existing one, maintaining the expected ordering (descending
-      // end time). Maintaining the order is important to avoid having to sort the list every time
-      // there is a request for the log list.
-      val newApps = newAppMap.values.toSeq.sortWith(compareAppInfo)
-      val mergedApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
-      def addIfAbsent(info: FsApplicationHistoryInfo): Unit = {
-        if (!mergedApps.contains(info.id)) {
-          mergedApps += (info.id -> info)
-        }
-      }
+    val bus = new ReplayListenerBus()
+    val listener = new AppListingListener(fileStatus, clock)
+    bus.addListener(listener)
 
-      val newIterator = newApps.iterator.buffered
-      val oldIterator = applications.values.iterator.buffered
-      while (newIterator.hasNext && oldIterator.hasNext) {
-        if (newAppMap.contains(oldIterator.head.id)) {
-          oldIterator.next()
-        } else if (compareAppInfo(newIterator.head, oldIterator.head)) {
-          addIfAbsent(newIterator.next())
-        } else {
-          addIfAbsent(oldIterator.next())
-        }
-      }
-      newIterator.foreach(addIfAbsent)
-      oldIterator.foreach(addIfAbsent)
-
-      applications = mergedApps
-    }
+    replay(fileStatus, isApplicationCompleted(fileStatus), bus, eventsFilter)
+    listener.applicationInfo.foreach(addListing)
+    listing.write(LogInfo(logPath.toString(), fileStatus.getLen()))
   }
 
   /**
   * Delete event logs from the log directory according to the clean policy defined by the user.
    */
   private[history] def cleanLogs(): Unit = {
+    var iterator: Option[KVStoreIterator[ApplicationInfoWrapper]] = None
     try {
-      val maxAge = conf.getTimeAsSeconds("spark.history.fs.cleaner.maxAge", "7d") * 1000
-
-      val now = clock.getTimeMillis()
-      val appsToRetain = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
-
-      def shouldClean(attempt: FsApplicationAttemptInfo): Boolean = {
-        now - attempt.lastUpdated > maxAge
-      }
+      val maxTime = clock.getTimeMillis() - conf.get(MAX_LOG_AGE_S) * 1000
+
+      // Iterate descending over all applications whose oldest attempt happened before maxTime.
+      iterator = Some(listing.view(classOf[ApplicationInfoWrapper])
+        .index("oldestAttempt")
+        .reverse()
+        .first(maxTime)
+        .closeableIterator())
+
+      iterator.get.asScala.foreach { app =>
+        // Applications may have multiple attempts, some of which may not need to be deleted yet.
+        val (remaining, toDelete) = app.attempts.partition { attempt =>
+          attempt.info.lastUpdated.getTime() >= maxTime
+        }
 
-      // Scan all logs from the log directory.
-      // Only completed applications older than the specified max age will be deleted.
-      applications.values.foreach { app =>
-        val (toClean, toRetain) = app.attempts.partition(shouldClean)
-        attemptsToClean ++= toClean
-
-        if (toClean.isEmpty) {
-          appsToRetain += (app.id -> app)
-        } else if (toRetain.nonEmpty) {
-          appsToRetain += (app.id ->
-            new FsApplicationHistoryInfo(app.id, app.name, toRetain.toList))
+        if (remaining.nonEmpty) {
+          val newApp = new ApplicationInfoWrapper(app.info, remaining)
+          listing.write(newApp)
         }
-      }
 
-      applications = appsToRetain
+        toDelete.foreach { attempt =>
+          val logPath = new Path(logDir, attempt.logPath)
+          try {
+            listing.delete(classOf[LogInfo], logPath.toString())
+          } catch {
+            case _: NoSuchElementException =>
+              logDebug(s"Log info entry for $logPath not found.")
+          }
+          try {
+            fs.delete(logPath, true)
+          } catch {
+            case e: AccessControlException =>
+              logInfo(s"No permission to delete ${attempt.logPath}, ignoring.")
+            case t: IOException =>
+              logError(s"IOException in cleaning ${attempt.logPath}", t)
+          }
+        }
 
-      val leftToClean = new mutable.ListBuffer[FsApplicationAttemptInfo]
-      attemptsToClean.foreach { attempt =>
-        try {
-          fs.delete(new Path(logDir, attempt.logPath), true)
-        } catch {
-          case e: AccessControlException =>
-            logInfo(s"No permission to delete ${attempt.logPath}, ignoring.")
-          case t: IOException =>
-            logError(s"IOException in cleaning ${attempt.logPath}", t)
-            leftToClean += attempt
+        if (remaining.isEmpty) {
+          listing.delete(app.getClass(), app.id)
         }
       }
-
-      attemptsToClean = leftToClean
     } catch {
-      case t: Exception => logError("Exception in cleaning logs", t)
+      case t: Exception => logError("Exception while cleaning logs", t)
+    } finally {
+      iterator.foreach(_.close())
     }
   }
 
   /**
-   * Comparison function that defines the sort order for the application listing.
-   *
-   * @return Whether `i1` should precede `i2`.
-   */
-  private def compareAppInfo(
-      i1: FsApplicationHistoryInfo,
-      i2: FsApplicationHistoryInfo): Boolean = {
-    val a1 = i1.attempts.head
-    val a2 = i2.attempts.head
-    if (a1.endTime != a2.endTime) a1.endTime >= a2.endTime else a1.startTime >= a2.startTime
-  }
-
-  /**
-   * Comparison function that defines the sort order for application attempts within the same
-   * application. Order is: attempts are sorted by descending start time.
-   * Most recent attempt state matches with current state of the app.
-   *
-   * Normally applications should have a single running attempt; but failure to call sc.stop()
-   * may cause multiple running attempts to show up.
-   *
-   * @return Whether `a1` should precede `a2`.
-   */
-  private def compareAttemptInfo(
-      a1: FsApplicationAttemptInfo,
-      a2: FsApplicationAttemptInfo): Boolean = {
-    a1.startTime >= a2.startTime
-  }
-
-  /**
   * Replays the events in the specified log file on the supplied `ReplayListenerBus`. Returns
   * an `ApplicationEventListener` instance with event data captured from the replay.
   * `ReplayEventsFilter` determines what events are replayed and can therefore limit the
@@ -649,6 +582,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       val appListener = new ApplicationEventListener
       bus.addListener(appListener)
       bus.replay(logInput, logPath.toString, !appCompleted, eventsFilter)
+      logInfo(s"Finished replaying $logPath")
       appListener
     } finally {
       logInput.close()
@@ -685,26 +619,11 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
    * @return a summary of the component state
    */
   override def toString: String = {
-    val header = s"""
-      | FsHistoryProvider: logdir=$logDir,
-      | last scan time=$lastScanTime
-      | Cached application count =${applications.size}}
-    """.stripMargin
-    val sb = new StringBuilder(header)
-    applications.foreach(entry => sb.append(entry._2).append("\n"))
-    sb.toString
-  }
-
-  /**
-   * Look up an application attempt
-   * @param appId application ID
-   * @param attemptId Attempt ID, if set
-   * @return the matching attempt, if found
-   */
-  def lookup(appId: String, attemptId: Option[String]): Option[FsApplicationAttemptInfo] = {
-    applications.get(appId).flatMap { appInfo =>
-      appInfo.attempts.find(_.attemptId == attemptId)
-    }
+    val count = listing.count(classOf[ApplicationInfoWrapper])
+    s"""|FsHistoryProvider{logdir=$logDir,
+        |  storedir=$storePath,
+        |  last scan time=$lastScanTime
+        |  application count=$count}""".stripMargin
   }
 
   /**
@@ -722,21 +641,69 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       appId: String,
       attemptId: Option[String],
       prevFileSize: Long)(): Boolean = {
-    lookup(appId, attemptId) match {
-      case None =>
+    try {
+      val attempt = getAttempt(appId, attemptId)
+      val logPath = fs.makeQualified(new Path(logDir, attempt.logPath))
+      recordedFileSize(logPath) > prevFileSize
+    } catch {
+      case _: NoSuchElementException =>
         logDebug(s"Application Attempt $appId/$attemptId not found")
         false
-      case Some(latest) =>
-        prevFileSize < latest.fileSize
     }
   }
-}
 
-private[history] object FsHistoryProvider {
-  val DEFAULT_LOG_DIR = "file:/tmp/spark-events"
+  /**
+   * Return the last known size of the given event log, recorded the last time the file
+   * system scanner detected a change in the file.
+   */
+  private def recordedFileSize(log: Path): Long = {
+    try {
+      listing.read(classOf[LogInfo], log.toString()).fileSize
+    } catch {
+      case _: NoSuchElementException => 0L
+    }
+  }
+
+  private def load(appId: String): ApplicationInfoWrapper = {
+    listing.read(classOf[ApplicationInfoWrapper], appId)
+  }
+
+  /**
+   * Write the app's information to the given store. Serialized to avoid the (notedly rare) case
+   * where two threads are processing separate attempts of the same application.
+   */
+  private def addListing(app: ApplicationInfoWrapper): Unit = listing.synchronized {
+    val attempt = app.attempts.head
+
+    val oldApp = try {
+      load(app.id)
+    } catch {
+      case _: NoSuchElementException =>
+        app
+    }
+
+    def compareAttemptInfo(a1: AttemptInfoWrapper, a2: AttemptInfoWrapper): Boolean = {
+      a1.info.startTime.getTime() > a2.info.startTime.getTime()
+    }
+
+    val attempts = oldApp.attempts.filter(_.info.attemptId != attempt.info.attemptId) ++
+      List(attempt)
+
+    val newAppInfo = new ApplicationInfoWrapper(
+      app.info,
+      attempts.sortWith(compareAttemptInfo))
+    listing.write(newAppInfo)
+  }
 
-  private val NOT_STARTED = "<Not Started>"
+  /** For testing. Returns internal data about a single attempt. */
+  private[history] def getAttempt(appId: String, attemptId: Option[String]): AttemptInfoWrapper = {
+    load(appId).attempts.find(_.info.attemptId == attemptId).getOrElse(
+      throw new NoSuchElementException(s"Cannot find attempt $attemptId of $appId."))
+  }
 
+}
+
+private[history] object FsHistoryProvider {
   private val SPARK_HISTORY_FS_NUM_REPLAY_THREADS = "spark.history.fs.numReplayThreads"
 
   private val APPL_START_EVENT_PREFIX = "{\"Event\":\"SparkListenerApplicationStart\""
@@ -744,53 +711,145 @@ private[history] object FsHistoryProvider {
   private val APPL_END_EVENT_PREFIX = "{\"Event\":\"SparkListenerApplicationEnd\""
 
   private val LOG_START_EVENT_PREFIX = "{\"Event\":\"SparkListenerLogStart\""
+
+  /**
+   * Current version of the data written to the listing database. When opening an existing
+   * db, if the version does not match this value, the FsHistoryProvider will throw away
+   * all data and re-generate the listing data from the event logs.
+   */
+  private[history] val CURRENT_LISTING_VERSION = 1L
 }
 
 /**
- * Application attempt information.
- *
- * @param logPath path to the log file, or, for a legacy log, its directory
- * @param name application name
- * @param appId application ID
- * @param attemptId optional attempt ID
- * @param startTime start time (from playback)
- * @param endTime end time (from playback). -1 if the application is incomplete.
- * @param lastUpdated the modification time of the log file when this entry was built by replaying
- *                    the history.
- * @param sparkUser user running the application
- * @param completed flag to indicate whether or not the application has completed.
- * @param fileSize the size of the log file the last time the file was scanned for changes
+ * A KVStoreSerializer that provides Scala types serialization too, and uses the same options as
+ * the API serializer.
  */
-private class FsApplicationAttemptInfo(
+private class KVStoreScalaSerializer extends KVStoreSerializer {
+
+  mapper.registerModule(DefaultScalaModule)
+  mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL)
+  mapper.setDateFormat(v1.JacksonMessageWriter.makeISODateFormat)
+
+}
+
+private[history] case class KVStoreMetadata(
+  version: Long,
+  logDir: String)
+
+private[history] case class LogInfo(
+  @KVIndexParam logPath: String,
+  fileSize: Long)
+
+private[history] class AttemptInfoWrapper(
+    val info: v1.ApplicationAttemptInfo,
     val logPath: String,
-    val name: String,
-    val appId: String,
-    attemptId: Option[String],
-    startTime: Long,
-    endTime: Long,
-    lastUpdated: Long,
-    sparkUser: String,
-    completed: Boolean,
-    val fileSize: Long,
-    appSparkVersion: String)
-  extends ApplicationAttemptInfo(
-      attemptId, startTime, endTime, lastUpdated, sparkUser, completed, appSparkVersion) {
-
-  /** extend the superclass string value with the extra attributes of this class */
-  override def toString: String = {
-    s"FsApplicationAttemptInfo($name, $appId," +
-      s" ${super.toString}, source=$logPath, size=$fileSize"
+    val fileSize: Long) {
+
+  def toAppAttemptInfo(): ApplicationAttemptInfo = {
+    ApplicationAttemptInfo(info.attemptId, info.startTime.getTime(),
+      info.endTime.getTime(), info.lastUpdated.getTime(), info.sparkUser,
+      info.completed, info.appSparkVersion)
   }
+
 }
 
-/**
- * Application history information
- * @param id application ID
- * @param name application name
- * @param attempts list of attempts, most recent first.
- */
-private class FsApplicationHistoryInfo(
-    id: String,
-    override val name: String,
-    override val attempts: List[FsApplicationAttemptInfo])
-  extends ApplicationHistoryInfo(id, name, attempts)
+private[history] class ApplicationInfoWrapper(
+    val info: v1.ApplicationInfo,
+    val attempts: List[AttemptInfoWrapper]) {
+
+  @JsonIgnore @KVIndexParam
+  def id: String = info.id
+
+  @JsonIgnore @KVIndexParam("endTime")
+  def endTime(): Long = attempts.head.info.endTime.getTime()
+
+  @JsonIgnore @KVIndexParam("oldestAttempt")
+  def oldestAttempt(): Long = attempts.map(_.info.lastUpdated.getTime()).min
+
+  def toAppHistoryInfo(): ApplicationHistoryInfo = {
+    ApplicationHistoryInfo(info.id, info.name, attempts.map(_.toAppAttemptInfo()))
+  }
+
+}
+
+private[history] class AppListingListener(log: FileStatus, clock: Clock) extends SparkListener {
+
+  private val app = new MutableApplicationInfo()
+  private val attempt = new MutableAttemptInfo(log.getPath().getName(), log.getLen())
+
+  override def onApplicationStart(event: SparkListenerApplicationStart): Unit = {
+    app.id = event.appId.orNull
+    app.name = event.appName
+
+    attempt.attemptId = event.appAttemptId
+    attempt.startTime = new Date(event.time)
+    attempt.lastUpdated = new Date(clock.getTimeMillis())
+    attempt.sparkUser = event.sparkUser
+  }
+
+  override def onApplicationEnd(event: SparkListenerApplicationEnd): Unit = {
+    attempt.endTime = new Date(event.time)
+    attempt.lastUpdated = new Date(log.getModificationTime())
+    attempt.duration = event.time - attempt.startTime.getTime()
+    attempt.completed = true
+  }
+
+  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
+    case SparkListenerLogStart(sparkVersion) =>
+      attempt.appSparkVersion = sparkVersion
+    case _ =>
+  }
+
+  def applicationInfo: Option[ApplicationInfoWrapper] = {
+    if (app.id != null) {
+      Some(app.toView())
+    } else {
+      None
+    }
+  }
+
+  private class MutableApplicationInfo {
+    var id: String = null
+    var name: String = null
+    var coresGranted: Option[Int] = None
+    var maxCores: Option[Int] = None
+    var coresPerExecutor: Option[Int] = None
+    var memoryPerExecutorMB: Option[Int] = None
+
+    def toView(): ApplicationInfoWrapper = {
+      val apiInfo = new v1.ApplicationInfo(id, name, coresGranted, maxCores, coresPerExecutor,
+        memoryPerExecutorMB, Nil)
+      new ApplicationInfoWrapper(apiInfo, List(attempt.toView()))
+    }
+
+  }
+
+  private class MutableAttemptInfo(logPath: String, fileSize: Long) {
+    var attemptId: Option[String] = None
+    var startTime = new Date(-1)
+    var endTime = new Date(-1)
+    var lastUpdated = new Date(-1)
+    var duration = 0L
+    var sparkUser: String = null
+    var completed = false
+    var appSparkVersion = ""
+
+    def toView(): AttemptInfoWrapper = {
+      val apiInfo = new v1.ApplicationAttemptInfo(
+        attemptId,
+        startTime,
+        endTime,
+        lastUpdated,
+        duration,
+        sparkUser,
+        completed,
+        appSparkVersion)
+      new AttemptInfoWrapper(
+        apiInfo,
+        logPath,
+        fileSize)
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/74daf622/core/src/main/scala/org/apache/spark/deploy/history/config.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/config.scala b/core/src/main/scala/org/apache/spark/deploy/history/config.scala
new file mode 100644
index 0000000..fb9e997
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/history/config.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.util.concurrent.TimeUnit
+
+import scala.annotation.meta.getter
+
+import org.apache.spark.internal.config.ConfigBuilder
+import org.apache.spark.util.kvstore.KVIndex
+
+private[spark] object config {
+
+  /** Use this to annotate constructor params to be used as KVStore indices. */
+  type KVIndexParam = KVIndex @getter
+
+  val DEFAULT_LOG_DIR = "file:/tmp/spark-events"
+
+  val EVENT_LOG_DIR = ConfigBuilder("spark.history.fs.logDirectory")
+    .stringConf
+    .createWithDefault(DEFAULT_LOG_DIR)
+
+  val MAX_LOG_AGE_S = ConfigBuilder("spark.history.fs.cleaner.maxAge")
+    .timeConf(TimeUnit.SECONDS)
+    .createWithDefaultString("7d")
+
+  val LOCAL_STORE_DIR = ConfigBuilder("spark.history.store.path")
+    .doc("Local directory where to cache application history information. By 
default this is " +
+      "not set, meaning all history information will be kept in memory.")
+    .stringConf
+    .createOptional
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/74daf622/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
index 05948f2..31659b2 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
@@ -20,6 +20,8 @@ import java.util.Date
 
 import scala.collection.Map
 
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties
+
 import org.apache.spark.JobExecutionStatus
 
 class ApplicationInfo private[spark](
@@ -31,6 +33,9 @@ class ApplicationInfo private[spark](
     val memoryPerExecutorMB: Option[Int],
     val attempts: Seq[ApplicationAttemptInfo])
 
+@JsonIgnoreProperties(
+  value = Array("startTimeEpoch", "endTimeEpoch", "lastUpdatedEpoch"),
+  allowGetters = true)
 class ApplicationAttemptInfo private[spark](
     val attemptId: Option[String],
     val startTime: Date,
@@ -40,9 +45,13 @@ class ApplicationAttemptInfo private[spark](
     val sparkUser: String,
     val completed: Boolean = false,
     val appSparkVersion: String) {
-    def getStartTimeEpoch: Long = startTime.getTime
-    def getEndTimeEpoch: Long = endTime.getTime
-    def getLastUpdatedEpoch: Long = lastUpdated.getTime
+
+  def getStartTimeEpoch: Long = startTime.getTime
+
+  def getEndTimeEpoch: Long = endTime.getTime
+
+  def getLastUpdatedEpoch: Long = lastUpdated.getTime
+
 }
 
 class ExecutorStageSummary private[spark](

http://git-wip-us.apache.org/repos/asf/spark/blob/74daf622/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index 7109146..2141934 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -36,6 +36,7 @@ import org.scalatest.Matchers
 import org.scalatest.concurrent.Eventually._
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.history.config._
 import org.apache.spark.internal.Logging
 import org.apache.spark.io._
 import org.apache.spark.scheduler._
@@ -66,9 +67,15 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
     new File(logPath)
   }
 
-  test("Parse application logs") {
+  Seq(true, false).foreach { inMemory =>
+    test(s"Parse application logs (inMemory = $inMemory)") {
+      testAppLogParsing(inMemory)
+    }
+  }
+
+  private def testAppLogParsing(inMemory: Boolean) {
     val clock = new ManualClock(12345678)
-    val provider = new FsHistoryProvider(createTestConf(), clock)
+    val provider = new FsHistoryProvider(createTestConf(inMemory = inMemory), clock)
 
     // Write a new-style application log.
     val newAppComplete = newLogFile("new1", None, inProgress = false)
@@ -172,20 +179,18 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
     )
     updateAndCheck(provider) { list =>
       list.size should be (1)
-      list.head.attempts.head.asInstanceOf[FsApplicationAttemptInfo].logPath should
-        endWith(EventLoggingListener.IN_PROGRESS)
+      provider.getAttempt("app1", None).logPath should endWith(EventLoggingListener.IN_PROGRESS)
     }
 
     logFile1.renameTo(newLogFile("app1", None, inProgress = false))
     updateAndCheck(provider) { list =>
       list.size should be (1)
-      list.head.attempts.head.asInstanceOf[FsApplicationAttemptInfo].logPath should not
-        endWith(EventLoggingListener.IN_PROGRESS)
+      provider.getAttempt("app1", None).logPath should not endWith(EventLoggingListener.IN_PROGRESS)
     }
   }
 
   test("Parse logs that application is not started") {
-    val provider = new FsHistoryProvider((createTestConf()))
+    val provider = new FsHistoryProvider(createTestConf())
 
     val logFile1 = newLogFile("app1", None, inProgress = true)
     writeFile(logFile1, true, None,
@@ -342,17 +347,23 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
     provider.checkForLogs()
 
     // This should not trigger any cleanup
-    updateAndCheck(provider)(list => list.size should be(2))
+    updateAndCheck(provider) { list =>
+      list.size should be(2)
+    }
 
     // Should trigger cleanup for first file but not second one
     clock.setTime(firstFileModifiedTime + maxAge + 1)
-    updateAndCheck(provider)(list => list.size should be(1))
+    updateAndCheck(provider) { list =>
+      list.size should be(1)
+    }
     assert(!log1.exists())
     assert(log2.exists())
 
     // Should cleanup the second file as well.
     clock.setTime(secondFileModifiedTime + maxAge + 1)
-    updateAndCheck(provider)(list => list.size should be(0))
+    updateAndCheck(provider) { list =>
+      list.size should be(0)
+    }
     assert(!log1.exists())
     assert(!log2.exists())
   }
@@ -580,7 +591,34 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
       securityManager.checkUIViewPermissions("user4") should be (false)
       securityManager.checkUIViewPermissions("user5") should be (false)
     }
- }
+  }
+
+  test("mismatched version discards old listing") {
+    val conf = createTestConf()
+    val oldProvider = new FsHistoryProvider(conf)
+
+    val logFile1 = newLogFile("app1", None, inProgress = false)
+    writeFile(logFile1, true, None,
+      SparkListenerLogStart("2.3"),
+      SparkListenerApplicationStart("test", Some("test"), 1L, "test", None),
+      SparkListenerApplicationEnd(5L)
+    )
+
+    updateAndCheck(oldProvider) { list =>
+      list.size should be (1)
+    }
+    assert(oldProvider.listing.count(classOf[ApplicationInfoWrapper]) === 1)
+
+    // Manually overwrite the version in the listing db; this should cause the new provider to
+    // discard all data because the versions don't match.
+    val meta = new KVStoreMetadata(FsHistoryProvider.CURRENT_LISTING_VERSION + 1,
+      conf.get(LOCAL_STORE_DIR).get)
+    oldProvider.listing.setMetadata(meta)
+    oldProvider.stop()
+
+    val mistatchedVersionProvider = new FsHistoryProvider(conf)
+    assert(mistatchedVersionProvider.listing.count(classOf[ApplicationInfoWrapper]) === 0)
+  }
 
   /**
   * Asks the provider to check for logs and calls a function to perform checks on the updated
@@ -623,8 +661,15 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
     new FileOutputStream(file).close()
   }
 
-  private def createTestConf(): SparkConf = {
-    new SparkConf().set("spark.history.fs.logDirectory", 
testDir.getAbsolutePath())
+  private def createTestConf(inMemory: Boolean = false): SparkConf = {
+    val conf = new SparkConf()
+      .set("spark.history.fs.logDirectory", testDir.getAbsolutePath())
+
+    if (!inMemory) {
+      conf.set(LOCAL_STORE_DIR, Utils.createTempDir().getAbsolutePath())
+    }
+
+    conf
   }
 
   private class SafeModeTestProvider(conf: SparkConf, clock: Clock)

http://git-wip-us.apache.org/repos/asf/spark/blob/74daf622/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
index 18da8c1..c11543a 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
@@ -43,6 +43,7 @@ import org.scalatest.mockito.MockitoSugar
 import org.scalatest.selenium.WebBrowser
 
 import org.apache.spark._
+import org.apache.spark.deploy.history.config._
 import org.apache.spark.ui.SparkUI
 import org.apache.spark.ui.jobs.UIData.JobUIData
 import org.apache.spark.util.{ResetSystemProperties, Utils}
@@ -64,6 +65,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
 
   private val logDir = getTestResourcePath("spark-events")
   private val expRoot = getTestResourceFile("HistoryServerExpectations")
+  private val storeDir = Utils.createTempDir(namePrefix = "history")
 
   private var provider: FsHistoryProvider = null
   private var server: HistoryServer = null
@@ -74,6 +76,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
       .set("spark.history.fs.logDirectory", logDir)
       .set("spark.history.fs.update.interval", "0")
       .set("spark.testing", "true")
+      .set(LOCAL_STORE_DIR, storeDir.getAbsolutePath())
     conf.setAll(extraConf)
     provider = new FsHistoryProvider(conf)
     provider.checkForLogs()
@@ -87,14 +90,13 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
 
   def stop(): Unit = {
     server.stop()
+    server = null
   }
 
   before {
-    init()
-  }
-
-  after{
-    stop()
+    if (server == null) {
+      init()
+    }
   }
 
   val cases = Seq(
@@ -296,6 +298,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
       .set("spark.history.fs.logDirectory", logDir)
       .set("spark.history.fs.update.interval", "0")
       .set("spark.testing", "true")
+      .set(LOCAL_STORE_DIR, storeDir.getAbsolutePath())
 
     provider = new FsHistoryProvider(conf)
     provider.checkForLogs()
@@ -372,6 +375,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
   }
 
   test("incomplete apps get refreshed") {
+    server.stop()
 
     implicit val webDriver: WebDriver = new HtmlUnitDriver
     implicit val formats = org.json4s.DefaultFormats
@@ -388,6 +392,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
       .set("spark.history.fs.update.interval", "1s")
       .set("spark.eventLog.enabled", "true")
       .set("spark.history.cache.window", "250ms")
+      .set(LOCAL_STORE_DIR, storeDir.getAbsolutePath())
       .remove("spark.testing")
     val provider = new FsHistoryProvider(myConf)
     val securityManager = HistoryServer.createSecurityManager(myConf)
@@ -413,8 +418,6 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
       }
     }
 
-    // stop the server with the old config, and start the new one
-    server.stop()
     server = new HistoryServer(myConf, provider, securityManager, 18080)
     server.initialize()
     server.bind()

http://git-wip-us.apache.org/repos/asf/spark/blob/74daf622/docs/monitoring.md
----------------------------------------------------------------------
diff --git a/docs/monitoring.md b/docs/monitoring.md
index 51084a2..1ae4318 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -224,6 +224,15 @@ The history server can be configured as follows:
       Number of threads that will be used by history server to process event logs.
     </td>
   </tr>
+  <tr>
+    <td>spark.history.store.path</td>
+    <td>(none)</td>
+    <td>
+        Local directory where to cache application history data. If set, the history
+        server will store application data on disk instead of keeping it in memory. The data
+        written to disk will be re-used in the event of a history server restart.
+    </td>
+  </tr>
 </table>
 
 Note that in all of these UIs, the tables are sortable by clicking their headers,

http://git-wip-us.apache.org/repos/asf/spark/blob/74daf622/scalastyle-config.xml
----------------------------------------------------------------------
diff --git a/scalastyle-config.xml b/scalastyle-config.xml
index bd7f462..7bdd3fa 100644
--- a/scalastyle-config.xml
+++ b/scalastyle-config.xml
@@ -86,7 +86,7 @@ This file is divided into 3 sections:
   </check>
 
   <check level="error" class="org.scalastyle.scalariform.ObjectNamesChecker" 
enabled="true">
-    <parameters><parameter 
name="regex"><![CDATA[[A-Z][A-Za-z]*]]></parameter></parameters>
+    <parameters><parameter 
name="regex"><![CDATA[(config|[A-Z][A-Za-z]*)]]></parameter></parameters>
   </check>
 
   <check level="error" 
class="org.scalastyle.scalariform.PackageObjectNamesChecker" enabled="true">


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
