This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new d9669bd  [SPARK-33146][CORE] Check for non-fatal errors when loading new applications in SHS
d9669bd is described below

commit d9669bdf0ff4ed9951d7077b8dc9ad94507615c5
Author: Adam Binford <adam.binf...@radiantsolutions.com>
AuthorDate: Thu Oct 15 11:59:29 2020 +0900

    [SPARK-33146][CORE] Check for non-fatal errors when loading new applications in SHS
    
    ### What changes were proposed in this pull request?
    
    Adds an additional check for non-fatal errors when attempting to add a new entry to the history server application listing.
    
    ### Why are the changes needed?
    
    A bad rolling event log folder (missing appstatus file or no log files) would cause no applications to be loaded by the Spark history server. Figuring out why invalid event log folders are created in the first place will be addressed in separate issues; this change just lets the history server skip the invalid folder and successfully load all the valid applications.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    New UT
    
    Closes #30037 from Kimahriman/bug/rolling-log-crashing-history.
    
    Authored-by: Adam Binford <adam.binf...@radiantsolutions.com>
    Signed-off-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensou...@gmail.com>
    (cherry picked from commit 9ab0ec4e38e5df0537b38cb0f89e004ad57bec90)
    Signed-off-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensou...@gmail.com>
---
 .../spark/deploy/history/FsHistoryProvider.scala   |  3 ++
 .../deploy/history/FsHistoryProviderSuite.scala    | 49 ++++++++++++++++++++++
 2 files changed, 52 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index c262152..5970708 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -526,6 +526,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
                 reader.fileSizeForLastIndex > 0
               } catch {
                 case _: FileNotFoundException => false
+                case NonFatal(e) =>
+                  logWarning(s"Error while reading new log ${reader.rootPath}", e)
+                  false
               }
 
             case _: FileNotFoundException =>
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index c2f34fc..f3beb35 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -1470,6 +1470,55 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
     }
   }
 
+  test("SPARK-33146: don't let one bad rolling log folder prevent loading other applications") {
+    withTempDir { dir =>
+      val conf = createTestConf(true)
+      conf.set(HISTORY_LOG_DIR, dir.getAbsolutePath)
+      val hadoopConf = SparkHadoopUtil.newConfiguration(conf)
+      val fs = new Path(dir.getAbsolutePath).getFileSystem(hadoopConf)
+
+      val provider = new FsHistoryProvider(conf)
+
+      val writer = new RollingEventLogFilesWriter("app", None, dir.toURI, conf, hadoopConf)
+      writer.start()
+
+      writeEventsToRollingWriter(writer, Seq(
+        SparkListenerApplicationStart("app", Some("app"), 0, "user", None),
+        SparkListenerJobStart(1, 0, Seq.empty)), rollFile = false)
+      provider.checkForLogs()
+      provider.cleanLogs()
+      assert(dir.listFiles().size === 1)
+      assert(provider.getListing.length === 1)
+
+      // Manually delete the appstatus file to make an invalid rolling event log
+      val appStatusPath = RollingEventLogFilesWriter.getAppStatusFilePath(new Path(writer.logPath),
+        "app", None, true)
+      fs.delete(appStatusPath, false)
+      provider.checkForLogs()
+      provider.cleanLogs()
+      assert(provider.getListing.length === 0)
+
+      // Create a new application
+      val writer2 = new RollingEventLogFilesWriter("app2", None, dir.toURI, conf, hadoopConf)
+      writer2.start()
+      writeEventsToRollingWriter(writer2, Seq(
+        SparkListenerApplicationStart("app2", Some("app2"), 0, "user", None),
+        SparkListenerJobStart(1, 0, Seq.empty)), rollFile = false)
+
+      // Both folders exist but only one application found
+      provider.checkForLogs()
+      provider.cleanLogs()
+      assert(provider.getListing.length === 1)
+      assert(dir.listFiles().size === 2)
+
+      // Make sure a new provider sees the valid application
+      provider.stop()
+      val newProvider = new FsHistoryProvider(conf)
+      newProvider.checkForLogs()
+      assert(newProvider.getListing.length === 1)
+    }
+  }
+
   /**
    * Asks the provider to check for logs and calls a function to perform checks on the updated
    * app list. Example:


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to