Repository: spark
Updated Branches:
  refs/heads/master c5a4701ac -> 1abcbed67


[SPARK-22763][CORE] SHS: Ignore unknown events and parse through the file

## What changes were proposed in this pull request?

As Spark code evolves, new events appear in the event log: #19649
We used to maintain a whitelist of removed classes to avoid exceptions: #15663
Currently the Spark history server stops parsing a log file when it encounters 
an unknown event or an unrecognized property, so only part of the UI data may 
be shown. For better compatibility, we can ignore unknown events and continue 
parsing through the rest of the log file.

## How was this patch tested?
Unit test

Author: Wang Gengliang <ltn...@gmail.com>

Closes #19953 from gengliangwang/ReplayListenerBus.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1abcbed6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1abcbed6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1abcbed6

Branch: refs/heads/master
Commit: 1abcbed678c2bc4f05640db2791fd2d84267d740
Parents: c5a4701
Author: Wang Gengliang <ltn...@gmail.com>
Authored: Wed Dec 13 11:54:22 2017 -0800
Committer: gatorsmile <gatorsm...@gmail.com>
Committed: Wed Dec 13 11:54:22 2017 -0800

----------------------------------------------------------------------
 .../spark/scheduler/ReplayListenerBus.scala     | 37 ++++++++++----------
 .../spark/scheduler/ReplayListenerSuite.scala   | 29 +++++++++++++++
 2 files changed, 47 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1abcbed6/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala 
b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
index 26a6a3e..c9cd662 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
@@ -69,6 +69,8 @@ private[spark] class ReplayListenerBus extends 
SparkListenerBus with Logging {
       eventsFilter: ReplayEventsFilter): Unit = {
     var currentLine: String = null
     var lineNumber: Int = 0
+    val unrecognizedEvents = new scala.collection.mutable.HashSet[String]
+    val unrecognizedProperties = new scala.collection.mutable.HashSet[String]
 
     try {
       val lineEntries = lines
@@ -84,16 +86,22 @@ private[spark] class ReplayListenerBus extends 
SparkListenerBus with Logging {
 
           postToAll(JsonProtocol.sparkEventFromJson(parse(currentLine)))
         } catch {
-          case e: ClassNotFoundException if 
KNOWN_REMOVED_CLASSES.contains(e.getMessage) =>
-            // Ignore events generated by Structured Streaming in Spark 2.0.0 
and 2.0.1.
-            // It's safe since no place uses them.
-            logWarning(s"Dropped incompatible Structured Streaming log: 
$currentLine")
-          case e: UnrecognizedPropertyException if e.getMessage != null && 
e.getMessage.startsWith(
-            "Unrecognized field \"queryStatus\" " +
-              "(class org.apache.spark.sql.streaming.StreamingQueryListener$") 
=>
-            // Ignore events generated by Structured Streaming in Spark 2.0.2
-            // It's safe since no place uses them.
-            logWarning(s"Dropped incompatible Structured Streaming log: 
$currentLine")
+          case e: ClassNotFoundException =>
+            // Ignore unknown events, parse through the event log file.
+            // To avoid spamming, warnings are only displayed once for each 
unknown event.
+            if (!unrecognizedEvents.contains(e.getMessage)) {
+              logWarning(s"Drop unrecognized event: ${e.getMessage}")
+              unrecognizedEvents.add(e.getMessage)
+            }
+            logDebug(s"Drop incompatible event log: $currentLine")
+          case e: UnrecognizedPropertyException =>
+            // Ignore unrecognized properties, parse through the event log 
file.
+            // To avoid spamming, warnings are only displayed once for each 
unrecognized property.
+            if (!unrecognizedProperties.contains(e.getMessage)) {
+              logWarning(s"Drop unrecognized property: ${e.getMessage}")
+              unrecognizedProperties.add(e.getMessage)
+            }
+            logDebug(s"Drop incompatible event log: $currentLine")
           case jpe: JsonParseException =>
             // We can only ignore exception from last line of the file that 
might be truncated
             // the last entry may not be the very last line in the event log, 
but we treat it
@@ -125,13 +133,4 @@ private[spark] object ReplayListenerBus {
 
   // utility filter that selects all event logs during replay
   val SELECT_ALL_FILTER: ReplayEventsFilter = { (eventString: String) => true }
-
-  /**
-   * Classes that were removed. Structured Streaming doesn't use them any 
more. However, parsing
-   * old json may fail and we can just ignore these failures.
-   */
-  val KNOWN_REMOVED_CLASSES = Set(
-    "org.apache.spark.sql.streaming.StreamingQueryListener$QueryProgress",
-    "org.apache.spark.sql.streaming.StreamingQueryListener$QueryTerminated"
-  )
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/1abcbed6/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
index d17e386..73e7b3f 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
@@ -128,6 +128,35 @@ class ReplayListenerSuite extends SparkFunSuite with 
BeforeAndAfter with LocalSp
     }
   }
 
+  test("Replay incompatible event log") {
+    val logFilePath = Utils.getFilePath(testDir, "incompatible.txt")
+    val fstream = fileSystem.create(logFilePath)
+    val writer = new PrintWriter(fstream)
+    val applicationStart = SparkListenerApplicationStart("Incompatible App", 
None,
+      125L, "UserUsingIncompatibleVersion", None)
+    val applicationEnd = SparkListenerApplicationEnd(1000L)
+    // scalastyle:off println
+    
writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationStart))))
+    
writer.println("""{"Event":"UnrecognizedEventOnlyForTest","Timestamp":1477593059313}""")
+    
writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationEnd))))
+    // scalastyle:on println
+    writer.close()
+
+    val conf = EventLoggingListenerSuite.getLoggingConf(logFilePath)
+    val logData = fileSystem.open(logFilePath)
+    val eventMonster = new EventMonster(conf)
+    try {
+      val replayer = new ReplayListenerBus()
+      replayer.addListener(eventMonster)
+      replayer.replay(logData, logFilePath.toString)
+    } finally {
+      logData.close()
+    }
+    assert(eventMonster.loggedEvents.size === 2)
+    assert(eventMonster.loggedEvents(0) === 
JsonProtocol.sparkEventToJson(applicationStart))
+    assert(eventMonster.loggedEvents(1) === 
JsonProtocol.sparkEventToJson(applicationEnd))
+  }
+
   // This assumes the correctness of EventLoggingListener
   test("End-to-end replay") {
     testApplicationReplay()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to