Repository: spark Updated Branches: refs/heads/master e1a1ff810 -> fc6d3e796
[SPARK-5783] Better eventlog-parsing error messages Author: Ryan Williams <ryan.blake.willi...@gmail.com> Closes #4573 from ryan-williams/history and squashes the following commits: a8647ec [Ryan Williams] fix test calls to .replay() 98aa3fe [Ryan Williams] include filename in history-parsing error message 8deecf0 [Ryan Williams] add line number to history-parsing error message b668b52 [Ryan Williams] add log info line to history-eventlog parsing Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fc6d3e79 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fc6d3e79 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fc6d3e79 Branch: refs/heads/master Commit: fc6d3e796a3c600e2f7827562455d555e59775ae Parents: e1a1ff8 Author: Ryan Williams <ryan.blake.willi...@gmail.com> Authored: Fri Feb 13 09:47:26 2015 -0800 Committer: Andrew Or <and...@databricks.com> Committed: Fri Feb 13 09:47:26 2015 -0800 ---------------------------------------------------------------------- .../org/apache/spark/deploy/history/FsHistoryProvider.scala | 3 ++- .../main/scala/org/apache/spark/deploy/master/Master.scala | 2 +- .../org/apache/spark/scheduler/ReplayListenerBus.scala | 9 ++++++--- .../org/apache/spark/scheduler/ReplayListenerSuite.scala | 4 ++-- 4 files changed, 11 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/fc6d3e79/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 868c63d..885fa0f 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -247,6 +247,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis */ private def replay(eventLog: FileStatus, bus: ReplayListenerBus): FsApplicationHistoryInfo = { val logPath = eventLog.getPath() + logInfo(s"Replaying log path: $logPath") val (logInput, sparkVersion) = if (isLegacyLogDirectory(eventLog)) { openLegacyEventLog(logPath) @@ -256,7 +257,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis try { val appListener = new ApplicationEventListener bus.addListener(appListener) - bus.replay(logInput, sparkVersion) + bus.replay(logInput, sparkVersion, logPath.toString) new FsApplicationHistoryInfo( logPath.getName(), appListener.appId.getOrElse(logPath.getName()), http://git-wip-us.apache.org/repos/asf/spark/blob/fc6d3e79/core/src/main/scala/org/apache/spark/deploy/master/Master.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 53e4539..8cc6ec1 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -761,7 +761,7 @@ private[spark] class Master( val ui = SparkUI.createHistoryUI(new SparkConf, replayBus, new SecurityManager(conf), appName + " (completed)", HistoryServer.UI_PATH_PREFIX + s"/${app.id}") try { - replayBus.replay(logInput, sparkVersion) + replayBus.replay(logInput, sparkVersion, eventLogFile) } finally { logInput.close() } http://git-wip-us.apache.org/repos/asf/spark/blob/fc6d3e79/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala index 584f4e7..d9c3a10 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala @@ -40,21 +40,24 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging { * * @param logData Stream containing event log data. * @param version Spark version that generated the events. + * @param sourceName Filename (or other source identifier) from whence @logData is being read */ - def replay(logData: InputStream, version: String) { + def replay(logData: InputStream, version: String, sourceName: String) { var currentLine: String = null + var lineNumber: Int = 1 try { val lines = Source.fromInputStream(logData).getLines() lines.foreach { line => currentLine = line postToAll(JsonProtocol.sparkEventFromJson(parse(line))) + lineNumber += 1 } } catch { case ioe: IOException => throw ioe case e: Exception => - logError("Exception in parsing Spark event log.", e) - logError("Malformed line: %s\n".format(currentLine)) + logError(s"Exception parsing Spark event log: $sourceName", e) + logError(s"Malformed line #$lineNumber: $currentLine\n") } } http://git-wip-us.apache.org/repos/asf/spark/blob/fc6d3e79/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala index 7e360cc..702c4cb 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala @@ -61,7 +61,7 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter { try { val replayer = new ReplayListenerBus() replayer.addListener(eventMonster) - replayer.replay(logData, SPARK_VERSION) + replayer.replay(logData, SPARK_VERSION, logFilePath.toString) } finally { logData.close() } @@ -120,7 +120,7 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter { try { val replayer = new ReplayListenerBus() replayer.addListener(eventMonster) - replayer.replay(logData, version) + replayer.replay(logData, version, eventLog.getPath().toString) } finally { logData.close() } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org