Repository: spark Updated Branches: refs/heads/master c5a4701ac -> 1abcbed67
[SPARK-22763][CORE] SHS: Ignore unknown events and parse through the file ## What changes were proposed in this pull request? As the Spark code evolves, new events are added to the event log (for example #19649), and we used to maintain a whitelist to avoid exceptions (#15663). Currently the Spark history server stops parsing as soon as it hits an unknown event or an unrecognized property, so only part of the UI data may be shown. For better compatibility, we can ignore unknown events and unrecognized properties and keep parsing through the rest of the log file. ## How was this patch tested? Unit test Author: Wang Gengliang <ltn...@gmail.com> Closes #19953 from gengliangwang/ReplayListenerBus. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1abcbed6 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1abcbed6 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1abcbed6 Branch: refs/heads/master Commit: 1abcbed678c2bc4f05640db2791fd2d84267d740 Parents: c5a4701 Author: Wang Gengliang <ltn...@gmail.com> Authored: Wed Dec 13 11:54:22 2017 -0800 Committer: gatorsmile <gatorsm...@gmail.com> Committed: Wed Dec 13 11:54:22 2017 -0800 ---------------------------------------------------------------------- .../spark/scheduler/ReplayListenerBus.scala | 37 ++++++++++---------- .../spark/scheduler/ReplayListenerSuite.scala | 29 +++++++++++++++ 2 files changed, 47 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/1abcbed6/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala index 26a6a3e..c9cd662 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala +++ 
b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala @@ -69,6 +69,8 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging { eventsFilter: ReplayEventsFilter): Unit = { var currentLine: String = null var lineNumber: Int = 0 + val unrecognizedEvents = new scala.collection.mutable.HashSet[String] + val unrecognizedProperties = new scala.collection.mutable.HashSet[String] try { val lineEntries = lines @@ -84,16 +86,22 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging { postToAll(JsonProtocol.sparkEventFromJson(parse(currentLine))) } catch { - case e: ClassNotFoundException if KNOWN_REMOVED_CLASSES.contains(e.getMessage) => - // Ignore events generated by Structured Streaming in Spark 2.0.0 and 2.0.1. - // It's safe since no place uses them. - logWarning(s"Dropped incompatible Structured Streaming log: $currentLine") - case e: UnrecognizedPropertyException if e.getMessage != null && e.getMessage.startsWith( - "Unrecognized field \"queryStatus\" " + - "(class org.apache.spark.sql.streaming.StreamingQueryListener$") => - // Ignore events generated by Structured Streaming in Spark 2.0.2 - // It's safe since no place uses them. - logWarning(s"Dropped incompatible Structured Streaming log: $currentLine") + case e: ClassNotFoundException => + // Ignore unknown events, parse through the event log file. + // To avoid spamming, warnings are only displayed once for each unknown event. + if (!unrecognizedEvents.contains(e.getMessage)) { + logWarning(s"Drop unrecognized event: ${e.getMessage}") + unrecognizedEvents.add(e.getMessage) + } + logDebug(s"Drop incompatible event log: $currentLine") + case e: UnrecognizedPropertyException => + // Ignore unrecognized properties, parse through the event log file. + // To avoid spamming, warnings are only displayed once for each unrecognized property. 
+ if (!unrecognizedProperties.contains(e.getMessage)) { + logWarning(s"Drop unrecognized property: ${e.getMessage}") + unrecognizedProperties.add(e.getMessage) + } + logDebug(s"Drop incompatible event log: $currentLine") case jpe: JsonParseException => // We can only ignore exception from last line of the file that might be truncated // the last entry may not be the very last line in the event log, but we treat it @@ -125,13 +133,4 @@ private[spark] object ReplayListenerBus { // utility filter that selects all event logs during replay val SELECT_ALL_FILTER: ReplayEventsFilter = { (eventString: String) => true } - - /** - * Classes that were removed. Structured Streaming doesn't use them any more. However, parsing - * old json may fail and we can just ignore these failures. - */ - val KNOWN_REMOVED_CLASSES = Set( - "org.apache.spark.sql.streaming.StreamingQueryListener$QueryProgress", - "org.apache.spark.sql.streaming.StreamingQueryListener$QueryTerminated" - ) } http://git-wip-us.apache.org/repos/asf/spark/blob/1abcbed6/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala index d17e386..73e7b3f 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala @@ -128,6 +128,35 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp } } + test("Replay incompatible event log") { + val logFilePath = Utils.getFilePath(testDir, "incompatible.txt") + val fstream = fileSystem.create(logFilePath) + val writer = new PrintWriter(fstream) + val applicationStart = SparkListenerApplicationStart("Incompatible App", None, + 125L, "UserUsingIncompatibleVersion", None) + val applicationEnd = 
SparkListenerApplicationEnd(1000L) + // scalastyle:off println + writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationStart)))) + writer.println("""{"Event":"UnrecognizedEventOnlyForTest","Timestamp":1477593059313}""") + writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationEnd)))) + // scalastyle:on println + writer.close() + + val conf = EventLoggingListenerSuite.getLoggingConf(logFilePath) + val logData = fileSystem.open(logFilePath) + val eventMonster = new EventMonster(conf) + try { + val replayer = new ReplayListenerBus() + replayer.addListener(eventMonster) + replayer.replay(logData, logFilePath.toString) + } finally { + logData.close() + } + assert(eventMonster.loggedEvents.size === 2) + assert(eventMonster.loggedEvents(0) === JsonProtocol.sparkEventToJson(applicationStart)) + assert(eventMonster.loggedEvents(1) === JsonProtocol.sparkEventToJson(applicationEnd)) + } + // This assumes the correctness of EventLoggingListener test("End-to-end replay") { testApplicationReplay() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org