This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new bd0a04b [SPARK-26949][SS] Prevent 'purge' to remove needed batch files in CompactibleFileStreamLog bd0a04b is described below commit bd0a04baab0fe30da8621e35eb69bbdcf84acbf1 Author: Jungtaek Lim (HeartSaVioR) <kabh...@gmail.com> AuthorDate: Fri Jun 14 20:34:18 2019 -0700 [SPARK-26949][SS] Prevent 'purge' to remove needed batch files in CompactibleFileStreamLog ## What changes were proposed in this pull request? This patch proposes making `purge` in `CompactibleFileStreamLog` throw `UnsupportedOperationException` to prevent necessary batch files from being purged, and adding javadoc that documents this behavior. Purging actually breaks the log only when the latest compaction batch is purged, but callers have no way of knowing which batch that is, so it is safer to disallow purging altogether. ## How was this patch tested? Added a unit test. Closes #23850 from HeartSaVioR/SPARK-26949. Authored-by: Jungtaek Lim (HeartSaVioR) <kabh...@gmail.com> Signed-off-by: Dongjoon Hyun <dh...@apple.com> --- .../streaming/CompactibleFileStreamLog.scala | 10 ++++++++++ .../streaming/CompactibleFileStreamLogSuite.scala | 23 ++++++++++++++++++++++ 2 files changed, 33 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala index 19c93c5..96b9e06 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala @@ -163,6 +163,16 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag]( } /** + * CompactibleFileStreamLog maintains logs by itself, and manual purging might break internal + * state, specifically which latest compaction batch is purged.
+ * + * To simplify the situation, this method just throws UnsupportedOperationException regardless + * of given parameter, and let CompactibleFileStreamLog handles purging by itself. + */ + override def purge(thresholdBatchId: Long): Unit = throw new UnsupportedOperationException( + s"Cannot purge as it might break internal state.") + + /** * Compacts all logs before `batchId` plus the provided `logs`, and writes them into the * corresponding `batchId` file. It will delete expired files as well if enabled. */ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala index ec961a9..71dc377 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala @@ -232,6 +232,29 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext }) } + test("prevent removing metadata files via method purge") { + withFakeCompactibleFileStreamLog( + fileCleanupDelayMs = 10000, + defaultCompactInterval = 2, + defaultMinBatchesToRetain = 3, + compactibleLog => { + // compaction batches: 1 + compactibleLog.add(0, Array("some_path_0")) + compactibleLog.add(1, Array("some_path_1")) + compactibleLog.add(2, Array("some_path_2")) + + val exc = intercept[UnsupportedOperationException] { + compactibleLog.purge(2) + } + assert(exc.getMessage.contains("Cannot purge as it might break internal state")) + + // Below line would fail with IllegalStateException if we don't prevent purge: + // - purge(2) would delete batch 0 and 1 which batch 1 is compaction batch + // - allFiles() would read batch 1 (latest compaction) and 2 which batch 1 is deleted + compactibleLog.allFiles() + }) + } + private def withFakeCompactibleFileStreamLog( fileCleanupDelayMs: Long, 
defaultCompactInterval: Int, --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org