[GitHub] spark pull request #19770: [SPARK-21571][WEB UI] Spark history server leaves...
Github user ericvandenbergfb closed the pull request at: https://github.com/apache/spark/pull/19770

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19770: [SPARK-21571][WEB UI] Spark history server leaves...
Github user gengliangwang commented on a diff in the pull request: https://github.com/apache/spark/pull/19770#discussion_r155520764

--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -616,23 +620,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       listing.write(newApp)
     }

-    toDelete.foreach { attempt =>
-      val logPath = new Path(logDir, attempt.logPath)
-      try {
-        listing.delete(classOf[LogInfo], logPath.toString())
-      } catch {
-        case _: NoSuchElementException =>
-          logDebug(s"Log info entry for $logPath not found.")
-      }
-      try {
-        fs.delete(logPath, true)
-      } catch {
-        case e: AccessControlException =>
-          logInfo(s"No permission to delete ${attempt.logPath}, ignoring.")
-        case t: IOException =>
-          logError(s"IOException in cleaning ${attempt.logPath}", t)
-      }
-    }
+    toDelete
+      .map(attempt => new Path(logDir, attempt.logPath))

--- End diff --

Nit: We can change the input parameter of `deleteLogInfo` to accept the log path (as a string) instead of a file, so that we don't need to write `new Path(logDir, attempt.logPath)` every time before calling the function.
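The suggested refactoring can be sketched standalone. Everything below is a stand-in for `FsHistoryProvider` internals (the real `deleteLogInfo`, `listing`, and `fs` are not shown here): the helper accepts the relative log path as a `String` and builds the full path itself, so call sites stop repeating the `new Path(logDir, ...)` construction.

```scala
// Standalone sketch of the reviewer's suggestion; all names are hypothetical
// stand-ins, not the actual FsHistoryProvider implementation.
object DeleteLogInfoSketch {
  case class Attempt(logPath: String)

  val logDir = "/history"
  val deleted = scala.collection.mutable.Buffer[String]()

  // After the suggested change: the helper builds the full path from the
  // relative path, instead of every caller doing it.
  def deleteLogInfo(relativePath: String): Unit = {
    val fullPath = s"$logDir/$relativePath" // stand-in for new Path(logDir, relativePath)
    deleted += fullPath                     // stand-in for listing.delete + fs.delete
  }

  def main(args: Array[String]): Unit = {
    val toDelete = Seq(Attempt("app-1"), Attempt("app-2"))
    toDelete.foreach(attempt => deleteLogInfo(attempt.logPath))
    println(deleted.mkString(",")) // /history/app-1,/history/app-2
  }
}
```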
[GitHub] spark pull request #19770: [SPARK-21571][WEB UI] Spark history server leaves...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19770#discussion_r154817100

--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -643,6 +633,44 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     } finally {
       iterator.foreach(_.close())
     }
+
+    // Clean corrupt or empty files that may have accumulated.
+    if (AGGRESSIVE_CLEANUP) {
+      var untracked: Option[KVStoreIterator[LogInfo]] = None
+      try {
+        untracked = Some(listing.view(classOf[LogInfo])
+          .index("lastModifiedTime")

--- End diff --

Because this is based on `FileStatus.getModificationTime`, couldn't this end up deleting event log files for long-running applications (i.e. those that run for over the configured cleanup period)? That will probably cause weird issues if those applications are actually running. You refer to this issue in your tests, but the code doesn't seem to be testing that explicit case.
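The concern can be illustrated in isolation (types and names below are hypothetical, not FsHistoryProvider code): a cleanup that selects entries purely by `lastModifiedTime` will also pick up an in-progress log belonging to a still-running application, if that application has gone longer than the retention window without flushing its event log.

```scala
// Hypothetical sketch of timestamp-only cleanup selection, to show the
// long-running-application hazard the reviewer describes.
object StaleSelectionSketch {
  case class LogEntry(path: String, lastModifiedTime: Long, inProgress: Boolean)

  // Selects everything older than the retention window -- note it never
  // checks whether an in-progress log's application is still alive.
  def selectExpired(entries: Seq[LogEntry], now: Long, maxAgeMs: Long): Seq[LogEntry] =
    entries.filter(_.lastModifiedTime < now - maxAgeMs)

  def main(args: Array[String]): Unit = {
    val day = 24L * 60 * 60 * 1000
    val now = 100 * day
    val entries = Seq(
      LogEntry("finished-old", now - 30 * day, inProgress = false),
      LogEntry("running-long", now - 30 * day, inProgress = true), // app still alive!
      LogEntry("finished-new", now - 1 * day, inProgress = false))
    val expired = selectExpired(entries, now, maxAgeMs = 14 * day)
    println(expired.map(_.path).mkString(",")) // finished-old,running-long
  }
}
```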
[GitHub] spark pull request #19770: [SPARK-21571][WEB UI] Spark history server leaves...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19770#discussion_r154817305

--- Diff: core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala ---
@@ -658,6 +659,103 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
     freshUI.get.ui.store.job(0)
   }

+  /**
+   * Validate aggressive clean up removes incomplete or corrupt history files that would
+   * otherwise be missed during clean up. Also validate no behavior change if aggressive
+   * clean up is disabled.
+   */
+  test("SPARK-21571: aggressive clean up removes incomplete history files") {
+    createCleanAndCheckIncompleteLogFiles(14, 21, true, true, true, true, true)
+    createCleanAndCheckIncompleteLogFiles(14, 21, false, false, false, true, false)
+    createCleanAndCheckIncompleteLogFiles(14, 7, true, false, false, false, false)
+    createCleanAndCheckIncompleteLogFiles(14, 7, false, false, false, false, false)
+  }
+
+  /**
+   * Create four test incomplete/corrupt history files and invoke a check and clean cycle that
+   * passes, followed by one occurring after the max age days retention window, and assert the
+   * expected number of history files remain.
+   * @param maxAgeDays maximum retention in days, used to simulate current time
+   * @param lastModifiedDaysAgo last modified date for test files relative to current time
+   * @param aggressiveCleanup aggressive clean up is enabled or not
+   * @param expectEmptyInprogressRemoved expect an empty inprogress file to be removed
+   * @param expectEmptyCorruptRemoved expect an empty corrupt complete file to be removed
+   * @param expectNonEmptyInprogressRemoved expect a non-empty inprogress file to be removed
+   * @param expectNonEmptyCorruptRemoved expect a non-empty corrupt complete file to be removed
+   */
+  private def createCleanAndCheckIncompleteLogFiles(
+      maxAgeDays: Long,
+      lastModifiedDaysAgo: Long,
+      aggressiveCleanup: Boolean,
+      expectEmptyInprogressRemoved: Boolean,
+      expectEmptyCorruptRemoved: Boolean,
+      expectNonEmptyInprogressRemoved: Boolean,
+      expectNonEmptyCorruptRemoved: Boolean) = {
+    // Set current time as 2 * maximum retention period to allow for expired history files.
+    val currentTimeMillis = MILLISECONDS.convert(maxAgeDays * 2, TimeUnit.DAYS)
+    val clock = new ManualClock(currentTimeMillis)
+
+    val lastModifiedTime = currentTimeMillis -
+      MILLISECONDS.convert(lastModifiedDaysAgo, TimeUnit.DAYS)
+
+    val provider = new FsHistoryProvider(
+      createTestConf()
+        .set("spark.history.fs.cleaner.aggressive", s"${aggressiveCleanup}")
+        .set("spark.history.fs.cleaner.maxAge", s"${maxAgeDays}d")
+        .set("spark.testing", "true"),
+      clock) {
+      override def getNewLastScanTime(): Long = clock.getTimeMillis
+    }
+
+    // Create history files
+    // 1. 0-byte size files inprogress and corrupt complete files
+    // 2. >0 byte size files inprogress and corrupt complete files

--- End diff --

So this is what I refer to in a previous comment. You probably need another test here where you have an in-progress, valid file that is being appended to, but where the time since its last mod time exceeds the cleanup period.
[GitHub] spark pull request #19770: [SPARK-21571][WEB UI] Spark history server leaves...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19770#discussion_r154816346

--- Diff: core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala ---
@@ -658,6 +659,103 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
     freshUI.get.ui.store.job(0)
   }

+  /**
+   * Validate aggressive clean up removes incomplete or corrupt history files that would
+   * otherwise be missed during clean up. Also validate no behavior change if aggressive
+   * clean up is disabled.
+   */
+  test("SPARK-21571: aggressive clean up removes incomplete history files") {
+    createCleanAndCheckIncompleteLogFiles(14, 21, true, true, true, true, true)
+    createCleanAndCheckIncompleteLogFiles(14, 21, false, false, false, true, false)
+    createCleanAndCheckIncompleteLogFiles(14, 7, true, false, false, false, false)
+    createCleanAndCheckIncompleteLogFiles(14, 7, false, false, false, false, false)
+  }
+
+  /**
+   * Create four test incomplete/corrupt history files and invoke a check and clean cycle that
+   * passes, followed by one occurring after the max age days retention window, and assert the
+   * expected number of history files remain.
+   * @param maxAgeDays maximum retention in days, used to simulate current time
+   * @param lastModifiedDaysAgo last modified date for test files relative to current time
+   * @param aggressiveCleanup aggressive clean up is enabled or not
+   * @param expectEmptyInprogressRemoved expect an empty inprogress file to be removed
+   * @param expectEmptyCorruptRemoved expect an empty corrupt complete file to be removed
+   * @param expectNonEmptyInprogressRemoved expect a non-empty inprogress file to be removed
+   * @param expectNonEmptyCorruptRemoved expect a non-empty corrupt complete file to be removed
+   */
+  private def createCleanAndCheckIncompleteLogFiles(
+      maxAgeDays: Long,
+      lastModifiedDaysAgo: Long,
+      aggressiveCleanup: Boolean,
+      expectEmptyInprogressRemoved: Boolean,
+      expectEmptyCorruptRemoved: Boolean,
+      expectNonEmptyInprogressRemoved: Boolean,
+      expectNonEmptyCorruptRemoved: Boolean) = {
+    // Set current time as 2 * maximum retention period to allow for expired history files.
+    val currentTimeMillis = MILLISECONDS.convert(maxAgeDays * 2, TimeUnit.DAYS)
+    val clock = new ManualClock(currentTimeMillis)
+
+    val lastModifiedTime = currentTimeMillis -
+      MILLISECONDS.convert(lastModifiedDaysAgo, TimeUnit.DAYS)
+
+    val provider = new FsHistoryProvider(
+      createTestConf()
+        .set("spark.history.fs.cleaner.aggressive", s"${aggressiveCleanup}")
+        .set("spark.history.fs.cleaner.maxAge", s"${maxAgeDays}d")
+        .set("spark.testing", "true"),
+      clock) {
+      override def getNewLastScanTime(): Long = clock.getTimeMillis
+    }
+
+    // Create history files
+    // 1. 0-byte size files inprogress and corrupt complete files
+    // 2. >0 byte size files inprogress and corrupt complete files
+
+    try {
+      val logfile1 = newLogFile("emptyInprogressLogFile", None, inProgress = true)
+      logfile1.createNewFile
+      logfile1.setLastModified(lastModifiedTime)
+
+      val logfile2 = newLogFile("emptyCorruptLogFile", None, inProgress = false)
+      logfile2.createNewFile
+      logfile2.setLastModified(lastModifiedTime)
+
+      // Create an inprogress log file, has only start record.
+      val logfile3 = newLogFile("nonEmptyInprogressLogFile", None, inProgress = true)
+      writeFile(logfile3, true, None, SparkListenerApplicationStart(
+        "inProgress1", Some("inProgress1"), 3L, "test", Some("attempt1"))
+      )
+      logfile3.setLastModified(lastModifiedTime)
+
+      // Create an incomplete log file, has an end record but no start record.
+      val logfile4 = newLogFile("nonEmptyCorruptLogFile", None, inProgress = false)
+      writeFile(logfile4, true, None, SparkListenerApplicationEnd(0))
+      logfile4.setLastModified(lastModifiedTime)
+
+      // Simulate checking logs 1 day after initial creation. This is necessary because the log
+      // checker will sometimes use the current time in place of last modified time the first
+      // time it encounters an inprogress file to work around certain file system inconsistencies.
+      // No history files should clean up in first check and clean pass.
+      clock.setTime(lastModifiedTime + MILLISECONDS.convert(1, TimeUnit.DAYS))
+      provider.checkForLogs

--- End diff --

Add `()` to method calls (also in other places).

---
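The test above hinges on a manual clock driving the retention-window arithmetic. As a minimal sketch of that pattern (Spark ships its own `org.apache.spark.util.ManualClock`; the class below is only an illustration of the idea, not Spark's implementation): the test sets the time explicitly, so expiry checks are deterministic instead of depending on wall time.

```scala
// Hypothetical minimal manual clock: the test, not the system, decides "now".
class SketchManualClock(private var now: Long) {
  def getTimeMillis: Long = now
  def setTime(t: Long): Unit = { now = t }
}

object ManualClockDemo {
  def main(args: Array[String]): Unit = {
    val day = 24L * 60 * 60 * 1000
    // "Current" time = 2 * the 14-day max age, files last modified 21 days ago.
    val clock = new SketchManualClock(28 * day)
    val lastModified = clock.getTimeMillis - 21 * day
    // First pass: one day after the files' last-modified time -> not expired yet.
    clock.setTime(lastModified + 1 * day)
    println(clock.getTimeMillis - lastModified > 14 * day) // false
    // Second pass back at "now": the 21-day-old files exceed the 14-day window.
    clock.setTime(28 * day)
    println(clock.getTimeMillis - lastModified > 14 * day) // true
  }
}
```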
[GitHub] spark pull request #19770: [SPARK-21571][WEB UI] Spark history server leaves...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19770#discussion_r154814962

--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -643,6 +633,44 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     } finally {
       iterator.foreach(_.close())
     }
+
+    // Clean corrupt or empty files that may have accumulated.
+    if (AGGRESSIVE_CLEANUP) {
+      var untracked: Option[KVStoreIterator[LogInfo]] = None
+      try {
+        untracked = Some(listing.view(classOf[LogInfo])

--- End diff --

So I spent some time reading my own patch and it's covering a slightly different case. My patch covers deleting SHS state when files are deleted; this covers deleting files that the SHS decides are broken. I still think that some code / state can be saved by handling both similarly - still playing with my code, though.
[GitHub] spark pull request #19770: [SPARK-21571][WEB UI] Spark history server leaves...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19770#discussion_r154787608

--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -643,6 +633,44 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     } finally {
       iterator.foreach(_.close())
     }
+
+    // Clean corrupt or empty files that may have accumulated.
+    if (AGGRESSIVE_CLEANUP) {
+      var untracked: Option[KVStoreIterator[LogInfo]] = None
+      try {
+        untracked = Some(listing.view(classOf[LogInfo])

--- End diff --

This logic seems to be similar to what I have in the pipeline for the new SHS project at https://github.com/vanzin/spark/pull/40. Except my change takes care of other things (like also cleaning up any loaded UI data). Could you take a look at that PR and see whether there's something it's not covering? I can incorporate any needed changes there.
[GitHub] spark pull request #19770: [SPARK-21571][WEB UI] Spark history server leaves...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19770#discussion_r154784102

--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -616,23 +620,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       listing.write(newApp)
     }

-    toDelete.foreach { attempt =>
-      val logPath = new Path(logDir, attempt.logPath)
-      try {
-        listing.delete(classOf[LogInfo], logPath.toString())
-      } catch {
-        case _: NoSuchElementException =>
-          logDebug(s"Log info entry for $logPath not found.")
-      }
-      try {
-        fs.delete(logPath, true)
-      } catch {
-        case e: AccessControlException =>
-          logInfo(s"No permission to delete ${attempt.logPath}, ignoring.")
-        case t: IOException =>
-          logError(s"IOException in cleaning ${attempt.logPath}", t)
-      }
-    }
+    toDelete
+      .map(attempt => new Path(logDir, attempt.logPath))

--- End diff --

`.map { attempt =>`
[GitHub] spark pull request #19770: [SPARK-21571][WEB UI] Spark history server leaves...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19770#discussion_r154784139

--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -616,23 +620,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       listing.write(newApp)
     }

-    toDelete.foreach { attempt =>
-      val logPath = new Path(logDir, attempt.logPath)
-      try {
-        listing.delete(classOf[LogInfo], logPath.toString())
-      } catch {
-        case _: NoSuchElementException =>
-          logDebug(s"Log info entry for $logPath not found.")
-      }
-      try {
-        fs.delete(logPath, true)
-      } catch {
-        case e: AccessControlException =>
-          logInfo(s"No permission to delete ${attempt.logPath}, ignoring.")
-        case t: IOException =>
-          logError(s"IOException in cleaning ${attempt.logPath}", t)
-      }
-    }
+    toDelete
+      .map(attempt => new Path(logDir, attempt.logPath))
+      .foreach(logPath => deleteLogInfo(logPath))

--- End diff --

`.foreach { logPath =>` (or `.foreach(deleteLogInfo)`).
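The nit above is about Scala's eta expansion: a one-argument method can be passed directly to `foreach` instead of being wrapped in a lambda. A standalone illustration (`deleteLogInfo` here is a local stand-in, not the provider's method):

```scala
// Both foreach calls below are equivalent; the second is the suggested form.
object ForeachStyleSketch {
  def run(paths: Seq[String]): Seq[String] = {
    val deleted = scala.collection.mutable.Buffer[String]()
    def deleteLogInfo(path: String): Unit = deleted += path

    paths.foreach(p => deleteLogInfo(p)) // explicit lambda
    paths.foreach(deleteLogInfo)         // eta-expanded, the form the review suggests
    deleted.toSeq
  }

  def main(args: Array[String]): Unit =
    println(ForeachStyleSketch.run(Seq("/history/app-1", "/history/app-2")).size) // 4
}
```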
[GitHub] spark pull request #19770: [SPARK-21571][WEB UI] Spark history server leaves...
GitHub user ericvandenbergfb opened a pull request: https://github.com/apache/spark/pull/19770

[SPARK-21571][WEB UI] Spark history server leaves incomplete or unreadable logs around forever

## What changes were proposed in this pull request?

** Updated pull request based on some other refactoring that went into FsHistoryProvider **

Fix logic:
1. checkForLogs excluded 0-size files, so they stuck around forever.
2. checkForLogs / mergeApplicationListing indefinitely ignored files that were not parseable or from which an appId couldn't be extracted, so they stuck around forever.

The above logic is only applied if spark.history.fs.cleaner.aggressive=true.

Fixed a race condition in a test (SPARK-3697: ignore files that cannot be read.) where the number of mergeApplicationListing calls could be more than 1, since the FsHistoryProvider would spin up an executor that also calls checkForLogs in parallel with the test.

Added a unit test to cover all cases with aggressive and non-aggressive clean up logic.

## How was this patch tested?

Added a test that extensively exercises clean up of the untracked files when configured.

Please review http://spark.apache.org/contributing.html before opening a pull request.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ericvandenbergfb/spark master

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/19770.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #19770

commit c52b1cfd2eee9c881267d3d4cd9ea83fb6a767eb
Author: Eric Vandenberg
Date: 2017-07-31T22:02:54Z

    [SPARK-21571][WEB UI] Spark history server leaves incomplete or unreadable history files around forever.

    Fix logic:
    1. checkForLogs excluded 0-size files so they stuck around forever.
    2. checkForLogs / mergeApplicationListing indefinitely ignored files that were not parseable/couldn't extract an appID, so they stuck around forever.

    Only apply above logic if spark.history.fs.cleaner.aggressive=true.

    Fixed race condition in a test (SPARK-3697: ignore files that cannot be read.) where the number of mergeApplicationListings could be more than 1 since the FsHistoryProvider would spin up an executor that also calls checkForLogs in parallel with the test.

    Added unit test to cover all cases with aggressive and non-aggressive clean up logic.

commit 08ea4ace02b7f8bf39190d5af53e7ced5e2807a0
Author: Eric Vandenberg
Date: 2017-11-15T20:03:21Z

    Merge branch 'master' of github.com:ericvandenbergfb/spark into cleanup.untracked.history.files

    * 'master' of github.com:ericvandenbergfb/spark: (637 commits)
      [SPARK-22469][SQL] Accuracy problem in comparison with string and numeric
      [SPARK-22490][DOC] Add PySpark doc for SparkSession.builder
      [SPARK-22422][ML] Add Adjusted R2 to RegressionMetrics
      [SPARK-20791][PYTHON][FOLLOWUP] Check for unicode column names in createDataFrame with Arrow
      [SPARK-22514][SQL] move ColumnVector.Array and ColumnarBatch.Row to individual files
      [SPARK-12375][ML] VectorIndexerModel support handle unseen categories via handleInvalid
      [SPARK-21087][ML] CrossValidator, TrainValidationSplit expose sub models after fitting: Scala
      [SPARK-22511][BUILD] Update maven central repo address
      [SPARK-22519][YARN] Remove unnecessary stagingDirPath null check in ApplicationMaster.cleanupStagingDir()
      [SPARK-20652][SQL] Store SQL UI data in the new app status store.
      [SPARK-20648][CORE] Port JobsTab and StageTab to the new UI backend.
      [SPARK-17074][SQL] Generate equi-height histogram in column statistics
      [SPARK-17310][SQL] Add an option to disable record-level filter in Parquet-side
      [SPARK-21911][ML][FOLLOW-UP] Fix doc for parallel ML Tuning in PySpark
      [SPARK-22377][BUILD] Use /usr/sbin/lsof if lsof does not exists in release-build.sh
      [SPARK-22487][SQL][FOLLOWUP] still keep spark.sql.hive.version
      [MINOR][CORE] Using bufferedInputStream for dataDeserializeStream
      [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFrame from Pandas
      [SPARK-21693][R][ML] Reduce max iterations in Linear SVM test in R to speed up AppVeyor build
      [SPARK-21720][SQL] Fix 64KB JVM bytecode limit problem with AND or OR
      ...

commit aee2fd3ffb9d720d33d032fdb924e9d1f4d20a4c
Author: Eric Vandenberg
Date: 2017-11-16T20:33:39Z

    [SPARK-21571][WEB UI] Spark history server cleans up untracked files.

    The history provider code was changed, so I reimplemented the fix to clean up empty or corrupt history files that otherwise would stay around forever.

commit 3431d5a2c427f1d2f2a859e014ed62c30a45ebdb
Author: ericvandenbergfb
Date: 2017-11-16T2