Repository: spark Updated Branches: refs/heads/master 69ff8e8cf -> 9048e8102
[SPARK-6197][CORE] handle json exception when hisotry file not finished writing For details, please refer to [SPARK-6197](https://issues.apache.org/jira/browse/SPARK-6197) Author: Zhang, Liye <liye.zh...@intel.com> Closes #4927 from liyezhang556520/jsonParseError and squashes the following commits: 5cbdc82 [Zhang, Liye] without unnecessary wrap 2b48831 [Zhang, Liye] small changes with sean owen's comments 2973024 [Zhang, Liye] handle json exception when file not finished writing Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9048e810 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9048e810 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9048e810 Branch: refs/heads/master Commit: 9048e8102e3f564842fa0dc6e82edce70b7dd3d7 Parents: 69ff8e8 Author: Zhang, Liye <liye.zh...@intel.com> Authored: Fri Mar 13 13:59:54 2015 +0000 Committer: Sean Owen <so...@cloudera.com> Committed: Fri Mar 13 14:00:45 2015 +0000 ---------------------------------------------------------------------- .../org/apache/spark/deploy/master/Master.scala | 3 ++- .../spark/scheduler/ReplayListenerBus.scala | 25 ++++++++++++++++---- 2 files changed, 23 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/9048e810/core/src/main/scala/org/apache/spark/deploy/master/Master.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 1581429..22935c9 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -764,8 +764,9 @@ private[spark] class Master( val replayBus = new ReplayListenerBus() val ui = SparkUI.createHistoryUI(new SparkConf, replayBus, new SecurityManager(conf), appName + status, HistoryServer.UI_PATH_PREFIX + s"/${app.id}") + val maybeTruncated = eventLogFile.endsWith(EventLoggingListener.IN_PROGRESS) try { - replayBus.replay(logInput, eventLogFile) + replayBus.replay(logInput, eventLogFile, maybeTruncated) } finally { logInput.close() } http://git-wip-us.apache.org/repos/asf/spark/blob/9048e810/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala index 95273c7..86f357a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala @@ -21,6 +21,7 @@ import java.io.{InputStream, IOException} import scala.io.Source +import com.fasterxml.jackson.core.JsonParseException import org.json4s.jackson.JsonMethods._ import org.apache.spark.Logging @@ -40,15 +41,31 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging { * * @param logData Stream containing event log data. * @param sourceName Filename (or other source identifier) from whence @logData is being read + * @param maybeTruncated Indicate whether log file might be truncated (some abnormal situations + * encountered, log file might not finished writing) or not */ - def replay(logData: InputStream, sourceName: String): Unit = { + def replay( + logData: InputStream, + sourceName: String, + maybeTruncated: Boolean = false): Unit = { var currentLine: String = null var lineNumber: Int = 1 try { val lines = Source.fromInputStream(logData).getLines() - lines.foreach { line => - currentLine = line - postToAll(JsonProtocol.sparkEventFromJson(parse(line))) + while (lines.hasNext) { + currentLine = lines.next() + try { + postToAll(JsonProtocol.sparkEventFromJson(parse(currentLine))) + } catch { + case jpe: JsonParseException => + // We can only ignore exception from last line of the file that might be truncated + if (!maybeTruncated || lines.hasNext) { + throw jpe + } else { + logWarning(s"Got JsonParseException from log file $sourceName" + + s" at line $lineNumber, the file might not have finished writing cleanly.") + } + } lineNumber += 1 } } catch { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org