This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 8811e8caaa8 [SPARK-38931][SS] Create root dfs directory for RocksDBFileManager with unknown number of keys on 1st checkpoint
8811e8caaa8 is described below

commit 8811e8caaa8540d1fa05fb77152043addc607b82
Author: Yun Tang <myas...@live.com>
AuthorDate: Tue Apr 19 20:31:04 2022 +0900

    [SPARK-38931][SS] Create root dfs directory for RocksDBFileManager with unknown number of keys on 1st checkpoint
    
    ### What changes were proposed in this pull request?
    Create the root dfs directory in RocksDBFileManager on the 1st checkpoint even when the number of keys is unknown (numKeys = -1), by relaxing the guard from `numKeys == 0` to `numKeys <= 0`.
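
    For quick reference, the adjusted guard in RocksDBFileManager.saveCheckpointToDfs boils down to the following (a simplified excerpt of the diff below; `fm`, `dfsRootDir`, `version` and `numKeys` are the surrounding members and arguments):

    ~~~scala
    // numKeys == 0  : first checkpoint with genuinely no data
    // numKeys == -1 : number of keys unknown because tracking is disabled
    // Either way, the root directory must exist before zipToDfsFile calls
    // CheckpointFileManager.createAtomic, which doesn't create parent dirs.
    if (version <= 1 && numKeys <= 0) {
      val path = new Path(dfsRootDir)
      if (!fm.exists(path)) fm.mkdirs(path)
    }
    ~~~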
    
    ### Why are the changes needed?
    Without this fix, we can hit the exception below:
    ~~~java
    File /private/var/folders/rk/wyr101_562ngn8lp7tbqt7_00000gp/T/spark-ce4a0607-b1d8-43b8-becd-638c6b030019/state/1/1 does not exist
    java.io.FileNotFoundException: File /private/var/folders/rk/wyr101_562ngn8lp7tbqt7_00000gp/T/spark-ce4a0607-b1d8-43b8-becd-638c6b030019/state/1/1 does not exist
            at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:779)
            at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:1100)
            at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:769)
            at org.apache.hadoop.fs.DelegateToFileSystem.getFileStatus(DelegateToFileSystem.java:128)
            at org.apache.hadoop.fs.DelegateToFileSystem.createInternal(DelegateToFileSystem.java:93)
            at org.apache.hadoop.fs.ChecksumFs$ChecksumFSOutputSummer.<init>(ChecksumFs.java:353)
            at org.apache.hadoop.fs.ChecksumFs.createInternal(ChecksumFs.java:400)
            at org.apache.hadoop.fs.AbstractFileSystem.create(AbstractFileSystem.java:626)
            at org.apache.hadoop.fs.FileContext$3.next(FileContext.java:701)
            at org.apache.hadoop.fs.FileContext$3.next(FileContext.java:697)
            at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90)
            at org.apache.hadoop.fs.FileContext.create(FileContext.java:703)
            at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.createTempFile(CheckpointFileManager.scala:327)
            at org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.<init>(CheckpointFileManager.scala:140)
            at org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.<init>(CheckpointFileManager.scala:143)
            at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.createAtomic(CheckpointFileManager.scala:333)
            at org.apache.spark.sql.execution.streaming.state.RocksDBFileManager.zipToDfsFile(RocksDBFileManager.scala:438)
            at org.apache.spark.sql.execution.streaming.state.RocksDBFileManager.saveCheckpointToDfs(RocksDBFileManager.scala:174)
            at org.apache.spark.sql.execution.streaming.state.RocksDBSuite.saveCheckpointFiles(RocksDBSuite.scala:566)
            at org.apache.spark.sql.execution.streaming.state.RocksDBSuite.$anonfun$new$35(RocksDBSuite.scala:179)
            ........
    ~~~
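
    The root cause is visible in the trace: FileContextBasedCheckpointFileManager.createAtomic goes through Hadoop's FileContext.create, which, unlike FileSystem.create, does not create missing parent directories by default. A hypothetical standalone repro of that behavior (illustrative path, not from this patch):

    ~~~scala
    import java.util.EnumSet

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{CreateFlag, FileContext, Path}

    val fc = FileContext.getLocalFSFileContext(new Configuration)
    // Parent directories of this path do not exist yet.
    val file = new Path("/tmp/spark-demo/state/1/1/1.zip")
    // Without Options.CreateOpts.createParent(), this throws
    // java.io.FileNotFoundException, matching the stack trace above.
    fc.create(file, EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE))
    ~~~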
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Tested via a new unit test in RocksDBSuite.
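
    For instance, the suite can be run locally with sbt's standard test filtering (illustrative invocation, not part of the patch): build/sbt "sql/testOnly *RocksDBSuite".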
    
    Closes #36242 from Myasuka/SPARK-38931.
    
    Authored-by: Yun Tang <myas...@live.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
    (cherry picked from commit abb1df9d190e35a17b693f2b013b092af4f2528a)
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../streaming/state/RocksDBFileManager.scala          |  4 +++-
 .../sql/execution/streaming/state/RocksDBSuite.scala  | 19 +++++++++++++++++++
 2 files changed, 22 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
index 4f2ce9b1237..26084747c32 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
@@ -161,11 +161,13 @@ class RocksDBFileManager(
     metadata.writeToFile(metadataFile)
     logInfo(s"Written metadata for version $version:\n${metadata.prettyJson}")
 
-    if (version <= 1 && numKeys == 0) {
+    if (version <= 1 && numKeys <= 0) {
       // If we're writing the initial version and there's no data, we have to explicitly initialize
       // the root directory. Normally saveImmutableFilesToDfs will do this initialization, but
       // when there's no data that method won't write any files, and zipToDfsFile uses the
       // CheckpointFileManager.createAtomic API which doesn't auto-initialize parent directories.
+      // Moreover, when tracking of the number of keys is disabled (numKeys is -1), we still
+      // need to create the initial dfs root directory anyway.
       val path = new Path(dfsRootDir)
       if (!fm.exists(path)) fm.mkdirs(path)
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index 91cd91b639a..75717d27687 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -170,6 +170,25 @@ class RocksDBSuite extends SparkFunSuite {
     }
   }
 
+  test("RocksDBFileManager: create init dfs directory with unknown number of 
keys") {
+    val dfsRootDir = new File(Utils.createTempDir().getAbsolutePath + 
"/state/1/1")
+    try {
+      val verificationDir = Utils.createTempDir().getAbsolutePath
+      val fileManager = new RocksDBFileManager(
+        dfsRootDir.getAbsolutePath, Utils.createTempDir(), new Configuration)
+      // Save a version of empty checkpoint files
+      val cpFiles = Seq()
+      generateFiles(verificationDir, cpFiles)
+      assert(!dfsRootDir.exists())
+      saveCheckpointFiles(fileManager, cpFiles, version = 1, numKeys = -1)
+      // The dfs root dir is created even with unknown number of keys
+      assert(dfsRootDir.exists())
+      loadAndVerifyCheckpointFiles(fileManager, verificationDir, version = 1, 
Nil, -1)
+    } finally {
+      Utils.deleteRecursively(dfsRootDir)
+    }
+  }
+
   test("RocksDBFileManager: upload only new immutable files") {
     withTempDir { dir =>
       val dfsRootDir = dir.getAbsolutePath

