Repository: spark
Updated Branches:
  refs/heads/master 69ff8e8cf -> 9048e8102


[SPARK-6197][CORE] handle json exception when history file not finished writing

For details, please refer to 
[SPARK-6197](https://issues.apache.org/jira/browse/SPARK-6197)

Author: Zhang, Liye <liye.zh...@intel.com>

Closes #4927 from liyezhang556520/jsonParseError and squashes the following 
commits:

5cbdc82 [Zhang, Liye] without unnecessary wrap
2b48831 [Zhang, Liye] small changes with sean owen's comments
2973024 [Zhang, Liye] handle json exception when file not finished writing


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9048e810
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9048e810
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9048e810

Branch: refs/heads/master
Commit: 9048e8102e3f564842fa0dc6e82edce70b7dd3d7
Parents: 69ff8e8
Author: Zhang, Liye <liye.zh...@intel.com>
Authored: Fri Mar 13 13:59:54 2015 +0000
Committer: Sean Owen <so...@cloudera.com>
Committed: Fri Mar 13 14:00:45 2015 +0000

----------------------------------------------------------------------
 .../org/apache/spark/deploy/master/Master.scala |  3 ++-
 .../spark/scheduler/ReplayListenerBus.scala     | 25 ++++++++++++++++----
 2 files changed, 23 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9048e810/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala 
b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 1581429..22935c9 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -764,8 +764,9 @@ private[spark] class Master(
       val replayBus = new ReplayListenerBus()
       val ui = SparkUI.createHistoryUI(new SparkConf, replayBus, new 
SecurityManager(conf),
         appName + status, HistoryServer.UI_PATH_PREFIX + s"/${app.id}")
+      val maybeTruncated = 
eventLogFile.endsWith(EventLoggingListener.IN_PROGRESS)
       try {
-        replayBus.replay(logInput, eventLogFile)
+        replayBus.replay(logInput, eventLogFile, maybeTruncated)
       } finally {
         logInput.close()
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/9048e810/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala 
b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
index 95273c7..86f357a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
@@ -21,6 +21,7 @@ import java.io.{InputStream, IOException}
 
 import scala.io.Source
 
+import com.fasterxml.jackson.core.JsonParseException
 import org.json4s.jackson.JsonMethods._
 
 import org.apache.spark.Logging
@@ -40,15 +41,31 @@ private[spark] class ReplayListenerBus extends 
SparkListenerBus with Logging {
    *
    * @param logData Stream containing event log data.
    * @param sourceName Filename (or other source identifier) from whence 
@logData is being read
+   * @param maybeTruncated Indicate whether log file might be truncated (some 
abnormal situations 
+   *        encountered, log file might not finished writing) or not
    */
-  def replay(logData: InputStream, sourceName: String): Unit = {
+  def replay(
+      logData: InputStream,
+      sourceName: String,
+      maybeTruncated: Boolean = false): Unit = {
     var currentLine: String = null
     var lineNumber: Int = 1
     try {
       val lines = Source.fromInputStream(logData).getLines()
-      lines.foreach { line =>
-        currentLine = line
-        postToAll(JsonProtocol.sparkEventFromJson(parse(line)))
+      while (lines.hasNext) {
+        currentLine = lines.next()
+        try {
+          postToAll(JsonProtocol.sparkEventFromJson(parse(currentLine)))
+        } catch {
+          case jpe: JsonParseException =>
+            // We can only ignore exception from last line of the file that 
might be truncated
+            if (!maybeTruncated || lines.hasNext) {
+              throw jpe
+            } else {
+              logWarning(s"Got JsonParseException from log file $sourceName" + 
+                s" at line $lineNumber, the file might not have finished 
writing cleanly.")
+            }
+        }
         lineNumber += 1
       }
     } catch {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to