This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.3 by this push: new 8811e8caaa8 [SPARK-38931][SS] Create root dfs directory for RocksDBFileManager with unknown number of keys on 1st checkpoint 8811e8caaa8 is described below commit 8811e8caaa8540d1fa05fb77152043addc607b82 Author: Yun Tang <myas...@live.com> AuthorDate: Tue Apr 19 20:31:04 2022 +0900 [SPARK-38931][SS] Create root dfs directory for RocksDBFileManager with unknown number of keys on 1st checkpoint ### What changes were proposed in this pull request? Create root dfs directory for RocksDBFileManager with unknown number of keys on 1st checkpoint. ### Why are the changes needed? Without this fix, we may hit the exception below: ~~~java File /private/var/folders/rk/wyr101_562ngn8lp7tbqt7_00000gp/T/spark-ce4a0607-b1d8-43b8-becd-638c6b030019/state/1/1 does not exist java.io.FileNotFoundException: File /private/var/folders/rk/wyr101_562ngn8lp7tbqt7_00000gp/T/spark-ce4a0607-b1d8-43b8-becd-638c6b030019/state/1/1 does not exist at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:779) at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:1100) at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:769) at org.apache.hadoop.fs.DelegateToFileSystem.getFileStatus(DelegateToFileSystem.java:128) at org.apache.hadoop.fs.DelegateToFileSystem.createInternal(DelegateToFileSystem.java:93) at org.apache.hadoop.fs.ChecksumFs$ChecksumFSOutputSummer.<init>(ChecksumFs.java:353) at org.apache.hadoop.fs.ChecksumFs.createInternal(ChecksumFs.java:400) at org.apache.hadoop.fs.AbstractFileSystem.create(AbstractFileSystem.java:626) at org.apache.hadoop.fs.FileContext$3.next(FileContext.java:701) at org.apache.hadoop.fs.FileContext$3.next(FileContext.java:697) at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90) at org.apache.hadoop.fs.FileContext.create(FileContext.java:703) at 
org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.createTempFile(CheckpointFileManager.scala:327) at org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.<init>(CheckpointFileManager.scala:140) at org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.<init>(CheckpointFileManager.scala:143) at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.createAtomic(CheckpointFileManager.scala:333) at org.apache.spark.sql.execution.streaming.state.RocksDBFileManager.zipToDfsFile(RocksDBFileManager.scala:438) at org.apache.spark.sql.execution.streaming.state.RocksDBFileManager.saveCheckpointToDfs(RocksDBFileManager.scala:174) at org.apache.spark.sql.execution.streaming.state.RocksDBSuite.saveCheckpointFiles(RocksDBSuite.scala:566) at org.apache.spark.sql.execution.streaming.state.RocksDBSuite.$anonfun$new$35(RocksDBSuite.scala:179) ........ ~~~ ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Tested via RocksDBSuite. Closes #36242 from Myasuka/SPARK-38931. 
Authored-by: Yun Tang <myas...@live.com> Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> (cherry picked from commit abb1df9d190e35a17b693f2b013b092af4f2528a) Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> --- .../streaming/state/RocksDBFileManager.scala | 4 +++- .../sql/execution/streaming/state/RocksDBSuite.scala | 19 +++++++++++++++++++ 2 files changed, 22 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala index 4f2ce9b1237..26084747c32 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala @@ -161,11 +161,13 @@ class RocksDBFileManager( metadata.writeToFile(metadataFile) logInfo(s"Written metadata for version $version:\n${metadata.prettyJson}") - if (version <= 1 && numKeys == 0) { + if (version <= 1 && numKeys <= 0) { // If we're writing the initial version and there's no data, we have to explicitly initialize // the root directory. Normally saveImmutableFilesToDfs will do this initialization, but // when there's no data that method won't write any files, and zipToDfsFile uses the // CheckpointFileManager.createAtomic API which doesn't auto-initialize parent directories. + // Moreover, once we disable to track the number of keys, in which the numKeys is -1, we + // still need to create the initial dfs root directory anyway. 
val path = new Path(dfsRootDir) if (!fm.exists(path)) fm.mkdirs(path) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala index 91cd91b639a..75717d27687 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala @@ -170,6 +170,25 @@ class RocksDBSuite extends SparkFunSuite { } } + test("RocksDBFileManager: create init dfs directory with unknown number of keys") { + val dfsRootDir = new File(Utils.createTempDir().getAbsolutePath + "/state/1/1") + try { + val verificationDir = Utils.createTempDir().getAbsolutePath + val fileManager = new RocksDBFileManager( + dfsRootDir.getAbsolutePath, Utils.createTempDir(), new Configuration) + // Save a version of empty checkpoint files + val cpFiles = Seq() + generateFiles(verificationDir, cpFiles) + assert(!dfsRootDir.exists()) + saveCheckpointFiles(fileManager, cpFiles, version = 1, numKeys = -1) + // The dfs root dir is created even with unknown number of keys + assert(dfsRootDir.exists()) + loadAndVerifyCheckpointFiles(fileManager, verificationDir, version = 1, Nil, -1) + } finally { + Utils.deleteRecursively(dfsRootDir) + } + } + test("RocksDBFileManager: upload only new immutable files") { withTempDir { dir => val dfsRootDir = dir.getAbsolutePath --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org