Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1222#discussion_r18501179
  
    --- Diff: 
core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
    @@ -128,125 +165,140 @@ private[spark] class EventLoggingListener(
     
       /**
        * Stop logging events.
    -   * In addition, create an empty special file to indicate application 
completion.
        */
       def stop() = {
    -    logger.newFile(APPLICATION_COMPLETE)
    -    logger.stop()
    +    writer.foreach(_.close())
    +
    +    val target = new Path(logPath)
    +    if (fileSystem.exists(target)) {
    +      throw new IOException("Target log file already exists 
(%s)".format(logPath))
    +    }
    +    fileSystem.rename(new Path(logPath + IN_PROGRESS), target)
       }
    +
     }
     
     private[spark] object EventLoggingListener extends Logging {
    +  // Suffix applied to the names of files still being written by 
applications.
    +  val IN_PROGRESS = ".inprogress"
       val DEFAULT_LOG_DIR = "/tmp/spark-events"
    -  val LOG_PREFIX = "EVENT_LOG_"
    -  val SPARK_VERSION_PREFIX = "SPARK_VERSION_"
    -  val COMPRESSION_CODEC_PREFIX = "COMPRESSION_CODEC_"
    -  val APPLICATION_COMPLETE = "APPLICATION_COMPLETE"
    -  val LOG_FILE_PERMISSIONS = 
FsPermission.createImmutable(Integer.parseInt("770", 8).toShort)
     
    -  // A cache for compression codecs to avoid creating the same codec many 
times
    -  private val codecMap = new mutable.HashMap[String, CompressionCodec]
    +  private val LOG_FILE_PERMISSIONS = 
FsPermission.createImmutable(Integer.parseInt("770", 8)
    +    .toShort)
     
    -  def isEventLogFile(fileName: String): Boolean = {
    -    fileName.startsWith(LOG_PREFIX)
    -  }
    +  // Marker for the end of header data in a log file. After this marker 
(and the following
    +  // new line), log data, potentially compressed, will be found.
    +  private val HEADER_END_MARKER = "HEADER_END_MARKER"
     
    -  def isSparkVersionFile(fileName: String): Boolean = {
    -    fileName.startsWith(SPARK_VERSION_PREFIX)
    -  }
    +  // A cache for compression codecs to avoid creating the same codec many 
times
    +  private val codecMap = new mutable.HashMap[String, CompressionCodec]
     
    -  def isCompressionCodecFile(fileName: String): Boolean = {
    -    fileName.startsWith(COMPRESSION_CODEC_PREFIX)
    -  }
    +  /**
    +   * Write metadata about the event log to the given stream.
    +   *
    +   * The header is a serialized version of a map, except it does not use 
Java serialization to
    +   * avoid incompatibilities between different JDKs. It writes map entries 
in a simple format:
    +   *
    +   *   [len][bytes]
    +   *
    +   * Where `len` is an integer, and `bytes` is the UTF-8 encoded version 
of "key=value". The
    +   * very last entry in the header is the HEADER_END_MARKER marker, 
encoded like the above,
    +   * so that the parsing code can know when to stop.
    +   *
    +   * The format needs to be kept in sync with the openEventLog() method 
below. Also, it cannot
    +   * change in new Spark versions without some other way of detecting the 
change (like some
    +   * metadata encoded in the file name).
    +   *
    +   * @param logStream Raw output stream to the event log file.
    +   * @param compressionCodec Optional compression codec to use.
    +   * @return A stream where to write event log data. This may be a wrapper 
around the original
    +   *         stream (for example, when compression is enabled).
    +   */
    +  def initEventLog(logStream: OutputStream, compressionCodec: 
Option[CompressionCodec]):
    +    OutputStream = {
    +    val meta = mutable.HashMap(("version" -> SPARK_VERSION))
    +    compressionCodec.foreach { codec =>
    +      meta += ("compressionCodec" -> codec.getClass().getName())
    +    }
     
    -  def isApplicationCompleteFile(fileName: String): Boolean = {
    -    fileName == APPLICATION_COMPLETE
    -  }
    +    val header = new DataOutputStream(logStream)
    +    def write(entry: String) = {
    +      val bytes = entry.getBytes(Charsets.UTF_8)
    +      header.writeInt(bytes.length)
    +      header.write(bytes, 0, bytes.length)
    +    }
     
    -  def parseSparkVersion(fileName: String): String = {
    -    if (isSparkVersionFile(fileName)) {
    -      fileName.replaceAll(SPARK_VERSION_PREFIX, "")
    -    } else ""
    -  }
    +    meta.foreach { case (k, v) => write(s"$k=$v") }
    +    write(EventLoggingListener.HEADER_END_MARKER)
    +    header.flush()
     
    -  def parseCompressionCodec(fileName: String): String = {
    -    if (isCompressionCodecFile(fileName)) {
    -      fileName.replaceAll(COMPRESSION_CODEC_PREFIX, "")
    -    } else ""
    +    
compressionCodec.map(_.compressedOutputStream(logStream)).getOrElse(logStream)
       }
     
       /**
    -   * Return a file-system-safe path to the log directory for the given 
application.
    +   * Return a file-system-safe path to the log file for the given 
application.
        *
    -   * @param logBaseDir A base directory for the path to the log directory 
for given application.
    +   * @param logBaseDir Directory where the log file will be written.
        * @param appId A unique app ID.
        * @return A path which consists of file-system-safe characters.
        */
    -  def getLogDirPath(logBaseDir: String, appId: String): String = {
    +  def getLogPath(logBaseDir: String, appId: String): String = {
         val name = appId.replaceAll("[ :/]", "-").replaceAll("[${}'\"]", 
"_").toLowerCase
         Utils.resolveURI(logBaseDir) + "/" + name.stripSuffix("/")
       }
     
       /**
    -   * Parse the event logging information associated with the logs in the 
given directory.
    +   * Opens an event log file and returns an input stream to the event data.
        *
    -   * Specifically, this looks for event log files, the Spark version file, 
the compression
    -   * codec file (if event logs are compressed), and the application 
completion file (if the
    -   * application has run to completion).
    +   * @return 2-tuple (event input stream, Spark version of event data)
        */
    -  def parseLoggingInfo(logDir: Path, fileSystem: FileSystem): 
EventLoggingInfo = {
    +  def openEventLog(log: Path, fs: FileSystem): (InputStream, String) = {
    +    // It's not clear whether FileSystem.open() throws 
FileNotFoundException or just plain
    +    // IOException when a file does not exist, so try our best to throw a 
proper exception.
    +    if (!fs.exists(log)) {
    +      throw new FileNotFoundException(s"File $log does not exist.")
    +    }
    +
    +    val in = new DataInputStream(new BufferedInputStream(fs.open(log)))
    +    def read() = {
    +      // TODO: a corrupted / malicious file can cause this code to load 
lots of data into
    +      // memory. Better to have a limit on the size of the header?
    +      val len = in.readInt()
    +      val bytes = new Array[Byte](len)
    +      in.readFully(bytes)
    +      new String(bytes, Charsets.UTF_8)
    +    }
    +
         try {
    -      val fileStatuses = fileSystem.listStatus(logDir)
    -      val filePaths =
    -        if (fileStatuses != null) {
    -          fileStatuses.filter(!_.isDir).map(_.getPath).toSeq
    -        } else {
    -          Seq[Path]()
    +      val meta = new mutable.HashMap[String, String]()
    +      var foundEndMarker = false
    +      while (!foundEndMarker) {
    +        read() match {
    +          case HEADER_END_MARKER =>
    +            foundEndMarker = true
    +          case entry =>
    +            val prop = entry.split("=", 2)
    +            if (prop.length != 2) {
    +              throw new IllegalArgumentException("Invalid metadata in log 
file.")
    +            }
    +            meta += (prop(0) -> prop(1))
             }
    -      if (filePaths.isEmpty) {
    -        logWarning("No files found in logging directory %s".format(logDir))
           }
    -      EventLoggingInfo(
    -        logPaths = filePaths.filter { path => isEventLogFile(path.getName) 
},
    -        sparkVersion = filePaths
    -          .find { path => isSparkVersionFile(path.getName) }
    -          .map { path => parseSparkVersion(path.getName) }
    -          .getOrElse("<Unknown>"),
    -        compressionCodec = filePaths
    -          .find { path => isCompressionCodecFile(path.getName) }
    -          .map { path =>
    -            val codec = 
EventLoggingListener.parseCompressionCodec(path.getName)
    -            val conf = new SparkConf
    -            conf.set("spark.io.compression.codec", codec)
    -            codecMap.getOrElseUpdate(codec, 
CompressionCodec.createCodec(conf))
    -          },
    -        applicationComplete = filePaths.exists { path => 
isApplicationCompleteFile(path.getName) }
    -      )
    +
    +      val sparkVersion = meta.get("version").getOrElse(
    +        throw new IllegalArgumentException("Missing Spark version in log 
metadata."))
    +
    +      val codec = meta.get("compressionCodec").map { codecName =>
    +          val conf = new SparkConf()
    +          codecMap.getOrElseUpdate(codecName, 
CompressionCodec.createCodec(conf, codecName))
    +        }
    --- End diff --
    
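    No need for the extra indentation here: the body of the `map { codecName => ... }`
    closure and its closing brace can each sit one level (two spaces) further left.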


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to