Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/4821#discussion_r25552104 --- Diff: core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala --- @@ -217,53 +219,60 @@ private[spark] object EventLoggingListener extends Logging { /** * Write metadata about the event log to the given stream. * - * The header is a serialized version of a map, except it does not use Java serialization to - * avoid incompatibilities between different JDKs. It writes one map entry per line, in - * "key=value" format. - * - * The very last entry in the header is the `HEADER_END_MARKER` marker, so that the parsing code - * can know when to stop. + * The header is a single line of JSON in the beginning of the file. Note that this + * assumes all metadata necessary to parse the log is also included in the file name. + * The format needs to be kept in sync with the `openEventLog()` method below. Also, it + * cannot change in new Spark versions without some other way of detecting the change. * - * The format needs to be kept in sync with the openEventLog() method below. Also, it cannot - * change in new Spark versions without some other way of detecting the change (like some - * metadata encoded in the file name). - * - * @param logStream Raw output stream to the even log file. + * @param logStream Raw output stream to the event log file. * @param compressionCodec Optional compression codec to use. - * @return A stream where to write event log data. This may be a wrapper around the original + * @return A stream to which event log data is written. This may be a wrapper around the original * stream (for example, when compression is enabled). */ def initEventLog( logStream: OutputStream, compressionCodec: Option[CompressionCodec]): OutputStream = { - val meta = mutable.HashMap(("version" -> SPARK_VERSION)) + val metadata = new mutable.HashMap[String, String] + // Some of these metadata are already encoded in the file name + // Here we include them again within the file itself for completeness + metadata += ("Event" -> Utils.getFormattedClassName(SparkListenerMetadataIdentifier)) + metadata += (SPARK_VERSION_KEY -> SPARK_VERSION) compressionCodec.foreach { codec => - meta += ("compressionCodec" -> codec.getClass().getName()) + metadata += (COMPRESSION_CODEC_KEY -> codec.getClass.getCanonicalName) } - - def write(entry: String) = { - val bytes = entry.getBytes(Charsets.UTF_8) - if (bytes.length > MAX_HEADER_LINE_LENGTH) { - throw new IOException(s"Header entry too long: ${entry}") - } - logStream.write(bytes, 0, bytes.length) + val metadataJson = compact(render(JsonProtocol.mapToJson(metadata))) + val metadataBytes = (metadataJson + "\n").getBytes(Charsets.UTF_8) + if (metadataBytes.length > MAX_HEADER_LINE_LENGTH) { + throw new IOException(s"Event log metadata too long: $metadataJson") } - - meta.foreach { case (k, v) => write(s"$k=$v\n") } - write(s"$HEADER_END_MARKER\n") - compressionCodec.map(_.compressedOutputStream(logStream)).getOrElse(logStream) + logStream.write(metadataBytes, 0, metadataBytes.length) + logStream } /** * Return a file-system-safe path to the log file for the given application. * + * Note that because we currently only create a single log file for each application, + * we must encode all the information needed to parse this event log in the file name + * instead of within the file itself. Otherwise, if the file is compressed, for instance, + * we won't know which codec to use to decompress the metadata. + * * @param logBaseDir Directory where the log file will be written. * @param appId A unique app ID. + * @param compressionCodecName Name of the compression codec used to compress the contents + * of the log, or None if compression is not enabled. * @return A path which consists of file-system-safe characters. */ - def getLogPath(logBaseDir: String, appId: String): String = { - val name = appId.replaceAll("[ :/]", "-").replaceAll("[${}'\"]", "_").toLowerCase - Utils.resolveURI(logBaseDir) + "/" + name.stripSuffix("/") + def getLogPath( + logBaseDir: String, + appId: String, + compressionCodecName: Option[String]): String = { + val sanitizedAppId = appId.replaceAll("[ :/]", "-").replaceAll("[${}'\"]", "_").toLowerCase + // e.g. EVENT_LOG_app_123_SPARK_VERSION_1.3.1 + // e.g. EVENT_LOG_ {...} _COMPRESSION_CODEC_org.apache.spark.io.LZFCompressionCodec + val logName = s"${EVENT_LOG_KEY}_${sanitizedAppId}_${SPARK_VERSION_KEY}_$SPARK_VERSION" + + compressionCodecName.map { c => s"_${COMPRESSION_CODEC_KEY}_$c" }.getOrElse("") + Utils.resolveURI(logBaseDir).toString.stripSuffix("/") + "/" + logName.stripSuffix("/") --- End diff -- But that's the `stripSuffix` on logName (first one), which is fine. The second one shouldn't be necessary.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org