GitHub user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4821#discussion_r25552104
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
    @@ -217,53 +219,60 @@ private[spark] object EventLoggingListener extends Logging {
       /**
        * Write metadata about the event log to the given stream.
        *
    -   * The header is a serialized version of a map, except it does not use Java serialization to
    -   * avoid incompatibilities between different JDKs. It writes one map entry per line, in
    -   * "key=value" format.
    -   *
    -   * The very last entry in the header is the `HEADER_END_MARKER` marker, so that the parsing code
    -   * can know when to stop.
    +   * The header is a single line of JSON in the beginning of the file. Note that this
    +   * assumes all metadata necessary to parse the log is also included in the file name.
    +   * The format needs to be kept in sync with the `openEventLog()` method below. Also, it
    +   * cannot change in new Spark versions without some other way of detecting the change.
        *
    -   * The format needs to be kept in sync with the openEventLog() method below. Also, it cannot
    -   * change in new Spark versions without some other way of detecting the change (like some
    -   * metadata encoded in the file name).
    -   *
    -   * @param logStream Raw output stream to the even log file.
    +   * @param logStream Raw output stream to the event log file.
        * @param compressionCodec Optional compression codec to use.
    -   * @return A stream where to write event log data. This may be a wrapper around the original
    +   * @return A stream to which event log data is written. This may be a wrapper around the original
        *         stream (for example, when compression is enabled).
        */
       def initEventLog(
           logStream: OutputStream,
           compressionCodec: Option[CompressionCodec]): OutputStream = {
    -    val meta = mutable.HashMap(("version" -> SPARK_VERSION))
    +    val metadata = new mutable.HashMap[String, String]
    +    // Some of these metadata are already encoded in the file name
    +    // Here we include them again within the file itself for completeness
    +    metadata += ("Event" -> Utils.getFormattedClassName(SparkListenerMetadataIdentifier))
    +    metadata += (SPARK_VERSION_KEY -> SPARK_VERSION)
         compressionCodec.foreach { codec =>
    -      meta += ("compressionCodec" -> codec.getClass().getName())
    +      metadata += (COMPRESSION_CODEC_KEY -> codec.getClass.getCanonicalName)
         }
    -
    -    def write(entry: String) = {
    -      val bytes = entry.getBytes(Charsets.UTF_8)
    -      if (bytes.length > MAX_HEADER_LINE_LENGTH) {
    -        throw new IOException(s"Header entry too long: ${entry}")
    -      }
    -      logStream.write(bytes, 0, bytes.length)
    +    val metadataJson = compact(render(JsonProtocol.mapToJson(metadata)))
    +    val metadataBytes = (metadataJson + "\n").getBytes(Charsets.UTF_8)
    +    if (metadataBytes.length > MAX_HEADER_LINE_LENGTH) {
    +      throw new IOException(s"Event log metadata too long: $metadataJson")
         }
    -
    -    meta.foreach { case (k, v) => write(s"$k=$v\n") }
    -    write(s"$HEADER_END_MARKER\n")
    -    compressionCodec.map(_.compressedOutputStream(logStream)).getOrElse(logStream)
    +    logStream.write(metadataBytes, 0, metadataBytes.length)
    +    logStream
       }
     
       /**
        * Return a file-system-safe path to the log file for the given application.
        *
    +   * Note that because we currently only create a single log file for each application,
    +   * we must encode all the information needed to parse this event log in the file name
    +   * instead of within the file itself. Otherwise, if the file is compressed, for instance,
    +   * we won't know which codec to use to decompress the metadata.
    +   *
        * @param logBaseDir Directory where the log file will be written.
        * @param appId A unique app ID.
    +   * @param compressionCodecName Name of the compression codec used to compress the contents
    +   *                             of the log, or None if compression is not enabled.
        * @return A path which consists of file-system-safe characters.
        */
    -  def getLogPath(logBaseDir: String, appId: String): String = {
    -    val name = appId.replaceAll("[ :/]", "-").replaceAll("[${}'\"]", "_").toLowerCase
    -    Utils.resolveURI(logBaseDir) + "/" + name.stripSuffix("/")
    +  def getLogPath(
    +      logBaseDir: String,
    +      appId: String,
    +      compressionCodecName: Option[String]): String = {
    +    val sanitizedAppId = appId.replaceAll("[ :/]", "-").replaceAll("[${}'\"]", "_").toLowerCase
    +    // e.g. EVENT_LOG_app_123_SPARK_VERSION_1.3.1
    +    // e.g. EVENT_LOG_ {...} _COMPRESSION_CODEC_org.apache.spark.io.LZFCompressionCodec
    +    val logName = s"${EVENT_LOG_KEY}_${sanitizedAppId}_${SPARK_VERSION_KEY}_$SPARK_VERSION" +
    +      compressionCodecName.map { c => s"_${COMPRESSION_CODEC_KEY}_$c" }.getOrElse("")
    +    Utils.resolveURI(logBaseDir).toString.stripSuffix("/") + "/" + logName.stripSuffix("/")
    --- End diff --
    
    But that's the `stripSuffix` on logName (first one), which is fine. The 
second one shouldn't be necessary.
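
To make the two `stripSuffix` calls in the last line of the diff concrete, here is a minimal standalone sketch of when each call actually changes its input. It is not the Spark implementation: the base directory is a plain string rather than the result of `Utils.resolveURI`, the object name `LogPathSketch` and the constant values are hypothetical stand-ins, and it assumes the version and codec strings never end in a slash.

```scala
object LogPathSketch {
  // Hypothetical values; the diff only shows the constant names.
  val EventLogKey = "EVENT_LOG"
  val SparkVersionKey = "SPARK_VERSION"
  val CompressionCodecKey = "COMPRESSION_CODEC"

  // Same sanitization as in the diff: spaces, ':' and '/' all become '-'.
  def sanitize(appId: String): String =
    appId.replaceAll("[ :/]", "-").replaceAll("[${}'\"]", "_").toLowerCase

  // Mirrors how the diff assembles the file name from its parts.
  def logName(appId: String, sparkVersion: String, codec: Option[String]): String =
    s"${EventLogKey}_${sanitize(appId)}_${SparkVersionKey}_$sparkVersion" +
      codec.map(c => s"_${CompressionCodecKey}_$c").getOrElse("")

  // Simplified path join with both stripSuffix calls kept, as in the diff.
  def logPath(baseDir: String, name: String): String =
    baseDir.stripSuffix("/") + "/" + name.stripSuffix("/")

  def main(args: Array[String]): Unit = {
    val name = logName("app 123/", "1.3.1", Some("lzf"))
    // The name ends with the version or codec suffix, so this call leaves it unchanged.
    println(name.stripSuffix("/") == name)
    // A trailing slash on the base directory is what the other call removes.
    println(logPath("file:/tmp/spark-events/", name) == logPath("file:/tmp/spark-events", name))
  }
}
```

Under these assumptions, only the call on the base directory can have an effect, since any slash in the app ID has already been replaced before the name is assembled.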

