This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 0aa877e2819 [SPARK-40826][SS] Add additional checkpoint rename file check
0aa877e2819 is described below

commit 0aa877e2819d72204045876d7e916627d9b7f679
Author: Liang-Chi Hsieh <vii...@gmail.com>
AuthorDate: Tue Oct 18 22:05:53 2022 -0700

    [SPARK-40826][SS] Add additional checkpoint rename file check
    
    ### What changes were proposed in this pull request?
    
    This adds an additional checkpoint rename file check.
    
    ### Why are the changes needed?
    
    We recently encountered an issue where a customer's structured streaming job failed to read a delta file.
    
    The temporary file exists, but it was not successfully renamed to the final delta file path.
    
    We currently don't check whether the renamed file exists; we simply assume the rename succeeded. As a result, failing to read a delta file that was assumed to have been committed in the last batch makes re-triggering the job impossible.
    
    We should be able to check the renamed checkpoint file to prevent such difficulty in advance (a usage sketch follows the diff below).
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Existing tests.
    
    Closes #38291 from viirya/add_file_check.
    
    Authored-by: Liang-Chi Hsieh <vii...@gmail.com>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 10 ++++++++++
 .../spark/sql/execution/streaming/CheckpointFileManager.scala  |  7 +++++++
 2 files changed, 17 insertions(+)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 2f96209222b..a99a795018d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1830,6 +1830,14 @@ object SQLConf {
       .stringConf
       .createWithDefault("lz4")
 
+  val CHECKPOINT_RENAMEDFILE_CHECK_ENABLED =
+    buildConf("spark.sql.streaming.checkpoint.renamedFileCheck.enabled")
+      .doc("When true, Spark will validate if renamed checkpoint file exists.")
+      .internal()
+      .version("3.4.0")
+      .booleanConf
+      .createWithDefault(false)
+
   /**
    * Note: this is defined in `RocksDBConf.FORMAT_VERSION`. These two places should be updated
    * together.
@@ -4234,6 +4242,8 @@ class SQLConf extends Serializable with Logging {
 
   def stateStoreCompressionCodec: String = getConf(STATE_STORE_COMPRESSION_CODEC)
 
+  def checkpointRenamedFileCheck: Boolean = getConf(CHECKPOINT_RENAMEDFILE_CHECK_ENABLED)
+
   def parquetFilterPushDown: Boolean = getConf(PARQUET_FILTER_PUSHDOWN_ENABLED)
 
   def parquetFilterPushDownDate: Boolean = getConf(PARQUET_FILTER_PUSHDOWN_DATE_ENABLED)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
index cf5d54fd20a..013efd3c7ba 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
@@ -158,6 +158,13 @@ object CheckpointFileManager extends Logging {
               s"Failed to rename temp file $tempPath to $finalPath because 
file exists", fe)
             if (!overwriteIfPossible) throw fe
         }
+
+        // Optionally, check if the renamed file exists
+        if (SQLConf.get.checkpointRenamedFileCheck && !fm.exists(finalPath)) {
+          throw new IllegalStateException(s"Renamed temp file $tempPath to $finalPath. " +
+            s"But $finalPath does not exist.")
+        }
+
         logInfo(s"Renamed temp file $tempPath to $finalPath")
       } finally {
         terminated = true

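As a rough usage sketch (not part of this commit), the new internal flag could be enabled on a session before starting a streaming query. The config key matches CHECKPOINT_RENAMEDFILE_CHECK_ENABLED from the diff above; the rate source, console sink, app name, and checkpoint path are illustrative placeholders only.

    // Hedged sketch: opt into the renamed-file check added by this patch.
    // Everything except the config key is a hypothetical placeholder.
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("renamed-file-check-demo")
      // When true, CheckpointFileManager verifies the renamed checkpoint file exists.
      .config("spark.sql.streaming.checkpoint.renamedFileCheck.enabled", "true")
      .getOrCreate()

    val query = spark.readStream
      .format("rate")              // built-in test source standing in for a real one
      .load()
      .writeStream
      .format("console")
      .option("checkpointLocation", "/tmp/demo-checkpoint")
      .start()

With the flag enabled, the rename path in CheckpointFileManager throws an IllegalStateException when the final checkpoint file is missing after the rename, instead of silently proceeding and failing on a later read.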

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
