This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 599adc851b2  [SPARK-39083][CORE] Fix race condition between update and clean app data
599adc851b2 is described below

commit 599adc851b290c0e9d867d183fe5d030250c28f8
Author: tan.vu <tan...@linecorp.com>
AuthorDate: Sun May 8 08:09:20 2022 -0500

    [SPARK-39083][CORE] Fix race condition between update and clean app data

### What changes were proposed in this pull request?

Make `cleanAppData` atomic to prevent a race condition between updating and cleaning app data. When the race condition happens, `cleanAppData` can delete the `ApplicationInfoWrapper` entry for an application right after that entry has been updated by `mergeApplicationListing`. As a result, there are cases in which the HS Web UI displays `Application not found` for applications whose logs do exist.

#### Error message

```
22/04/29 17:16:21 DEBUG FsHistoryProvider: New/updated attempts found: 1 ArrayBuffer(viewfs://iu/log/spark3/application_1651119726430_138107_1)
22/04/29 17:16:21 INFO FsHistoryProvider: Parsing viewfs://iu/log/spark3/application_1651119726430_138107_1 for listing data...
22/04/29 17:16:21 INFO FsHistoryProvider: Looking for end event; skipping 10805037 bytes from viewfs://iu/log/spark3/application_1651119726430_138107_1...
22/04/29 17:16:21 INFO FsHistoryProvider: Finished parsing viewfs://iu/log/spark3/application_1651119726430_138107_1
22/04/29 17:16:21 ERROR Utils: Uncaught exception in thread log-replay-executor-7
java.util.NoSuchElementException
	at org.apache.spark.util.kvstore.InMemoryStore.read(InMemoryStore.java:85)
	at org.apache.spark.deploy.history.FsHistoryProvider.$anonfun$checkAndCleanLog$3(FsHistoryProvider.scala:927)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.deploy.history.FsHistoryProvider.$anonfun$checkAndCleanLog$1(FsHistoryProvider.scala:926)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.tryLog(Utils.scala:2032)
	at org.apache.spark.deploy.history.FsHistoryProvider.checkAndCleanLog(FsHistoryProvider.scala:916)
	at org.apache.spark.deploy.history.FsHistoryProvider.mergeApplicationListing(FsHistoryProvider.scala:712)
	at org.apache.spark.deploy.history.FsHistoryProvider.$anonfun$checkForLogs$15(FsHistoryProvider.scala:576)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
```

#### Background

Currently, the HS runs `checkForLogs` every 10 seconds by default to build the application list based on the current contents of the log directory.
- https://github.com/apache/spark/blob/v3.2.1/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala#L296-L299
- https://github.com/apache/spark/blob/v3.2.1/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala#L472

In each run, this method scans the specified logDir and parses the log files to update its KVStore:

- detect newly added/updated files to process: https://github.com/apache/spark/blob/v3.2.1/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala#L574-L578
- detect stale data to remove: https://github.com/apache/spark/blob/v3.2.1/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala#L586-L600

These two operations are executed in different threads, because `submitLogProcessTask` uses `replayExecutor` to submit tasks:
https://github.com/apache/spark/blob/v3.2.1/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala#L1389-L1401

### When does the bug happen?

The `Application not found` error happens in the following scenario.

In the first run of `checkForLogs`, it detects a newly added log `viewfs://iu/log/spark3/AAA_1.inprogress` (the log of an in-progress application named AAA), so it adds two entries to the KVStore:

- one entry whose key is the logPath (`viewfs://iu/log/spark3/AAA_1.inprogress`) and whose value is an instance of LogInfo representing the log
  - https://github.com/apache/spark/blob/v3.2.1/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala#L495-L505
  - https://github.com/apache/spark/blob/v3.2.1/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala#L545-L552
- one entry whose key is the applicationId (`AAA`) and whose value is an instance of ApplicationInfoWrapper holding the information of the application
  - https://github.com/apache/spark/blob/v3.2.1/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala#L825
  - https://github.com/apache/spark/blob/v3.2.1/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala#L1172

In the next run of `checkForLogs`, the AAA application has finished: the log `viewfs://iu/log/spark3/AAA_1.inprogress` has been deleted and a new log `viewfs://iu/log/spark3/AAA_1` has been created. So `checkForLogs` does the following two things in two different threads:

- Thread 1: parses the new log `viewfs://iu/log/spark3/AAA_1` and updates the data in its KVStore:
  - adds a new entry with key `viewfs://iu/log/spark3/AAA_1` and value an instance of LogInfo representing the log
  - updates the entry with key=applicationId (`AAA`) with a new instance of ApplicationInfoWrapper (for example, the isCompleted flag changes from false to true)
- Thread 2: data related to `viewfs://iu/log/spark3/AAA_1.inprogress` is now considered stale and must be deleted:
  - cleans app data for `viewfs://iu/log/spark3/AAA_1.inprogress`: https://github.com/apache/spark/blob/v3.2.1/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala#L586-L600
  - Inside `cleanAppData`, it first loads the latest `ApplicationInfoWrapper` from the KVStore: https://github.com/apache/spark/blob/v3.2.1/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala#L632
    Most of the time, when this line is executed, Thread 1 has already finished updating the entry with key=applicationId (`AAA`) with the new ApplicationInfoWrapper, so this condition https://github.com/apache/spark/blob/v3.2.1/core/s [...]
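The bad interleaving described above can be boiled down to a toy sketch. This is sequential code that replays the losing schedule; the mutable map and all names are illustrative stand-ins for the KVStore, not actual Spark code:

```scala
import scala.collection.mutable

object RaceSketch {
  def main(args: Array[String]): Unit = {
    // The map plays the role of the KVStore: appId -> current attempt data.
    val store = mutable.Map("AAA" -> "attempt from AAA_1.inprogress")

    // Thread 2 (cleanAppData) reads the app entry and decides it is stale,
    // because it still points at the deleted .inprogress log...
    val isStale = store.get("AAA").exists(_.contains(".inprogress"))

    // ...but before it acts, Thread 1 (mergeApplicationListing) writes the
    // fresh attempt parsed from the completed log AAA_1:
    store("AAA") = "attempt from AAA_1"

    // Thread 2 now acts on its stale decision and removes the entry, wiping
    // out Thread 1's update; the UI later reports "Application not found"
    // even though the log exists.
    if (isStale) store.remove("AAA")

    println(store.contains("AAA")) // false: the application entry is gone
  }
}
```

Without a lock around the read-check-delete sequence, nothing prevents the update from landing between the staleness check and the delete.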
So here we make the `cleanAppData` method atomic, just like the `addListing` method (https://github.com/apache/spark/blob/v3.2.1/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala#L1172), so that:

- If Thread 1 gets the lock on `listing` before Thread 2, it updates the entry for the application, so in Thread 2 `isStale` will be false and the entry for the application will not be removed from the KVStore.
- If Thread 2 gets the lock on `listing` before Thread 1, then `isStale` will be true and the entry for the application will be removed from the KVStore, but it will be added again afterwards by Thread 1.

In both cases, the entry for the application is not deleted unexpectedly from the KVStore.

### Why are the changes needed?

Fix the bug causing the HS Web UI to display `Application not found` for applications whose logs do exist.

### Does this PR introduce _any_ user-facing change?

Yes, bug fix.

### How was this patch tested?

Manual test. Deployed in our Spark HS: the `java.util.NoSuchElementException` exception does not happen anymore, and the `Application not found` error does not happen anymore.

Closes #36424 from tanvn/SPARK-39083.
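The locking discipline the patch adopts can be sketched in isolation as a toy model. A plain map stands in for the KVStore; the method names mirror, but do not reproduce, the real FsHistoryProvider code:

```scala
import scala.collection.mutable

object LockSketch {
  // Stand-in for the KVStore `listing`: appId -> current attempt data.
  private val listing = mutable.Map[String, String]()

  // Thread 1's path (addListing-style): the update happens under the lock.
  def addListing(appId: String, attempt: String): Unit = listing.synchronized {
    listing(appId) = attempt
  }

  // Thread 2's path (cleanAppData-style): the staleness check and the delete
  // are now one atomic step with respect to addListing, so no update can
  // slip in between them.
  def cleanAppData(appId: String): Unit = listing.synchronized {
    val isStale = listing.get(appId).exists(_.contains(".inprogress"))
    if (isStale) listing.remove(appId)
  }

  def main(args: Array[String]): Unit = {
    listing("AAA") = "attempt from AAA_1.inprogress"
    // Whichever thread wins the lock, the final state is the same: either
    // the update lands first and the cleaner sees a non-stale entry, or the
    // cleaner deletes first and the update re-adds the entry.
    addListing("AAA", "attempt from AAA_1")
    cleanAppData("AAA")
    println(listing.contains("AAA")) // true: the entry survives
  }
}
```

Serializing both read-check-write sequences on the same monitor is what makes the two orderings above converge on the same end state.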
Authored-by: tan.vu <tan...@linecorp.com>
Signed-off-by: Sean Owen <sro...@gmail.com>
(cherry picked from commit 29643265a9f5e8142d20add5350c614a55161451)
Signed-off-by: Sean Owen <sro...@gmail.com>
---
 .../spark/deploy/history/FsHistoryProvider.scala   | 51 +++++++++++-----------
 1 file changed, 26 insertions(+), 25 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 6e2471d4b1b..1566fda9463 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -629,37 +629,38 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   private def cleanAppData(appId: String, attemptId: Option[String], logPath: String): Unit = {
     try {
-      val app = load(appId)
-      val (attempt, others) = app.attempts.partition(_.info.attemptId == attemptId)
-
-      assert(attempt.isEmpty || attempt.size == 1)
-      val isStale = attempt.headOption.exists { a =>
-        if (a.logPath != new Path(logPath).getName()) {
-          // If the log file name does not match, then probably the old log file was from an
-          // in progress application. Just return that the app should be left alone.
-          false
-        } else {
-          val maybeUI = synchronized {
-            activeUIs.remove(appId -> attemptId)
-          }
-
-          maybeUI.foreach { ui =>
-            ui.invalidate()
-            ui.ui.store.close()
+      var isStale = false
+      listing.synchronized {
+        val app = load(appId)
+        val (attempt, others) = app.attempts.partition(_.info.attemptId == attemptId)
+
+        assert(attempt.isEmpty || attempt.size == 1)
+        isStale = attempt.headOption.exists { a =>
+          if (a.logPath != new Path(logPath).getName()) {
+            // If the log file name does not match, then probably the old log file was from an
+            // in progress application. Just return that the app should be left alone.
+            false
+          } else {
+            if (others.nonEmpty) {
+              val newAppInfo = new ApplicationInfoWrapper(app.info, others)
+              listing.write(newAppInfo)
+            } else {
+              listing.delete(classOf[ApplicationInfoWrapper], appId)
+            }
+            true
           }
-
-          diskManager.foreach(_.release(appId, attemptId, delete = true))
-          true
         }
       }
 
       if (isStale) {
-        if (others.nonEmpty) {
-          val newAppInfo = new ApplicationInfoWrapper(app.info, others)
-          listing.write(newAppInfo)
-        } else {
-          listing.delete(classOf[ApplicationInfoWrapper], appId)
+        val maybeUI = synchronized {
+          activeUIs.remove(appId -> attemptId)
+        }
+        maybeUI.foreach { ui =>
+          ui.invalidate()
+          ui.ui.store.close()
         }
+        diskManager.foreach(_.release(appId, attemptId, delete = true))
       }
     } catch {
       case _: NoSuchElementException =>

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org