This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 599adc851b2 [SPARK-39083][CORE] Fix race condition between update and clean app data
599adc851b2 is described below

commit 599adc851b290c0e9d867d183fe5d030250c28f8
Author: tan.vu <tan...@linecorp.com>
AuthorDate: Sun May 8 08:09:20 2022 -0500

    [SPARK-39083][CORE] Fix race condition between update and clean app data
    
    ### What changes were proposed in this pull request?
    Make `cleanAppData` atomic to prevent a race condition between updating and cleaning app data.
    When the race condition happens, `cleanAppData` can delete the `ApplicationInfoWrapper` entry for an application right after that entry has been updated by `mergeApplicationListing`.
    So there are cases where the HS Web UI displays `Application not found` for applications whose logs do exist.
    
    #### Error message
    ```
    22/04/29 17:16:21 DEBUG FsHistoryProvider: New/updated attempts found: 1 ArrayBuffer(viewfs://iu/log/spark3/application_1651119726430_138107_1)
    22/04/29 17:16:21 INFO FsHistoryProvider: Parsing viewfs://iu/log/spark3/application_1651119726430_138107_1 for listing data...
    22/04/29 17:16:21 INFO FsHistoryProvider: Looking for end event; skipping 10805037 bytes from viewfs://iu/log/spark3/application_1651119726430_138107_1...
    22/04/29 17:16:21 INFO FsHistoryProvider: Finished parsing viewfs://iu/log/spark3/application_1651119726430_138107_1
    22/04/29 17:16:21 ERROR Utils: Uncaught exception in thread log-replay-executor-7
    java.util.NoSuchElementException
            at org.apache.spark.util.kvstore.InMemoryStore.read(InMemoryStore.java:85)
            at org.apache.spark.deploy.history.FsHistoryProvider.$anonfun$checkAndCleanLog$3(FsHistoryProvider.scala:927)
            at scala.Option.foreach(Option.scala:407)
            at org.apache.spark.deploy.history.FsHistoryProvider.$anonfun$checkAndCleanLog$1(FsHistoryProvider.scala:926)
            at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
            at org.apache.spark.util.Utils$.tryLog(Utils.scala:2032)
            at org.apache.spark.deploy.history.FsHistoryProvider.checkAndCleanLog(FsHistoryProvider.scala:916)
            at org.apache.spark.deploy.history.FsHistoryProvider.mergeApplicationListing(FsHistoryProvider.scala:712)
            at org.apache.spark.deploy.history.FsHistoryProvider.$anonfun$checkForLogs$15(FsHistoryProvider.scala:576)
            at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
            at java.util.concurrent.FutureTask.run(FutureTask.java:266)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
            at java.lang.Thread.run(Thread.java:748)
    ```
    #### Background
    Currently, the HS runs `checkForLogs` to build the application list based on the current contents of the log directory, every 10 seconds by default.
    - https://github.com/apache/spark/blob/v3.2.1/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala#L296-L299
    - https://github.com/apache/spark/blob/v3.2.1/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala#L472
    
    In each turn of execution, this method scans the specified logDir and parses the log files to update its KVStore:
    - detect newly updated/added files to process: https://github.com/apache/spark/blob/v3.2.1/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala#L574-L578
    - detect stale data to remove: https://github.com/apache/spark/blob/v3.2.1/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala#L586-L600
    
    These 2 operations are executed in different threads, as `submitLogProcessTask` uses `replayExecutor` to submit the tasks (see the sketch below):
    https://github.com/apache/spark/blob/v3.2.1/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala#L1389-L1401
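    
    As a rough illustration of that structure, here is a hedged, self-contained sketch. `CheckForLogsSketch`, `updateAppData`, and the stub bodies are hypothetical stand-ins for the real FsHistoryProvider members, not the actual code:
    ```scala
    import java.util.concurrent.Executors
    
    // Both kinds of work go through the same thread pool, so an update task
    // and a clean task for the same application can run concurrently.
    object CheckForLogsSketch {
      private val replayExecutor = Executors.newFixedThreadPool(4)
    
      private def submitLogProcessTask(task: Runnable): Unit = {
        replayExecutor.submit(task)
      }
    
      // Stand-ins for mergeApplicationListing and cleanAppData.
      private def updateAppData(log: String): Unit = println(s"update $log")
      private def cleanAppData(log: String): Unit = println(s"clean $log")
    
      def checkForLogs(): Unit = {
        // New/updated logs are parsed on the pool...
        submitLogProcessTask(() => updateAppData("viewfs://iu/log/spark3/AAA_1"))
        // ...while stale data is cleaned on the same pool, possibly at the
        // same time; this overlap is the window for the race.
        submitLogProcessTask(() =>
          cleanAppData("viewfs://iu/log/spark3/AAA_1.inprogress"))
      }
    }
    ```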
    
    ### When does the bug happen?
    The `Application not found` error happens in the following scenario:
    In the first run of `checkForLogs`, it detects a newly added log `viewfs://iu/log/spark3/AAA_1.inprogress` (the log of an in-progress application named AAA), so it adds 2 entries to the KVStore:
    - one entry whose key is the logPath (`viewfs://iu/log/spark3/AAA_1.inprogress`) and whose value is an instance of `LogInfo` representing the log
      - https://github.com/apache/spark/blob/v3.2.1/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala#L495-L505
      - https://github.com/apache/spark/blob/v3.2.1/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala#L545-L552
    - one entry whose key is the applicationId (`AAA`) and whose value is an instance of `ApplicationInfoWrapper` holding the information of the application (both entries are modeled in the sketch below)
      - https://github.com/apache/spark/blob/v3.2.1/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala#L825
      - https://github.com/apache/spark/blob/v3.2.1/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala#L1172
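    
    As a hedged mental model of this state (the `ListingModel` object and the `LogInfoStub`/`AppInfoStub` types are simplified stand-ins for the real KVStore classes, not Spark code):
    ```scala
    import java.util.concurrent.ConcurrentHashMap
    
    // Simplified stand-ins for LogInfo and ApplicationInfoWrapper.
    final case class LogInfoStub(logPath: String)
    final case class AppInfoStub(logPath: String, completed: Boolean)
    
    object ListingModel {
      // One map per key space, standing in for the KVStore.
      val logs = new ConcurrentHashMap[String, LogInfoStub]()
      val apps = new ConcurrentHashMap[String, AppInfoStub]()
    
      // After the first run of checkForLogs:
      logs.put("viewfs://iu/log/spark3/AAA_1.inprogress",
        LogInfoStub("viewfs://iu/log/spark3/AAA_1.inprogress"))
      apps.put("AAA", AppInfoStub("AAA_1.inprogress", completed = false))
    }
    ```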
    
    In the next run of `checkForLogs`, the AAA application has finished: the log `viewfs://iu/log/spark3/AAA_1.inprogress` has been deleted and a new log `viewfs://iu/log/spark3/AAA_1` has been created. So `checkForLogs` does the following 2 things in 2 different threads:
    - Thread 1: parses the new log `viewfs://iu/log/spark3/AAA_1` and updates the data in its KVStore
      - adds a new entry whose key is `viewfs://iu/log/spark3/AAA_1` and whose value is an instance of `LogInfo` representing the log
      - updates the entry with key = applicationId (`AAA`) with a new `ApplicationInfoWrapper` value (for example, the isCompleted flag now changes from false to true)
    - Thread 2: the data related to `viewfs://iu/log/spark3/AAA_1.inprogress` is now considered stale and must be deleted (a minimal model of this race is sketched below)
      - cleans the app data for `viewfs://iu/log/spark3/AAA_1.inprogress`: https://github.com/apache/spark/blob/v3.2.1/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala#L586-L600
      - inside `cleanAppData`, it first loads the latest `ApplicationInfoWrapper` from the KVStore: https://github.com/apache/spark/blob/v3.2.1/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala#L632
  Most of the time, when this line is executed, Thread 1 has already finished updating the entry with key = applicationId (`AAA`) with the new `ApplicationInfoWrapper` value, so this condition https://github.com/apache/spark/blob/v3.2.1/core/s [...]
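    
    The race is a plain check-then-act. Continuing the `ListingModel` sketch above (again a simplified, hypothetical model, not the real `cleanAppData`):
    ```scala
    import ListingModel.apps
    
    // Thread 2's check-then-act in the old cleanAppData, with no common lock:
    object RaceModel {
      def cleanUnsafe(appId: String, staleLog: String): Unit = {
        val app = apps.get(appId)             // (1) read the current entry
        val isStale = app != null && app.logPath == staleLog
        // Thread 1 may run apps.put("AAA", AppInfoStub("AAA_1", true))
        // right here, between the read and the delete...
        if (isStale) {
          apps.remove(appId)                  // (2) ...and the fresh entry
        }                                     //     is deleted by mistake
      }
    }
    ```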
    
    So here we make the `cleanAppData` method atomic, just like the `addListing` method (https://github.com/apache/spark/blob/v3.2.1/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala#L1172), so that:
    - If Thread 1 gets the lock on `listing` before Thread 2, it updates the entry for the application, so in Thread 2 `isStale` will be false and the entry for the application will not be removed from the KVStore.
    - If Thread 2 gets the lock on `listing` before Thread 1, then `isStale` will be true and the entry for the application will be removed from the KVStore, but it will be added again by Thread 1 afterwards.
    In both cases, the entry for the application is not deleted unexpectedly from the KVStore; the locked shape is sketched below.
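    
    A hedged sketch of that patched shape, still on the simplified model above (the actual change is the `listing.synchronized` block in the diff below; `FixedModel`, `cleanAtomic`, and `updateAtomic` are hypothetical names):
    ```scala
    // The whole read-check-delete now runs under one lock.
    object FixedModel {
      import ListingModel.apps
    
      def cleanAtomic(appId: String, staleLog: String): Unit = apps.synchronized {
        val app = apps.get(appId)
        val isStale = app != null && app.logPath == staleLog
        if (isStale) apps.remove(appId)
      }
    
      // If the updater takes the same lock around its write, only the two
      // interleavings above remain: update-then-clean keeps the entry
      // (isStale is false); clean-then-update removes it, then re-adds it.
      def updateAtomic(appId: String): Unit = apps.synchronized {
        apps.put(appId, AppInfoStub("AAA_1", completed = true))
      }
    }
    ```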
    
    ### Why are the changes needed?
    Fixes the bug that causes the HS Web UI to display `Application not found` for applications whose logs do exist.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, bug fix.
    
    ### How was this patch tested?
    Manual test.
    Deployed in our Spark HS; the `java.util.NoSuchElementException` exception does not happen anymore, and the `Application not found` error does not happen anymore.
    
    Closes #36424 from tanvn/SPARK-39083.
    
    Authored-by: tan.vu <tan...@linecorp.com>
    Signed-off-by: Sean Owen <sro...@gmail.com>
    (cherry picked from commit 29643265a9f5e8142d20add5350c614a55161451)
    Signed-off-by: Sean Owen <sro...@gmail.com>
---
 .../spark/deploy/history/FsHistoryProvider.scala   | 51 +++++++++++-----------
 1 file changed, 26 insertions(+), 25 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 6e2471d4b1b..1566fda9463 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -629,37 +629,38 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
 
   private def cleanAppData(appId: String, attemptId: Option[String], logPath: String): Unit = {
     try {
-      val app = load(appId)
-      val (attempt, others) = app.attempts.partition(_.info.attemptId == attemptId)
-
-      assert(attempt.isEmpty || attempt.size == 1)
-      val isStale = attempt.headOption.exists { a =>
-        if (a.logPath != new Path(logPath).getName()) {
-          // If the log file name does not match, then probably the old log file was from an
-          // in progress application. Just return that the app should be left alone.
-          false
-        } else {
-          val maybeUI = synchronized {
-            activeUIs.remove(appId -> attemptId)
-          }
-
-          maybeUI.foreach { ui =>
-            ui.invalidate()
-            ui.ui.store.close()
+      var isStale = false
+      listing.synchronized {
+        val app = load(appId)
+        val (attempt, others) = app.attempts.partition(_.info.attemptId == attemptId)
+
+        assert(attempt.isEmpty || attempt.size == 1)
+        isStale = attempt.headOption.exists { a =>
+          if (a.logPath != new Path(logPath).getName()) {
+            // If the log file name does not match, then probably the old log file was from an
+            // in progress application. Just return that the app should be left alone.
+            false
+          } else {
+            if (others.nonEmpty) {
+              val newAppInfo = new ApplicationInfoWrapper(app.info, others)
+              listing.write(newAppInfo)
+            } else {
+              listing.delete(classOf[ApplicationInfoWrapper], appId)
+            }
+            true
           }
-
-          diskManager.foreach(_.release(appId, attemptId, delete = true))
-          true
         }
       }
 
       if (isStale) {
-        if (others.nonEmpty) {
-          val newAppInfo = new ApplicationInfoWrapper(app.info, others)
-          listing.write(newAppInfo)
-        } else {
-          listing.delete(classOf[ApplicationInfoWrapper], appId)
+        val maybeUI = synchronized {
+          activeUIs.remove(appId -> attemptId)
+        }
+        maybeUI.foreach { ui =>
+          ui.invalidate()
+          ui.ui.store.close()
         }
+        diskManager.foreach(_.release(appId, attemptId, delete = true))
       }
     } catch {
       case _: NoSuchElementException =>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
