This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new bd0a04b  [SPARK-26949][SS] Prevent 'purge' to remove needed batch 
files in CompactibleFileStreamLog
bd0a04b is described below

commit bd0a04baab0fe30da8621e35eb69bbdcf84acbf1
Author: Jungtaek Lim (HeartSaVioR) <kabh...@gmail.com>
AuthorDate: Fri Jun 14 20:34:18 2019 -0700

    [SPARK-26949][SS] Prevent 'purge' to remove needed batch files in 
CompactibleFileStreamLog
    
    ## What changes were proposed in this pull request?
    
    This patch proposes making `purge` in `CompactibleFileStreamLog` throw 
`UnsupportedOperationException` to prevent purging necessary batch files, as 
well as adding javadoc to document its behavior. Actually it would only break 
when the latest compaction batch is requested to be purged, but the caller 
wouldn't be aware of this, so it is safer to just prevent it.
    
    ## How was this patch tested?
    
    Added UT.
    
    Closes #23850 from HeartSaVioR/SPARK-26949.
    
    Authored-by: Jungtaek Lim (HeartSaVioR) <kabh...@gmail.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../streaming/CompactibleFileStreamLog.scala       | 10 ++++++++++
 .../streaming/CompactibleFileStreamLogSuite.scala  | 23 ++++++++++++++++++++++
 2 files changed, 33 insertions(+)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
index 19c93c5..96b9e06 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
@@ -163,6 +163,16 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : 
ClassTag](
   }
 
   /**
+   * CompactibleFileStreamLog maintains logs by itself, and manual purging 
might break internal
+   * state, specifically which latest compaction batch is purged.
+   *
+   * To simplify the situation, this method just throws 
UnsupportedOperationException regardless
+   * of given parameter, and let CompactibleFileStreamLog handles purging by 
itself.
+   */
+  override def purge(thresholdBatchId: Long): Unit = throw new 
UnsupportedOperationException(
+    s"Cannot purge as it might break internal state.")
+
+  /**
    * Compacts all logs before `batchId` plus the provided `logs`, and writes 
them into the
    * corresponding `batchId` file. It will delete expired files as well if 
enabled.
    */
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala
index ec961a9..71dc377 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala
@@ -232,6 +232,29 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite 
with SharedSQLContext
       })
   }
 
+  test("prevent removing metadata files via method purge") {
+    withFakeCompactibleFileStreamLog(
+      fileCleanupDelayMs = 10000,
+      defaultCompactInterval = 2,
+      defaultMinBatchesToRetain = 3,
+      compactibleLog => {
+        // compaction batches: 1
+        compactibleLog.add(0, Array("some_path_0"))
+        compactibleLog.add(1, Array("some_path_1"))
+        compactibleLog.add(2, Array("some_path_2"))
+
+        val exc = intercept[UnsupportedOperationException] {
+          compactibleLog.purge(2)
+        }
+        assert(exc.getMessage.contains("Cannot purge as it might break 
internal state"))
+
+        // Below line would fail with IllegalStateException if we don't 
prevent purge:
+        // - purge(2) would delete batch 0 and 1 which batch 1 is compaction 
batch
+        // - allFiles() would read batch 1 (latest compaction) and 2 which 
batch 1 is deleted
+        compactibleLog.allFiles()
+      })
+  }
+
   private def withFakeCompactibleFileStreamLog(
     fileCleanupDelayMs: Long,
     defaultCompactInterval: Int,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to