[GitHub] spark pull request #19770: [SPARK-21571][WEB UI] Spark history server leaves...

2017-12-12 Thread ericvandenbergfb
Github user ericvandenbergfb closed the pull request at:

https://github.com/apache/spark/pull/19770


---




[GitHub] spark pull request #19770: [SPARK-21571][WEB UI] Spark history server leaves...

2017-12-07 Thread gengliangwang
Github user gengliangwang commented on a diff in the pull request:

https://github.com/apache/spark/pull/19770#discussion_r155520764
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -616,23 +620,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       listing.write(newApp)
     }
 
-    toDelete.foreach { attempt =>
-      val logPath = new Path(logDir, attempt.logPath)
-      try {
-        listing.delete(classOf[LogInfo], logPath.toString())
-      } catch {
-        case _: NoSuchElementException =>
-          logDebug(s"Log info entry for $logPath not found.")
-      }
-      try {
-        fs.delete(logPath, true)
-      } catch {
-        case e: AccessControlException =>
-          logInfo(s"No permission to delete ${attempt.logPath}, ignoring.")
-        case t: IOException =>
-          logError(s"IOException in cleaning ${attempt.logPath}", t)
-      }
-    }
+    toDelete
+      .map(attempt => new Path(logDir, attempt.logPath))
--- End diff --

Nit: We can change the input parameter of `deleteLogInfo` so that it accepts the logPath (as a string) instead of a file. That way we don't need to write `new Path(logDir, attempt.logPath)` every time before calling the function.
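
A minimal sketch of the suggested signature change (the body and the parameter name `relativeLogPath` are illustrative, not the PR's actual code):

```scala
import org.apache.hadoop.fs.Path

// Hypothetical shape of the refactor: deleteLogInfo builds the Path itself,
// so call sites can pass attempt.logPath (a String) directly.
private def deleteLogInfo(relativeLogPath: String): Unit = {
  val logPath = new Path(logDir, relativeLogPath)
  // ... remove the LogInfo entry from `listing`, then delete logPath via `fs` ...
}

// Call sites then simplify to:
toDelete.foreach(attempt => deleteLogInfo(attempt.logPath))
```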


---




[GitHub] spark pull request #19770: [SPARK-21571][WEB UI] Spark history server leaves...

2017-12-04 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19770#discussion_r154817100
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -643,6 +633,44 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     } finally {
       iterator.foreach(_.close())
     }
+
+    // Clean corrupt or empty files that may have accumulated.
+    if (AGGRESSIVE_CLEANUP) {
+      var untracked: Option[KVStoreIterator[LogInfo]] = None
+      try {
+        untracked = Some(listing.view(classOf[LogInfo])
+          .index("lastModifiedTime")
--- End diff --

Because this is based on `FileStatus.getModificationTime`, couldn't this end up deleting event log files for long-running applications (i.e. those that run for longer than the configured cleanup period)? That would probably cause weird issues if those applications are actually running.

You refer to this issue in your tests, but the code doesn't seem to be testing that explicit case.
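
A minimal sketch of the hazard (all numbers illustrative):

```scala
import java.util.concurrent.TimeUnit

// A still-running application can go quiet for longer than the retention
// window (e.g. an idle long-lived shell), so a sweep keyed purely off HDFS
// modification time would select its live event log for deletion.
val maxAgeMs  = TimeUnit.DAYS.toMillis(14)
val nowMs     = TimeUnit.DAYS.toMillis(100)           // "current" time
val lastModMs = nowMs - TimeUnit.DAYS.toMillis(15)    // stale but live log
assert(lastModMs < nowMs - maxAgeMs)                  // flagged for deletion
```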


---




[GitHub] spark pull request #19770: [SPARK-21571][WEB UI] Spark history server leaves...

2017-12-04 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19770#discussion_r154817305
  
--- Diff: core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala ---
@@ -658,6 +659,103 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
     freshUI.get.ui.store.job(0)
   }
 
+  /**
+   * Validate aggressive clean up removes incomplete or corrupt history files that would
+   * otherwise be missed during clean up.  Also validate no behavior change if aggressive
+   * clean up is disabled.
+   */
+  test("SPARK-21571: aggressive clean up removes incomplete history files") {
+    createCleanAndCheckIncompleteLogFiles(14, 21, true, true, true, true, true)
+    createCleanAndCheckIncompleteLogFiles(14, 21, false, false, false, true, false)
+    createCleanAndCheckIncompleteLogFiles(14, 7, true, false, false, false, false)
+    createCleanAndCheckIncompleteLogFiles(14, 7, false, false, false, false, false)
+  }
+
+  /**
+   * Create four test incomplete/corrupt history files and invoke a check and clean cycle that
+   * passes, followed by one occurring after the max age days retention window, and assert the
+   * expected number of history files remain.
+   * @param maxAgeDays maximum retention in days, used to simulate current time
+   * @param lastModifiedDaysAgo last modified date for test files relative to current time
+   * @param aggressiveCleanup whether aggressive clean up is enabled
+   * @param expectEmptyInprogressRemoved expect an empty inprogress file to be removed
+   * @param expectEmptyCorruptRemoved expect an empty corrupt complete file to be removed
+   * @param expectNonEmptyInprogressRemoved expect a non-empty inprogress file to be removed
+   * @param expectNonEmptyCorruptRemoved expect a non-empty corrupt complete file to be removed
+   */
+  private def createCleanAndCheckIncompleteLogFiles(
+      maxAgeDays: Long,
+      lastModifiedDaysAgo: Long,
+      aggressiveCleanup: Boolean,
+      expectEmptyInprogressRemoved: Boolean,
+      expectEmptyCorruptRemoved: Boolean,
+      expectNonEmptyInprogressRemoved: Boolean,
+      expectNonEmptyCorruptRemoved: Boolean) = {
+    // Set current time as 2 * maximum retention period to allow for expired history files.
+    val currentTimeMillis = MILLISECONDS.convert(maxAgeDays * 2, TimeUnit.DAYS)
+    val clock = new ManualClock(currentTimeMillis)
+
+    val lastModifiedTime = currentTimeMillis -
+      MILLISECONDS.convert(lastModifiedDaysAgo, TimeUnit.DAYS)
+
+    val provider = new FsHistoryProvider(
+      createTestConf()
+        .set("spark.history.fs.cleaner.aggressive", s"${aggressiveCleanup}")
+        .set("spark.history.fs.cleaner.maxAge", s"${maxAgeDays}d")
+        .set("spark.testing", "true"),
+      clock) {
+      override def getNewLastScanTime(): Long = clock.getTimeMillis
+    }
+
+    // Create history files
+    // 1. 0-byte size files inprogress and corrupt complete files
+    // 2. >0 byte size files inprogress and corrupt complete files
--- End diff --

So this is what I refer to in a previous comment. You probably need another test here where you have an in-progress, valid file that is being appended to, but where the time since its last mod time exceeds the cleanup period.
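
A hedged sketch of that case, reusing this suite's helpers (timing constants are illustrative; the assertion states the desired behavior, which the current patch may not yet satisfy):

```scala
test("SPARK-21571: aggressive clean up keeps live in-progress history files") {
  val clock = new ManualClock(TimeUnit.DAYS.toMillis(7))
  val provider = new FsHistoryProvider(
    createTestConf()
      .set("spark.history.fs.cleaner.aggressive", "true")
      .set("spark.history.fs.cleaner.maxAge", "14d"),
    clock)

  // A valid in-progress log that is still being written by a live application.
  val live = newLogFile("liveApp", None, inProgress = true)
  writeFile(live, true, None,
    SparkListenerApplicationStart("liveApp", Some("liveApp"), 1L, "test", None))
  live.setLastModified(clock.getTimeMillis())

  provider.checkForLogs()  // first scan sees the fresh in-progress log

  // 21 days pass with no further writes, exceeding maxAge, but the
  // application is still running and appending to the same file.
  clock.advance(TimeUnit.DAYS.toMillis(21))
  provider.checkForLogs()
  provider.cleanLogs()

  assert(live.exists(), "live in-progress log should survive aggressive cleanup")
}
```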


---




[GitHub] spark pull request #19770: [SPARK-21571][WEB UI] Spark history server leaves...

2017-12-04 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19770#discussion_r154816346
  
--- Diff: core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala ---
@@ -658,6 +659,103 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
     freshUI.get.ui.store.job(0)
   }
 
+  /**
+   * Validate aggressive clean up removes incomplete or corrupt history files that would
+   * otherwise be missed during clean up.  Also validate no behavior change if aggressive
+   * clean up is disabled.
+   */
+  test("SPARK-21571: aggressive clean up removes incomplete history files") {
+    createCleanAndCheckIncompleteLogFiles(14, 21, true, true, true, true, true)
+    createCleanAndCheckIncompleteLogFiles(14, 21, false, false, false, true, false)
+    createCleanAndCheckIncompleteLogFiles(14, 7, true, false, false, false, false)
+    createCleanAndCheckIncompleteLogFiles(14, 7, false, false, false, false, false)
+  }
+
+  /**
+   * Create four test incomplete/corrupt history files and invoke a check and clean cycle that
+   * passes, followed by one occurring after the max age days retention window, and assert the
+   * expected number of history files remain.
+   * @param maxAgeDays maximum retention in days, used to simulate current time
+   * @param lastModifiedDaysAgo last modified date for test files relative to current time
+   * @param aggressiveCleanup whether aggressive clean up is enabled
+   * @param expectEmptyInprogressRemoved expect an empty inprogress file to be removed
+   * @param expectEmptyCorruptRemoved expect an empty corrupt complete file to be removed
+   * @param expectNonEmptyInprogressRemoved expect a non-empty inprogress file to be removed
+   * @param expectNonEmptyCorruptRemoved expect a non-empty corrupt complete file to be removed
+   */
+  private def createCleanAndCheckIncompleteLogFiles(
+      maxAgeDays: Long,
+      lastModifiedDaysAgo: Long,
+      aggressiveCleanup: Boolean,
+      expectEmptyInprogressRemoved: Boolean,
+      expectEmptyCorruptRemoved: Boolean,
+      expectNonEmptyInprogressRemoved: Boolean,
+      expectNonEmptyCorruptRemoved: Boolean) = {
+    // Set current time as 2 * maximum retention period to allow for expired history files.
+    val currentTimeMillis = MILLISECONDS.convert(maxAgeDays * 2, TimeUnit.DAYS)
+    val clock = new ManualClock(currentTimeMillis)
+
+    val lastModifiedTime = currentTimeMillis -
+      MILLISECONDS.convert(lastModifiedDaysAgo, TimeUnit.DAYS)
+
+    val provider = new FsHistoryProvider(
+      createTestConf()
+        .set("spark.history.fs.cleaner.aggressive", s"${aggressiveCleanup}")
+        .set("spark.history.fs.cleaner.maxAge", s"${maxAgeDays}d")
+        .set("spark.testing", "true"),
+      clock) {
+      override def getNewLastScanTime(): Long = clock.getTimeMillis
+    }
+
+    // Create history files
+    // 1. 0-byte size files inprogress and corrupt complete files
+    // 2. >0 byte size files inprogress and corrupt complete files
+
+    try {
+      val logfile1 = newLogFile("emptyInprogressLogFile", None, inProgress = true)
+      logfile1.createNewFile
+      logfile1.setLastModified(lastModifiedTime)
+
+      val logfile2 = newLogFile("emptyCorruptLogFile", None, inProgress = false)
+      logfile2.createNewFile
+      logfile2.setLastModified(lastModifiedTime)
+
+      // Create an inprogress log file, has only start record.
+      val logfile3 = newLogFile("nonEmptyInprogressLogFile", None, inProgress = true)
+      writeFile(logfile3, true, None, SparkListenerApplicationStart(
+        "inProgress1", Some("inProgress1"), 3L, "test", Some("attempt1"))
+      )
+      logfile3.setLastModified(lastModifiedTime)
+
+      // Create an incomplete log file, has an end record but no start record.
+      val logfile4 = newLogFile("nonEmptyCorruptLogFile", None, inProgress = false)
+      writeFile(logfile4, true, None, SparkListenerApplicationEnd(0))
+      logfile4.setLastModified(lastModifiedTime)
+
+      // Simulate checking logs 1 day after initial creation.  This is necessary because the log
+      // checker will sometimes use the current time in place of last modified time the first
+      // time it encounters an inprogress file, to work around certain file system inconsistencies.
+      // No history files should be cleaned up in the first check and clean pass.
+      clock.setTime(lastModifiedTime + MILLISECONDS.convert(1, TimeUnit.DAYS))
+      provider.checkForLogs
--- End diff --

Add `()` to method calls (also in other places).
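
For instance, from the diff above:

```scala
provider.checkForLogs()   // rather than provider.checkForLogs
logfile1.createNewFile()  // rather than logfile1.createNewFile
```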


---


[GitHub] spark pull request #19770: [SPARK-21571][WEB UI] Spark history server leaves...

2017-12-04 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19770#discussion_r154814962
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -643,6 +633,44 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     } finally {
       iterator.foreach(_.close())
     }
+
+    // Clean corrupt or empty files that may have accumulated.
+    if (AGGRESSIVE_CLEANUP) {
+      var untracked: Option[KVStoreIterator[LogInfo]] = None
+      try {
+        untracked = Some(listing.view(classOf[LogInfo])
--- End diff --

So I spent some time reading my own patch, and it's covering a slightly different case. My patch covers deleting SHS state when files are deleted; this one covers deleting files that the SHS decides are broken. I still think that some code / state can be saved by handling both similarly; still playing with my code, though.



---




[GitHub] spark pull request #19770: [SPARK-21571][WEB UI] Spark history server leaves...

2017-12-04 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19770#discussion_r154787608
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -643,6 +633,44 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     } finally {
       iterator.foreach(_.close())
     }
+
+    // Clean corrupt or empty files that may have accumulated.
+    if (AGGRESSIVE_CLEANUP) {
+      var untracked: Option[KVStoreIterator[LogInfo]] = None
+      try {
+        untracked = Some(listing.view(classOf[LogInfo])
--- End diff --

This logic seems to be similar to what I have in the pipeline for the new SHS project at https://github.com/vanzin/spark/pull/40, except that my change takes care of other things (like also cleaning up any loaded UI data).

Could you take a look at that PR and see whether there's something it's not 
covering? I can incorporate any needed changes there.


---




[GitHub] spark pull request #19770: [SPARK-21571][WEB UI] Spark history server leaves...

2017-12-04 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19770#discussion_r154784102
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -616,23 +620,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       listing.write(newApp)
     }
 
-    toDelete.foreach { attempt =>
-      val logPath = new Path(logDir, attempt.logPath)
-      try {
-        listing.delete(classOf[LogInfo], logPath.toString())
-      } catch {
-        case _: NoSuchElementException =>
-          logDebug(s"Log info entry for $logPath not found.")
-      }
-      try {
-        fs.delete(logPath, true)
-      } catch {
-        case e: AccessControlException =>
-          logInfo(s"No permission to delete ${attempt.logPath}, ignoring.")
-        case t: IOException =>
-          logError(s"IOException in cleaning ${attempt.logPath}", t)
-      }
-    }
+    toDelete
+      .map(attempt => new Path(logDir, attempt.logPath))
--- End diff --

`.map { attempt =>`


---




[GitHub] spark pull request #19770: [SPARK-21571][WEB UI] Spark history server leaves...

2017-12-04 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19770#discussion_r154784139
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -616,23 +620,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       listing.write(newApp)
     }
 
-    toDelete.foreach { attempt =>
-      val logPath = new Path(logDir, attempt.logPath)
-      try {
-        listing.delete(classOf[LogInfo], logPath.toString())
-      } catch {
-        case _: NoSuchElementException =>
-          logDebug(s"Log info entry for $logPath not found.")
-      }
-      try {
-        fs.delete(logPath, true)
-      } catch {
-        case e: AccessControlException =>
-          logInfo(s"No permission to delete ${attempt.logPath}, ignoring.")
-        case t: IOException =>
-          logError(s"IOException in cleaning ${attempt.logPath}", t)
-      }
-    }
+    toDelete
+      .map(attempt => new Path(logDir, attempt.logPath))
+      .foreach(logPath => deleteLogInfo(logPath))
--- End diff --

`.foreach { logPath =>` (or `.foreach(deleteLogInfo)`).
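
Applying both style suggestions, the chain might read (a sketch; `deleteLogInfo` keeps its `Path` parameter as in the PR):

```scala
toDelete
  .map { attempt => new Path(logDir, attempt.logPath) }
  .foreach(deleteLogInfo)
```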


---




[GitHub] spark pull request #19770: [SPARK-21571][WEB UI] Spark history server leaves...

2017-11-16 Thread ericvandenbergfb
GitHub user ericvandenbergfb opened a pull request:

https://github.com/apache/spark/pull/19770

[SPARK-21571][WEB UI] Spark history server leaves incomplete or unreadable logs around forever

## What changes were proposed in this pull request?

**Updated pull request based on some other refactoring that went into FsHistoryProvider.**

Fix logic:

1. checkForLogs excluded 0-size files, so they stuck around forever.
2. checkForLogs / mergeApplicationListing indefinitely ignored files that were not parseable or from which an appID couldn't be extracted, so they stuck around forever.

Only apply the above logic if spark.history.fs.cleaner.aggressive=true (see the sketch below).
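
A minimal sketch of enabling the new behavior (the `aggressive` flag is introduced by this PR; the other keys are existing history server settings, values illustrative):

```scala
import org.apache.spark.SparkConf

// Equivalent entries in spark-defaults.conf would work as well.
val conf = new SparkConf()
  .set("spark.history.fs.cleaner.enabled", "true")
  .set("spark.history.fs.cleaner.maxAge", "14d")
  .set("spark.history.fs.cleaner.aggressive", "true")  // new flag in this PR
```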

Fixed a race condition in a test (SPARK-3697: ignore files that cannot be read.) where the number of mergeApplicationListing calls could be more than 1, since the FsHistoryProvider would spin up an executor that also calls checkForLogs in parallel with the test.

Added unit test to cover all cases with aggressive and non-aggressive clean up logic.

## How was this patch tested?

Added a test that extensively exercises the clean up of untracked files when aggressive clean up is configured.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ericvandenbergfb/spark master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19770.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19770


commit c52b1cfd2eee9c881267d3d4cd9ea83fb6a767eb
Author: Eric Vandenberg 
Date:   2017-07-31T22:02:54Z

[SPARK-21571][WEB UI] Spark history server leaves incomplete or unreadable history files around forever.

Fix logic
1. checkForLogs excluded 0-size files so they stuck around forever.
2. checkForLogs / mergeApplicationListing indefinitely ignored files that were not parseable/couldn't extract an appID, so they stuck around forever.

Only apply above logic if spark.history.fs.cleaner.aggressive=true.

Fixed race condition in a test (SPARK-3697: ignore files that cannot be read.) where the number of mergeApplicationListings could be more than 1 since the FsHistoryProvider would spin up an executor that also calls checkForLogs in parallel with the test.

Added unit test to cover all cases with aggressive and non-aggressive clean up logic.

commit 08ea4ace02b7f8bf39190d5af53e7ced5e2807a0
Author: Eric Vandenberg 
Date:   2017-11-15T20:03:21Z

Merge branch 'master' of github.com:ericvandenbergfb/spark into cleanup.untracked.history.files

* 'master' of github.com:ericvandenbergfb/spark: (637 commits)
  [SPARK-22469][SQL] Accuracy problem in comparison with string and numeric
  [SPARK-22490][DOC] Add PySpark doc for SparkSession.builder
  [SPARK-22422][ML] Add Adjusted R2 to RegressionMetrics
  [SPARK-20791][PYTHON][FOLLOWUP] Check for unicode column names in createDataFrame with Arrow
  [SPARK-22514][SQL] move ColumnVector.Array and ColumnarBatch.Row to individual files
  [SPARK-12375][ML] VectorIndexerModel support handle unseen categories via handleInvalid
  [SPARK-21087][ML] CrossValidator, TrainValidationSplit expose sub models after fitting: Scala
  [SPARK-22511][BUILD] Update maven central repo address
  [SPARK-22519][YARN] Remove unnecessary stagingDirPath null check in ApplicationMaster.cleanupStagingDir()
  [SPARK-20652][SQL] Store SQL UI data in the new app status store.
  [SPARK-20648][CORE] Port JobsTab and StageTab to the new UI backend.
  [SPARK-17074][SQL] Generate equi-height histogram in column statistics
  [SPARK-17310][SQL] Add an option to disable record-level filter in Parquet-side
  [SPARK-21911][ML][FOLLOW-UP] Fix doc for parallel ML Tuning in PySpark
  [SPARK-22377][BUILD] Use /usr/sbin/lsof if lsof does not exists in release-build.sh
  [SPARK-22487][SQL][FOLLOWUP] still keep spark.sql.hive.version
  [MINOR][CORE] Using bufferedInputStream for dataDeserializeStream
  [SPARK-20791][PYSPARK] Use Arrow to create Spark DataFrame from Pandas
  [SPARK-21693][R][ML] Reduce max iterations in Linear SVM test in R to speed up AppVeyor build
  [SPARK-21720][SQL] Fix 64KB JVM bytecode limit problem with AND or OR
  ...

commit aee2fd3ffb9d720d33d032fdb924e9d1f4d20a4c
Author: Eric Vandenberg 
Date:   2017-11-16T20:33:39Z

[SPARK-21571][WEB UI] Spark history server cleans up untracked files.

The history provider code was changed so I reimplemented the fix to clean up empty or corrupt history files that otherwise would stay around forever.

commit 3431d5a2c427f1d2f2a859e014ed62c30a45ebdb
Author: ericvandenbergfb 
Date:   2017-11-16T2