Repository: spark
Updated Branches:
  refs/heads/master 1a49496b4 -> 6776cb33e


[SPARK-6066] Make event log format easier to parse

Some users have reported difficulty in parsing the new event log format. Because 
we embed the metadata at the beginning of the file, it must be left uncompressed 
so that we can read the information needed to parse the rest of the log. This 
means we end up with a partially compressed file whenever event log compression 
is turned on. The old format looks like:
```
sparkVersion = 1.3.0
compressionCodec = org.apache.spark.io.LZFCompressionCodec
=== LOG_HEADER_END ===
// actual events, could be compressed bytes
```
The new format in this patch encodes the compression codec in the log file name 
instead and drops the custom metadata header; the Spark version is still recorded, 
but as a JSON event on the first line of the log. The new file names look 
something like:
```
app_without_compression
app_123.lzf
app_456.snappy
```
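
The patch also replaces the old plain-text header with a single line of JSON 
metadata at the start of the event data. For illustration (the exact version 
string depends on the Spark build that wrote the log), that first line looks like:
```
{"Event":"SparkListenerLogStart","Spark Version":"1.3.0"}
```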

I tested this with and without compression, using different compression codecs 
and event logging directories. I verified that both the `Master` and the 
`HistoryServer` can render both compressed and uncompressed logs as before.

Author: Andrew Or <and...@databricks.com>

Closes #4821 from andrewor14/event-log-format and squashes the following 
commits:

8511141 [Andrew Or] Fix test
654883d [Andrew Or] Add back metadata with Spark version
7f537cd [Andrew Or] Address review feedback
7d6aa61 [Andrew Or] Make codec an extension
59abee9 [Andrew Or] Merge branch 'master' of github.com:apache/spark into event-log-format
27c9a6c [Andrew Or] Address review feedback
519e51a [Andrew Or] Address review feedback
ef69276 [Andrew Or] Merge branch 'master' of github.com:apache/spark into event-log-format
88a091d [Andrew Or] Add tests for new format and file name
f32d8d2 [Andrew Or] Fix tests
8db5a06 [Andrew Or] Embed metadata in the event log file name instead


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6776cb33
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6776cb33
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6776cb33

Branch: refs/heads/master
Commit: 6776cb33ea691f7843b956b3e80979282967e826
Parents: 1a49496
Author: Andrew Or <and...@databricks.com>
Authored: Mon Mar 2 16:34:32 2015 -0800
Committer: Patrick Wendell <patr...@databricks.com>
Committed: Mon Mar 2 16:34:32 2015 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   |   9 ++
 .../spark/deploy/ApplicationDescription.scala   |  10 +-
 .../deploy/history/FsHistoryProvider.scala      |  22 +--
 .../org/apache/spark/deploy/master/Master.scala |   8 +-
 .../org/apache/spark/io/CompressionCodec.scala  |  21 ++-
 .../spark/scheduler/EventLoggingListener.scala  | 162 +++++++------------
 .../spark/scheduler/ReplayListenerBus.scala     |   3 +-
 .../apache/spark/scheduler/SparkListener.scala  |   5 +
 .../spark/scheduler/SparkListenerBus.scala      |   1 +
 .../cluster/SparkDeploySchedulerBackend.scala   |   2 +-
 .../org/apache/spark/util/JsonProtocol.scala    |  14 ++
 .../deploy/history/FsHistoryProviderSuite.scala |  69 +++++---
 .../scheduler/EventLoggingListenerSuite.scala   |  62 ++++---
 .../spark/scheduler/ReplayListenerSuite.scala   |  13 +-
 14 files changed, 212 insertions(+), 189 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6776cb33/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 3cd0c21..e231e83 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -51,6 +51,7 @@ import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
 import org.apache.spark.executor.TriggerThreadDump
 import org.apache.spark.input.{StreamInputFormat, PortableDataStream, WholeTextFileInputFormat,
   FixedLengthBinaryInputFormat}
+import org.apache.spark.io.CompressionCodec
 import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd._
 import org.apache.spark.scheduler._
@@ -233,6 +234,14 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       None
     }
   }
+  private[spark] val eventLogCodec: Option[String] = {
+    val compress = conf.getBoolean("spark.eventLog.compress", false)
+    if (compress && isEventLogEnabled) {
+      Some(CompressionCodec.getCodecName(conf)).map(CompressionCodec.getShortName)
+    } else {
+      None
+    }
+  }
 
   // Generate the random name for a temp folder in Tachyon
   // Add a timestamp as the suffix here to make it more safe
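
As a rough usage sketch (hypothetical, not part of the patch): with event logging 
and compression both enabled, the new `eventLogCodec` field resolves to the short 
codec name, which later becomes the log file's extension.

```scala
import org.apache.spark.SparkConf

// Hypothetical configuration sketch: with these settings, eventLogCodec
// resolves to Some("snappy") (snappy being the default codec), and the
// event log file name gets a ".snappy" extension.
val conf = new SparkConf()
  .setMaster("local")
  .setAppName("event-log-demo")
  .set("spark.eventLog.enabled", "true")
  .set("spark.eventLog.compress", "true")
```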

http://git-wip-us.apache.org/repos/asf/spark/blob/6776cb33/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
index ae55b4f..3d0d68d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
@@ -23,7 +23,9 @@ private[spark] class ApplicationDescription(
     val memoryPerSlave: Int,
     val command: Command,
     var appUiUrl: String,
-    val eventLogDir: Option[String] = None)
+    val eventLogDir: Option[String] = None,
+    // short name of compression codec used when writing event logs, if any (e.g. lzf)
+    val eventLogCodec: Option[String] = None)
   extends Serializable {
 
   val user = System.getProperty("user.name", "<unknown>")
@@ -34,8 +36,10 @@ private[spark] class ApplicationDescription(
       memoryPerSlave: Int = memoryPerSlave,
       command: Command = command,
       appUiUrl: String = appUiUrl,
-      eventLogDir: Option[String] = eventLogDir): ApplicationDescription =
-    new ApplicationDescription(name, maxCores, memoryPerSlave, command, appUiUrl, eventLogDir)
+      eventLogDir: Option[String] = eventLogDir,
+      eventLogCodec: Option[String] = eventLogCodec): ApplicationDescription =
+    new ApplicationDescription(
+      name, maxCores, memoryPerSlave, command, appUiUrl, eventLogDir, eventLogCodec)
 
   override def toString: String = "ApplicationDescription(" + name + ")"
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/6776cb33/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index c5fab1d..16d88c1 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -83,8 +83,8 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
 
   // Constants used to parse Spark 1.0.0 log directories.
   private[history] val LOG_PREFIX = "EVENT_LOG_"
-  private[history] val SPARK_VERSION_PREFIX = "SPARK_VERSION_"
-  private[history] val COMPRESSION_CODEC_PREFIX = "COMPRESSION_CODEC_"
+  private[history] val SPARK_VERSION_PREFIX = EventLoggingListener.SPARK_VERSION_KEY + "_"
+  private[history] val COMPRESSION_CODEC_PREFIX = EventLoggingListener.COMPRESSION_CODEC_KEY + "_"
   private[history] val APPLICATION_COMPLETE = "APPLICATION_COMPLETE"
 
   /**
@@ -324,7 +324,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
   private def replay(eventLog: FileStatus, bus: ReplayListenerBus): FsApplicationHistoryInfo = {
     val logPath = eventLog.getPath()
     logInfo(s"Replaying log path: $logPath")
-    val (logInput, sparkVersion) =
+    val logInput =
       if (isLegacyLogDirectory(eventLog)) {
         openLegacyEventLog(logPath)
       } else {
@@ -333,7 +333,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
     try {
       val appListener = new ApplicationEventListener
       bus.addListener(appListener)
-      bus.replay(logInput, sparkVersion, logPath.toString)
+      bus.replay(logInput, logPath.toString)
       new FsApplicationHistoryInfo(
         logPath.getName(),
         appListener.appId.getOrElse(logPath.getName()),
@@ -353,30 +353,24 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
    * log file (along with other metadata files), which is the case for directories generated by
    * the code in previous releases.
    *
-   * @return 2-tuple of (input stream of the events, version of Spark which wrote the log)
+   * @return input stream that holds one JSON record per line.
    */
-  private[history] def openLegacyEventLog(dir: Path): (InputStream, String) = {
+  private[history] def openLegacyEventLog(dir: Path): InputStream = {
     val children = fs.listStatus(dir)
     var eventLogPath: Path = null
     var codecName: Option[String] = None
-    var sparkVersion: String = null
 
     children.foreach { child =>
       child.getPath().getName() match {
         case name if name.startsWith(LOG_PREFIX) =>
           eventLogPath = child.getPath()
-
         case codec if codec.startsWith(COMPRESSION_CODEC_PREFIX) =>
           codecName = Some(codec.substring(COMPRESSION_CODEC_PREFIX.length()))
-
-        case version if version.startsWith(SPARK_VERSION_PREFIX) =>
-          sparkVersion = version.substring(SPARK_VERSION_PREFIX.length())
-
         case _ =>
       }
     }
 
-    if (eventLogPath == null || sparkVersion == null) {
+    if (eventLogPath == null) {
       throw new IllegalArgumentException(s"$dir is not a Spark application log directory.")
     }
 
@@ -388,7 +382,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
       }
 
     val in = new BufferedInputStream(fs.open(eventLogPath))
-    (codec.map(_.compressedInputStream(in)).getOrElse(in), sparkVersion)
+    codec.map(_.compressedInputStream(in)).getOrElse(in)
   }
 
   /**
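
For context (a sketch reconstructed from the prefixes above, not shown in this 
patch), a legacy Spark 1.0 application log is a directory with one file per 
metadata entry, e.g.:
```
old-app-complete/
  EVENT_LOG_1
  SPARK_VERSION_1.0
  COMPRESSION_CODEC_org.apache.spark.io.LZFCompressionCodec
  APPLICATION_COMPLETE
```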

http://git-wip-us.apache.org/repos/asf/spark/blob/6776cb33/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 8cc6ec1..148485c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -737,13 +737,13 @@ private[spark] class Master(
     val notFoundBasePath = HistoryServer.UI_PATH_PREFIX + "/not-found"
     try {
       val eventLogFile = app.desc.eventLogDir
-        .map { dir => EventLoggingListener.getLogPath(dir, app.id) }
+        .map { dir => EventLoggingListener.getLogPath(dir, app.id, app.desc.eventLogCodec) }
         .getOrElse {
           // Event logging is not enabled for this application
           app.desc.appUiUrl = notFoundBasePath
           return false
         }
-        
+
       val fs = Utils.getHadoopFileSystem(eventLogFile, hadoopConf)
 
       if (fs.exists(new Path(eventLogFile + EventLoggingListener.IN_PROGRESS))) {
@@ -756,12 +756,12 @@ private[spark] class Master(
         return false
       }
 
-      val (logInput, sparkVersion) = EventLoggingListener.openEventLog(new Path(eventLogFile), fs)
+      val logInput = EventLoggingListener.openEventLog(new Path(eventLogFile), fs)
       val replayBus = new ReplayListenerBus()
       val ui = SparkUI.createHistoryUI(new SparkConf, replayBus, new SecurityManager(conf),
         appName + " (completed)", HistoryServer.UI_PATH_PREFIX + s"/${app.id}")
       try {
-        replayBus.replay(logInput, sparkVersion, eventLogFile)
+        replayBus.replay(logInput, eventLogFile)
       } finally {
         logInput.close()
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/6776cb33/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
index f856890..0709b6d 100644
--- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
+++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
@@ -26,7 +26,6 @@ import org.xerial.snappy.{Snappy, SnappyInputStream, SnappyOutputStream}
 import org.apache.spark.SparkConf
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.util.Utils
-import org.apache.spark.Logging
 
 /**
  * :: DeveloperApi ::
@@ -53,8 +52,12 @@ private[spark] object CompressionCodec {
     "lzf" -> classOf[LZFCompressionCodec].getName,
     "snappy" -> classOf[SnappyCompressionCodec].getName)
 
+  def getCodecName(conf: SparkConf): String = {
+    conf.get(configKey, DEFAULT_COMPRESSION_CODEC)
+  }
+
   def createCodec(conf: SparkConf): CompressionCodec = {
-    createCodec(conf, conf.get(configKey, DEFAULT_COMPRESSION_CODEC))
+    createCodec(conf, getCodecName(conf))
   }
 
   def createCodec(conf: SparkConf, codecName: String): CompressionCodec = {
@@ -71,6 +74,20 @@ private[spark] object CompressionCodec {
       s"Consider setting $configKey=$FALLBACK_COMPRESSION_CODEC"))
   }
 
+  /**
+   * Return the short version of the given codec name.
+   * If it is already a short name, just return it.
+   */
+  def getShortName(codecName: String): String = {
+    if (shortCompressionCodecNames.contains(codecName)) {
+      codecName
+    } else {
+      shortCompressionCodecNames
+        .collectFirst { case (k, v) if v == codecName => k }
+        .getOrElse { throw new IllegalArgumentException(s"No short name for codec $codecName.") }
+    }
+  }
+
   val FALLBACK_COMPRESSION_CODEC = "lzf"
   val DEFAULT_COMPRESSION_CODEC = "snappy"
   val ALL_COMPRESSION_CODECS = shortCompressionCodecNames.values.toSeq

http://git-wip-us.apache.org/repos/asf/spark/blob/6776cb33/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index 30075c1..2091a9f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -62,6 +62,15 @@ private[spark] class EventLoggingListener(
   private val testing = sparkConf.getBoolean("spark.eventLog.testing", false)
   private val outputBufferSize = sparkConf.getInt("spark.eventLog.buffer.kb", 100) * 1024
   private val fileSystem = Utils.getHadoopFileSystem(new URI(logBaseDir), hadoopConf)
+  private val compressionCodec =
+    if (shouldCompress) {
+      Some(CompressionCodec.createCodec(sparkConf))
+    } else {
+      None
+    }
+  private val compressionCodecName = compressionCodec.map { c =>
+    CompressionCodec.getShortName(c.getClass.getName)
+  }
 
   // Only defined if the file system scheme is not local
   private var hadoopDataStream: Option[FSDataOutputStream] = None
@@ -80,7 +89,7 @@ private[spark] class EventLoggingListener(
   private[scheduler] val loggedEvents = new ArrayBuffer[JValue]
 
   // Visible for tests only.
-  private[scheduler] val logPath = getLogPath(logBaseDir, appId)
+  private[scheduler] val logPath = getLogPath(logBaseDir, appId, compressionCodecName)
 
   /**
    * Creates the log file in the configured log directory.
@@ -111,19 +120,19 @@ private[spark] class EventLoggingListener(
         hadoopDataStream.get
       }
 
-    val compressionCodec =
-      if (shouldCompress) {
-        Some(CompressionCodec.createCodec(sparkConf))
-      } else {
-        None
-      }
-
-    fileSystem.setPermission(path, LOG_FILE_PERMISSIONS)
-    val logStream = initEventLog(new BufferedOutputStream(dstream, outputBufferSize),
-      compressionCodec)
-    writer = Some(new PrintWriter(logStream))
+    try {
+      val cstream = compressionCodec.map(_.compressedOutputStream(dstream)).getOrElse(dstream)
+      val bstream = new BufferedOutputStream(cstream, outputBufferSize)
 
-    logInfo("Logging events to %s".format(logPath))
+      EventLoggingListener.initEventLog(bstream)
+      fileSystem.setPermission(path, LOG_FILE_PERMISSIONS)
+      writer = Some(new PrintWriter(bstream))
+      logInfo("Logging events to %s".format(logPath))
+    } catch {
+      case e: Exception =>
+        dstream.close()
+        throw e
+    }
   }
 
   /** Log the event as JSON. */
@@ -201,77 +210,57 @@ private[spark] object EventLoggingListener extends Logging {
   // Suffix applied to the names of files still being written by applications.
   val IN_PROGRESS = ".inprogress"
   val DEFAULT_LOG_DIR = "/tmp/spark-events"
+  val SPARK_VERSION_KEY = "SPARK_VERSION"
+  val COMPRESSION_CODEC_KEY = "COMPRESSION_CODEC"
 
   private val LOG_FILE_PERMISSIONS = new FsPermission(Integer.parseInt("770", 8).toShort)
 
-  // Marker for the end of header data in a log file. After this marker, log data, potentially
-  // compressed, will be found.
-  private val HEADER_END_MARKER = "=== LOG_HEADER_END ==="
-
-  // To avoid corrupted files causing the heap to fill up. Value is arbitrary.
-  private val MAX_HEADER_LINE_LENGTH = 4096
-
   // A cache for compression codecs to avoid creating the same codec many times
   private val codecMap = new mutable.HashMap[String, CompressionCodec]
 
   /**
-   * Write metadata about the event log to the given stream.
-   *
-   * The header is a serialized version of a map, except it does not use Java serialization to
-   * avoid incompatibilities between different JDKs. It writes one map entry per line, in
-   * "key=value" format.
+   * Write metadata about an event log to the given stream.
+   * The metadata is encoded in the first line of the event log as JSON.
    *
-   * The very last entry in the header is the `HEADER_END_MARKER` marker, so that the parsing code
-   * can know when to stop.
-   *
-   * The format needs to be kept in sync with the openEventLog() method below. Also, it cannot
-   * change in new Spark versions without some other way of detecting the change (like some
-   * metadata encoded in the file name).
-   *
-   * @param logStream Raw output stream to the even log file.
-   * @param compressionCodec Optional compression codec to use.
-   * @return A stream where to write event log data. This may be a wrapper around the original
-   *         stream (for example, when compression is enabled).
+   * @param logStream Raw output stream to the event log file.
    */
-  def initEventLog(
-      logStream: OutputStream,
-      compressionCodec: Option[CompressionCodec]): OutputStream = {
-    val meta = mutable.HashMap(("version" -> SPARK_VERSION))
-    compressionCodec.foreach { codec =>
-      meta += ("compressionCodec" -> codec.getClass().getName())
-    }
-
-    def write(entry: String) = {
-      val bytes = entry.getBytes(Charsets.UTF_8)
-      if (bytes.length > MAX_HEADER_LINE_LENGTH) {
-        throw new IOException(s"Header entry too long: ${entry}")
-      }
-      logStream.write(bytes, 0, bytes.length)
-    }
-
-    meta.foreach { case (k, v) => write(s"$k=$v\n") }
-    write(s"$HEADER_END_MARKER\n")
-    compressionCodec.map(_.compressedOutputStream(logStream)).getOrElse(logStream)
+  def initEventLog(logStream: OutputStream): Unit = {
+    val metadata = SparkListenerLogStart(SPARK_VERSION)
+    val metadataJson = compact(JsonProtocol.logStartToJson(metadata)) + "\n"
+    logStream.write(metadataJson.getBytes(Charsets.UTF_8))
   }
 
   /**
    * Return a file-system-safe path to the log file for the given application.
    *
+   * Note that because we currently only create a single log file for each application,
+   * we must encode all the information needed to parse this event log in the file name
+   * instead of within the file itself. Otherwise, if the file is compressed, for instance,
+   * we won't know which codec to use to decompress the metadata needed to open the file in
+   * the first place.
+   *
    * @param logBaseDir Directory where the log file will be written.
    * @param appId A unique app ID.
+   * @param compressionCodecName Name to identify the codec used to compress the contents
+   *                             of the log, or None if compression is not enabled.
    * @return A path which consists of file-system-safe characters.
    */
-  def getLogPath(logBaseDir: String, appId: String): String = {
-    val name = appId.replaceAll("[ :/]", "-").replaceAll("[${}'\"]", "_").toLowerCase
-    Utils.resolveURI(logBaseDir) + "/" + name.stripSuffix("/")
+  def getLogPath(
+      logBaseDir: String,
+      appId: String,
+      compressionCodecName: Option[String] = None): String = {
+    val sanitizedAppId = appId.replaceAll("[ :/]", "-").replaceAll("[.${}'\"]", "_").toLowerCase
+    // e.g. app_123, app_123.lzf
+    val logName = sanitizedAppId + compressionCodecName.map { "." + _ }.getOrElse("")
+    Utils.resolveURI(logBaseDir).toString.stripSuffix("/") + "/" + logName
   }
 
   /**
-   * Opens an event log file and returns an input stream to the event data.
+   * Opens an event log file and returns an input stream that contains the event data.
    *
-   * @return 2-tuple (event input stream, Spark version of event data)
+   * @return input stream that holds one JSON record per line.
    */
-  def openEventLog(log: Path, fs: FileSystem): (InputStream, String) = {
+  def openEventLog(log: Path, fs: FileSystem): InputStream = {
     // It's not clear whether FileSystem.open() throws FileNotFoundException or just plain
     // IOException when a file does not exist, so try our best to throw a proper exception.
     if (!fs.exists(log)) {
@@ -279,52 +268,17 @@ private[spark] object EventLoggingListener extends Logging {
     }
 
     val in = new BufferedInputStream(fs.open(log))
-    // Read a single line from the input stream without buffering.
-    // We cannot use BufferedReader because we must avoid reading
-    // beyond the end of the header, after which the content of the
-    // file may be compressed.
-    def readLine(): String = {
-      val bytes = new ByteArrayOutputStream()
-      var next = in.read()
-      var count = 0
-      while (next != '\n') {
-        if (next == -1) {
-          throw new IOException("Unexpected end of file.")
-        }
-        bytes.write(next)
-        count = count + 1
-        if (count > MAX_HEADER_LINE_LENGTH) {
-          throw new IOException("Maximum header line length exceeded.")
-        }
-        next = in.read()
-      }
-      new String(bytes.toByteArray(), Charsets.UTF_8)
+
+    // Compression codec is encoded as an extension, e.g. app_123.lzf
+    // Since we sanitize the app ID to not include periods, it is safe to split on it
+    val logName = log.getName.stripSuffix(IN_PROGRESS)
+    val codecName: Option[String] = logName.split("\\.").tail.lastOption
+    val codec = codecName.map { c =>
+      codecMap.getOrElseUpdate(c, CompressionCodec.createCodec(new SparkConf, c))
     }
 
-    // Parse the header metadata in the form of k=v pairs
-    // This assumes that every line before the header end marker follows this format
     try {
-      val meta = new mutable.HashMap[String, String]()
-      var foundEndMarker = false
-      while (!foundEndMarker) {
-        readLine() match {
-          case HEADER_END_MARKER =>
-            foundEndMarker = true
-          case entry =>
-            val prop = entry.split("=", 2)
-            if (prop.length != 2) {
-              throw new IllegalArgumentException("Invalid metadata in log file.")
-            }
-            meta += (prop(0) -> prop(1))
-        }
-      }
-
-      val sparkVersion = meta.get("version").getOrElse(
-        throw new IllegalArgumentException("Missing Spark version in log metadata."))
-      val codec = meta.get("compressionCodec").map { codecName =>
-        codecMap.getOrElseUpdate(codecName, CompressionCodec.createCodec(new SparkConf, codecName))
-      }
-      (codec.map(_.compressedInputStream(in)).getOrElse(in), sparkVersion)
+      codec.map(_.compressedInputStream(in)).getOrElse(in)
     } catch {
       case e: Exception =>
         in.close()
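
The name-based codec lookup in openEventLog above boils down to a suffix strip 
and a split; a minimal standalone sketch with hypothetical file names:

```scala
// "app_123.lzf.inprogress" -> "app_123.lzf" -> Some("lzf")
// "app_without_compression" -> no extension -> None
def codecNameOf(fileName: String): Option[String] =
  fileName.stripSuffix(".inprogress").split("\\.").tail.lastOption

assert(codecNameOf("app_123.lzf.inprogress") == Some("lzf"))
assert(codecNameOf("app_without_compression").isEmpty)
```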

http://git-wip-us.apache.org/repos/asf/spark/blob/6776cb33/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
index d9c3a10..95273c7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
@@ -39,10 +39,9 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging {
    * error is thrown by this method.
    *
    * @param logData Stream containing event log data.
-   * @param version Spark version that generated the events.
   * @param sourceName Filename (or other source identifier) from whence @logData is being read
    */
-  def replay(logData: InputStream, version: String, sourceName: String) {
+  def replay(logData: InputStream, sourceName: String): Unit = {
     var currentLine: String = null
     var lineNumber: Int = 1
     try {

http://git-wip-us.apache.org/repos/asf/spark/blob/6776cb33/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index dd28ddb..52720d4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -116,6 +116,11 @@ case class SparkListenerApplicationStart(appName: String, appId: Option[String],
 @DeveloperApi
 case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent
 
+/**
+ * An internal class that describes the metadata of an event log.
+ * This event is not meant to be posted to listeners downstream.
+ */
+private[spark] case class SparkListenerLogStart(sparkVersion: String) extends SparkListenerEvent
 
 /**
  * :: DeveloperApi ::

http://git-wip-us.apache.org/repos/asf/spark/blob/6776cb33/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
index fe8a19a..61e69ec 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
@@ -58,6 +58,7 @@ private[spark] trait SparkListenerBus extends ListenerBus[SparkListener, SparkLi
         listener.onExecutorAdded(executorAdded)
       case executorRemoved: SparkListenerExecutorRemoved =>
         listener.onExecutorRemoved(executorRemoved)
+      case logStart: SparkListenerLogStart => // ignore event log metadata
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/6776cb33/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index a0aa555..ffd4825 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -85,7 +85,7 @@ private[spark] class SparkDeploySchedulerBackend(
       args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
     val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
     val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
-      appUIAddress, sc.eventLogDir)
+      appUIAddress, sc.eventLogDir, sc.eventLogCodec)
 
     client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
     client.start()

http://git-wip-us.apache.org/repos/asf/spark/blob/6776cb33/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 8e20864..474f79f 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -89,6 +89,8 @@ private[spark] object JsonProtocol {
         executorAddedToJson(executorAdded)
       case executorRemoved: SparkListenerExecutorRemoved =>
         executorRemovedToJson(executorRemoved)
+      case logStart: SparkListenerLogStart =>
+        logStartToJson(logStart)
       // These aren't used, but keeps compiler happy
       case SparkListenerExecutorMetricsUpdate(_, _) => JNothing
     }
@@ -214,6 +216,11 @@ private[spark] object JsonProtocol {
     ("Removed Reason" -> executorRemoved.reason)
   }
 
+  def logStartToJson(logStart: SparkListenerLogStart): JValue = {
+    ("Event" -> Utils.getFormattedClassName(logStart)) ~
+    ("Spark Version" -> SPARK_VERSION)
+  }
+
   /** ------------------------------------------------------------------- *
    * JSON serialization methods for classes SparkListenerEvents depend on |
    * -------------------------------------------------------------------- */
@@ -447,6 +454,7 @@ private[spark] object JsonProtocol {
     val applicationEnd = Utils.getFormattedClassName(SparkListenerApplicationEnd)
     val executorAdded = Utils.getFormattedClassName(SparkListenerExecutorAdded)
     val executorRemoved = Utils.getFormattedClassName(SparkListenerExecutorRemoved)
+    val logStart = Utils.getFormattedClassName(SparkListenerLogStart)
 
     (json \ "Event").extract[String] match {
       case `stageSubmitted` => stageSubmittedFromJson(json)
@@ -464,6 +472,7 @@ private[spark] object JsonProtocol {
       case `applicationEnd` => applicationEndFromJson(json)
       case `executorAdded` => executorAddedFromJson(json)
       case `executorRemoved` => executorRemovedFromJson(json)
+      case `logStart` => logStartFromJson(json)
     }
   }
 
@@ -574,6 +583,11 @@ private[spark] object JsonProtocol {
     SparkListenerExecutorRemoved(time, executorId, reason)
   }
 
+  def logStartFromJson(json: JValue): SparkListenerLogStart = {
+    val sparkVersion = (json \ "Spark Version").extract[String]
+    SparkListenerLogStart(sparkVersion)
+  }
+
   /** --------------------------------------------------------------------- *
    * JSON deserialization methods for classes SparkListenerEvents depend on |
    * ---------------------------------------------------------------------- */
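
A round-trip sketch of the new (de)serializers (illustrative only; JsonProtocol 
is private[spark], so this assumes code compiled inside the org.apache.spark 
namespace). Note that logStartToJson, as written above, records the running 
SPARK_VERSION rather than the sparkVersion field of the event it is given:

```scala
import org.json4s.jackson.JsonMethods.{compact, parse, render}

import org.apache.spark.scheduler.SparkListenerLogStart
import org.apache.spark.util.JsonProtocol

val json = JsonProtocol.logStartToJson(SparkListenerLogStart("1.3.0"))
val line = compact(render(json))
// e.g. {"Event":"SparkListenerLogStart","Spark Version":"1.3.0"} on a 1.3.0 build
val event = JsonProtocol.sparkEventFromJson(parse(line))  // a SparkListenerLogStart
```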

http://git-wip-us.apache.org/repos/asf/spark/blob/6776cb33/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index 85939ea..e908ba6 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -17,18 +17,17 @@
 
 package org.apache.spark.deploy.history
 
-import java.io.{File, FileOutputStream, OutputStreamWriter}
+import java.io.{BufferedOutputStream, File, FileOutputStream, OutputStreamWriter}
+import java.net.URI
 
 import scala.io.Source
 
-import com.google.common.io.Files
 import org.apache.hadoop.fs.Path
 import org.json4s.jackson.JsonMethods._
 import org.scalatest.{BeforeAndAfter, FunSuite}
 import org.scalatest.Matchers
 
 import org.apache.spark.{Logging, SparkConf}
-import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.io._
 import org.apache.spark.scheduler._
 import org.apache.spark.util.{JsonProtocol, Utils}
@@ -45,18 +44,35 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
     Utils.deleteRecursively(testDir)
   }
 
+  /** Create a fake log file using the new log format used in Spark 1.3+ */
+  private def newLogFile(
+      appId: String,
+      inProgress: Boolean,
+      codec: Option[String] = None): File = {
+    val ip = if (inProgress) EventLoggingListener.IN_PROGRESS else ""
+    val logUri = EventLoggingListener.getLogPath(testDir.getAbsolutePath, appId)
+    val logPath = new URI(logUri).getPath + ip
+    new File(logPath)
+  }
+
   test("Parse new and old application logs") {
     val provider = new FsHistoryProvider(createTestConf())
 
     // Write a new-style application log.
-    val newAppComplete = new File(testDir, "new1")
+    val newAppComplete = newLogFile("new1", inProgress = false)
     writeFile(newAppComplete, true, None,
       SparkListenerApplicationStart("new-app-complete", None, 1L, "test"),
-      SparkListenerApplicationEnd(4L)
+      SparkListenerApplicationEnd(5L)
       )
 
+    // Write a new-style application log.
+    val newAppCompressedComplete = newLogFile("new1compressed", inProgress = false, Some("lzf"))
+    writeFile(newAppCompressedComplete, true, None,
+      SparkListenerApplicationStart("new-app-compressed-complete", None, 1L, "test"),
+      SparkListenerApplicationEnd(4L))
+
     // Write an unfinished app, new-style.
-    val newAppIncomplete = new File(testDir, "new2" + EventLoggingListener.IN_PROGRESS)
+    val newAppIncomplete = newLogFile("new2", inProgress = true)
     writeFile(newAppIncomplete, true, None,
       SparkListenerApplicationStart("new-app-incomplete", None, 1L, "test")
       )
@@ -89,16 +105,18 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
 
     val list = provider.getListing().toSeq
     list should not be (null)
-    list.size should be (4)
-    list.count(e => e.completed) should be (2)
+    list.size should be (5)
+    list.count(_.completed) should be (3)
 
-    list(0) should be (ApplicationHistoryInfo(newAppComplete.getName(), "new-app-complete", 1L, 4L,
+    list(0) should be (ApplicationHistoryInfo(newAppComplete.getName(), "new-app-complete", 1L, 5L,
       newAppComplete.lastModified(), "test", true))
-    list(1) should be (ApplicationHistoryInfo(oldAppComplete.getName(), "old-app-complete", 2L, 3L,
+    list(1) should be (ApplicationHistoryInfo(newAppCompressedComplete.getName(),
+      "new-app-compressed-complete", 1L, 4L, newAppCompressedComplete.lastModified(), "test", true))
+    list(2) should be (ApplicationHistoryInfo(oldAppComplete.getName(), "old-app-complete", 2L, 3L,
       oldAppComplete.lastModified(), "test", true))
-    list(2) should be (ApplicationHistoryInfo(oldAppIncomplete.getName(), "old-app-incomplete", 2L,
+    list(3) should be (ApplicationHistoryInfo(oldAppIncomplete.getName(), "old-app-incomplete", 2L,
       -1L, oldAppIncomplete.lastModified(), "test", false))
-    list(3) should be (ApplicationHistoryInfo(newAppIncomplete.getName(), "new-app-incomplete", 1L,
+    list(4) should be (ApplicationHistoryInfo(newAppIncomplete.getName(), "new-app-incomplete", 1L,
       -1L, newAppIncomplete.lastModified(), "test", false))
 
     // Make sure the UI can be rendered.
@@ -127,7 +145,7 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
 
       val logPath = new Path(logDir.getAbsolutePath())
       try {
-        val (logInput, sparkVersion) = provider.openLegacyEventLog(logPath)
+        val logInput = provider.openLegacyEventLog(logPath)
         try {
           Source.fromInputStream(logInput).getLines().toSeq.size should be (2)
         } finally {
@@ -141,12 +159,12 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
   }
 
   test("SPARK-3697: ignore directories that cannot be read.") {
-    val logFile1 = new File(testDir, "new1")
+    val logFile1 = newLogFile("new1", inProgress = false)
     writeFile(logFile1, true, None,
       SparkListenerApplicationStart("app1-1", None, 1L, "test"),
       SparkListenerApplicationEnd(2L)
       )
-    val logFile2 = new File(testDir, "new2")
+    val logFile2 = newLogFile("new2", inProgress = false)
     writeFile(logFile2, true, None,
       SparkListenerApplicationStart("app1-2", None, 1L, "test"),
       SparkListenerApplicationEnd(2L)
@@ -164,7 +182,7 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
   test("history file is renamed from inprogress to completed") {
     val provider = new FsHistoryProvider(createTestConf())
 
-    val logFile1 = new File(testDir, "app1" + EventLoggingListener.IN_PROGRESS)
+    val logFile1 = newLogFile("app1", inProgress = true)
     writeFile(logFile1, true, None,
       SparkListenerApplicationStart("app1", Some("app1"), 1L, "test"),
       SparkListenerApplicationEnd(2L)
@@ -174,7 +192,7 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
     appListBeforeRename.size should be (1)
     appListBeforeRename.head.logPath should endWith(EventLoggingListener.IN_PROGRESS)
 
-    logFile1.renameTo(new File(testDir, "app1"))
+    logFile1.renameTo(newLogFile("app1", inProgress = false))
     provider.checkForLogs()
     val appListAfterRename = provider.getListing()
     appListAfterRename.size should be (1)
@@ -184,7 +202,7 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
   test("SPARK-5582: empty log directory") {
     val provider = new FsHistoryProvider(createTestConf())
 
-    val logFile1 = new File(testDir, "app1" + EventLoggingListener.IN_PROGRESS)
+    val logFile1 = newLogFile("app1", inProgress = true)
     writeFile(logFile1, true, None,
       SparkListenerApplicationStart("app1", Some("app1"), 1L, "test"),
       SparkListenerApplicationEnd(2L))
@@ -199,14 +217,13 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
 
   private def writeFile(file: File, isNewFormat: Boolean, codec: Option[CompressionCodec],
     events: SparkListenerEvent*) = {
-    val out =
-      if (isNewFormat) {
-        EventLoggingListener.initEventLog(new FileOutputStream(file), codec)
-      } else {
-        val fileStream = new FileOutputStream(file)
-        codec.map(_.compressedOutputStream(fileStream)).getOrElse(fileStream)
-      }
-    val writer = new OutputStreamWriter(out, "UTF-8")
+    val fstream = new FileOutputStream(file)
+    val cstream = codec.map(_.compressedOutputStream(fstream)).getOrElse(fstream)
+    val bstream = new BufferedOutputStream(cstream)
+    if (isNewFormat) {
+      EventLoggingListener.initEventLog(new FileOutputStream(file))
+    }
+    val writer = new OutputStreamWriter(bstream, "UTF-8")
     try {
       events.foreach(e => writer.write(compact(render(JsonProtocol.sparkEventToJson(e))) + "\n"))
     } finally {

http://git-wip-us.apache.org/repos/asf/spark/blob/6776cb33/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index 437d869..992dde6 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.scheduler
 
 import java.io.{File, FileOutputStream, InputStream, IOException}
+import java.net.URI
 
 import scala.collection.mutable
 import scala.io.Source
@@ -26,7 +27,7 @@ import org.apache.hadoop.fs.Path
 import org.json4s.jackson.JsonMethods._
 import org.scalatest.{BeforeAndAfter, FunSuite}
 
-import org.apache.spark.{Logging, SparkConf, SparkContext}
+import org.apache.spark.{Logging, SparkConf, SparkContext, SPARK_VERSION}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.io._
 import org.apache.spark.util.{JsonProtocol, Utils}
@@ -78,7 +79,7 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter with Loggin
 
   test("Basic event logging with compression") {
     CompressionCodec.ALL_COMPRESSION_CODECS.foreach { codec =>
-      testEventLogging(compressionCodec = Some(codec))
+      testEventLogging(compressionCodec = Some(CompressionCodec.getShortName(codec)))
     }
   }
 
@@ -88,25 +89,35 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter with Loggin
 
   test("End-to-end event logging with compression") {
     CompressionCodec.ALL_COMPRESSION_CODECS.foreach { codec =>
-      testApplicationEventLogging(compressionCodec = Some(codec))
+      testApplicationEventLogging(compressionCodec = Some(CompressionCodec.getShortName(codec)))
     }
   }
 
   test("Log overwriting") {
-    val log = new FileOutputStream(new File(testDir, "test"))
-    log.close()
-    try {
-      testEventLogging()
-      assert(false)
-    } catch {
-      case e: IOException =>
-        // Expected, since we haven't enabled log overwrite.
-    }
-
+    val logUri = EventLoggingListener.getLogPath(testDir.getAbsolutePath, "test")
+    val logPath = new URI(logUri).getPath
+    // Create file before writing the event log
+    new FileOutputStream(new File(logPath)).close()
+    // Expected IOException, since we haven't enabled log overwrite.
+    intercept[IOException] { testEventLogging() }
     // Try again, but enable overwriting.
     testEventLogging(extraConf = Map("spark.eventLog.overwrite" -> "true"))
   }
 
+  test("Event log name") {
+    // without compression
+    assert(s"file:/base-dir/app1" === EventLoggingListener.getLogPath("/base-dir", "app1"))
+    // with compression
+    assert(s"file:/base-dir/app1.lzf" ===
+      EventLoggingListener.getLogPath("/base-dir", "app1", Some("lzf")))
+    // illegal characters in app ID
+    assert(s"file:/base-dir/a-fine-mind_dollar_bills__1" ===
+      EventLoggingListener.getLogPath("/base-dir", "a fine:mind$dollar{bills}.1"))
+    // illegal characters in app ID with compression
+    assert(s"file:/base-dir/a-fine-mind_dollar_bills__1.lz4" ===
+      EventLoggingListener.getLogPath("/base-dir", "a fine:mind$dollar{bills}.1", Some("lz4")))
+  }
+
   /* ----------------- *
    * Actual test logic *
    * ----------------- */
@@ -140,15 +151,17 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter with Loggin
     eventLogger.stop()
 
     // Verify file contains exactly the two events logged
-    val (logData, version) = EventLoggingListener.openEventLog(new Path(eventLogger.logPath),
-      fileSystem)
+    val logData = EventLoggingListener.openEventLog(new Path(eventLogger.logPath), fileSystem)
     try {
       val lines = readLines(logData)
-      assert(lines.size === 2)
-      assert(lines(0).contains("SparkListenerApplicationStart"))
-      assert(lines(1).contains("SparkListenerApplicationEnd"))
-      assert(JsonProtocol.sparkEventFromJson(parse(lines(0))) === applicationStart)
-      assert(JsonProtocol.sparkEventFromJson(parse(lines(1))) === applicationEnd)
+      val logStart = SparkListenerLogStart(SPARK_VERSION)
+      assert(lines.size === 3)
+      assert(lines(0).contains("SparkListenerLogStart"))
+      assert(lines(1).contains("SparkListenerApplicationStart"))
+      assert(lines(2).contains("SparkListenerApplicationEnd"))
+      assert(JsonProtocol.sparkEventFromJson(parse(lines(0))) === logStart)
+      assert(JsonProtocol.sparkEventFromJson(parse(lines(1))) === applicationStart)
+      assert(JsonProtocol.sparkEventFromJson(parse(lines(2))) === applicationEnd)
     } finally {
       logData.close()
     }
@@ -163,8 +176,10 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter with Loggin
     val sc = new SparkContext("local-cluster[2,2,512]", "test", conf)
     assert(sc.eventLogger.isDefined)
     val eventLogger = sc.eventLogger.get
+    val eventLogPath = eventLogger.logPath
     val expectedLogDir = testDir.toURI().toString()
-    assert(eventLogger.logPath.startsWith(expectedLogDir + "/"))
+    assert(eventLogPath === EventLoggingListener.getLogPath(
+      expectedLogDir, sc.applicationId, compressionCodec.map(CompressionCodec.getShortName)))
 
     // Begin listening for events that trigger asserts
     val eventExistenceListener = new EventExistenceListener(eventLogger)
@@ -178,8 +193,8 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter with Loggin
     eventExistenceListener.assertAllCallbacksInvoked()
 
     // Make sure expected events exist in the log file.
-    val (logData, version) = EventLoggingListener.openEventLog(new Path(eventLogger.logPath),
-      fileSystem)
+    val logData = EventLoggingListener.openEventLog(new Path(eventLogger.logPath), fileSystem)
+    val logStart = SparkListenerLogStart(SPARK_VERSION)
     val lines = readLines(logData)
     val eventSet = mutable.Set(
       SparkListenerApplicationStart,
@@ -204,6 +219,7 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter with Loggin
         }
       }
     }
+    assert(JsonProtocol.sparkEventFromJson(parse(lines(0))) === logStart)
+    assert(eventSet.isEmpty, "The following events are missing: " + eventSet.toSeq)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/6776cb33/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
index 702c4cb..601694f 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
@@ -61,7 +61,7 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
     try {
       val replayer = new ReplayListenerBus()
       replayer.addListener(eventMonster)
-      replayer.replay(logData, SPARK_VERSION, logFilePath.toString)
+      replayer.replay(logData, logFilePath.toString)
     } finally {
       logData.close()
     }
@@ -115,12 +115,12 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
     assert(!eventLog.isDir)
 
     // Replay events
-    val (logData, version) = EventLoggingListener.openEventLog(eventLog.getPath(), fileSystem)
+    val logData = EventLoggingListener.openEventLog(eventLog.getPath(), fileSystem)
     val eventMonster = new EventMonster(conf)
     try {
       val replayer = new ReplayListenerBus()
       replayer.addListener(eventMonster)
-      replayer.replay(logData, version, eventLog.getPath().toString)
+      replayer.replay(logData, eventLog.getPath().toString)
     } finally {
       logData.close()
     }
@@ -150,11 +150,4 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
     override def start() { }
 
   }
-
-  private def getCompressionCodec(codecName: String) = {
-    val conf = new SparkConf
-    conf.set("spark.io.compression.codec", codecName)
-    CompressionCodec.createCodec(conf)
-  }
-
 }

