Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19770#discussion_r154817305
  
    --- Diff: core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala ---
    @@ -658,6 +659,103 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
         freshUI.get.ui.store.job(0)
       }
     
    +  /**
    +   * Validate aggressive clean up removes incomplete or corrupt history files that would
    +   * otherwise be missed during clean up.  Also validate no behavior change if aggressive
    +   * clean up is disabled.
    +   */
    +  test("SPARK-21571: aggressive clean up removes incomplete history files") {
    +    createCleanAndCheckIncompleteLogFiles(14, 21, true, true, true, true, true)
    +    createCleanAndCheckIncompleteLogFiles(14, 21, false, false, false, true, false)
    +    createCleanAndCheckIncompleteLogFiles(14, 7, true, false, false, false, false)
    +    createCleanAndCheckIncompleteLogFiles(14, 7, false, false, false, false, false)
    +  }
    +
    +  /**
    +   * Create four test incomplete/corrupt history files and invoke a check and clean cycle that
    +   * passes followed by one occurring after the max age days retention window and assert the
    +   * expected number of history files remain.
    +   * @param maxAgeDays maximum retention in days, used to simulate current time
    +   * @param lastModifiedDaysAgo last modified date for test files relative to current time
    +   * @param aggressiveCleanup aggressive clean up is enabled or not
    +   * @param expectEmptyInprogressRemoved expect an empty inprogress file to be removed
    +   * @param expectEmptyCorruptRemoved expect an empty corrupt complete file to be removed
    +   * @param expectNonEmptyInprogressRemoved expect a non-empty inprogress file to be removed
    +   * @param expectNonEmptyCorruptRemoved expect a non-empty corrupt complete file to be removed
    +   */
    +  private def createCleanAndCheckIncompleteLogFiles(
    +      maxAgeDays: Long,
    +      lastModifiedDaysAgo: Long,
    +      aggressiveCleanup: Boolean,
    +      expectEmptyInprogressRemoved: Boolean,
    +      expectEmptyCorruptRemoved: Boolean,
    +      expectNonEmptyInprogressRemoved: Boolean,
    +      expectNonEmptyCorruptRemoved: Boolean) = {
    +    // Set current time as 2 * maximum retention period to allow for expired history files.
    +    val currentTimeMillis = MILLISECONDS.convert(maxAgeDays * 2, TimeUnit.DAYS)
    +    val clock = new ManualClock(currentTimeMillis)
    +
    +    val lastModifiedTime = currentTimeMillis -
    +      MILLISECONDS.convert(lastModifiedDaysAgo, TimeUnit.DAYS)
    +
    +    val provider = new FsHistoryProvider(
    +      createTestConf()
    +        .set("spark.history.fs.cleaner.aggressive", s"${aggressiveCleanup}")
    +        .set("spark.history.fs.cleaner.maxAge", s"${maxAgeDays}d")
    +        .set("spark.testing", "true"),
    +      clock) {
    +      override def getNewLastScanTime(): Long = clock.getTimeMillis
    +    }
    +
    +    // Create history files
    +    // 1. 0-byte size files inprogress and corrupt complete files
    +    // 2. >0 byte size files inprogress and corrupt complete files
    --- End diff --
    
    This is what I was referring to in a previous comment. You probably need another
    test here that covers a valid in-progress file that is still being appended to,
    but whose last modification time is older than the cleanup period.
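
To make the requested scenario concrete, here is a minimal, self-contained sketch of the case such a test would need to cover. It does not use `FsHistoryProvider` or any Spark API. `LogEntry`, `InProgressCleanupSketch`, and `eligibleForCleanup` are hypothetical names invented for illustration, and the "did the file grow between scans" rule is only an assumed policy, not what the PR implements.

```scala
import java.util.concurrent.TimeUnit

// Hypothetical, simplified stand-in for a history log file as seen in one scan.
// This is not a Spark type.
final case class LogEntry(name: String, lastModifiedMs: Long, sizeBytes: Long)

object InProgressCleanupSketch {
  // Assumed policy for illustration only: an in-progress log is eligible for
  // removal when its last modification time is older than maxAgeMs AND its
  // size did not grow between the previous scan and the current one.
  def eligibleForCleanup(
      previous: LogEntry,
      current: LogEntry,
      nowMs: Long,
      maxAgeMs: Long): Boolean = {
    val expired = nowMs - current.lastModifiedMs > maxAgeMs
    val stillGrowing = current.sizeBytes > previous.sizeBytes
    expired && !stillGrowing
  }

  // Same time setup as the test in the diff: "now" is twice the retention period.
  val maxAgeMs: Long = TimeUnit.DAYS.toMillis(14)
  val nowMs: Long = maxAgeMs * 2
  val oldModTimeMs: Long = nowMs - TimeUnit.DAYS.toMillis(21)

  // A valid in-progress file with a stale mod time that is still being appended to.
  val before: LogEntry = LogEntry("app-1.inprogress", oldModTimeMs, 100L)
  val after: LogEntry = LogEntry("app-1.inprogress", oldModTimeMs, 200L)

  // The same file, but with no growth between scans.
  val idle: LogEntry = LogEntry("app-1.inprogress", oldModTimeMs, 100L)
}
```

Under this assumed policy, `eligibleForCleanup(before, after, nowMs, maxAgeMs)` is false because the file grew between scans, while `eligibleForCleanup(before, idle, nowMs, maxAgeMs)` is true. A real test in the suite would presumably write to an actual in-progress log between two check-and-clean cycles and assert that the file survives.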


