This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 62f3a47947 [core] lazily create FileChannelManagerImpl to avoid 
creating unused dirs (#5591)
62f3a47947 is described below

commit 62f3a47947bd80c58e90057bef6cf50ed5a59a06
Author: Yujiang Zhong <[email protected]>
AuthorDate: Tue May 13 23:05:19 2025 +0800

    [core] lazily create FileChannelManagerImpl to avoid creating unused dirs 
(#5591)
---
 .../paimon/crosspartition/GlobalIndexAssigner.java | 22 ++++++---
 .../java/org/apache/paimon/disk/IOManagerImpl.java | 54 ++++++++++++++--------
 2 files changed, 49 insertions(+), 27 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java
 
b/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java
index 5ed13b4fce..fa9f0464ae 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java
@@ -62,11 +62,11 @@ import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.UUID;
-import java.util.concurrent.ThreadLocalRandom;
 import java.util.function.BiConsumer;
 import java.util.function.Function;
 import java.util.stream.IntStream;
@@ -132,14 +132,22 @@ public class GlobalIndexAssigner implements Serializable, 
Closeable {
         this.extractor = new 
RowPartitionAllPrimaryKeyExtractor(table.schema());
         this.keyPartExtractor = new 
KeyPartPartitionKeyExtractor(table.schema());
 
+        for (String tmpDir : ioManager.tempDirs()) {
+            File rocksDBDir = new File(tmpDir, "rocksdb-" + UUID.randomUUID());
+            if (rocksDBDir.mkdirs()) {
+                this.path = rocksDBDir;
+                break;
+            }
+        }
+
+        if (path == null) {
+            throw new RuntimeException(
+                    "Failed to create RocksDB cache directory in temp dirs: "
+                            + Arrays.toString(ioManager.tempDirs()));
+        }
+
         // state
         Options options = coreOptions.toConfiguration();
-        String rocksDBDir =
-                ioManager
-                        .tempDirs()[
-                        
ThreadLocalRandom.current().nextInt(ioManager.tempDirs().length)];
-        this.path = new File(rocksDBDir, "rocksdb-" + UUID.randomUUID());
-
         Options rocksdbOptions = Options.fromMap(new 
HashMap<>(options.toMap()));
         // we should avoid too small memory
         long blockCache = Math.max(offHeapMemory, 
rocksdbOptions.get(BLOCK_CACHE_SIZE).getBytes());
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/disk/IOManagerImpl.java 
b/paimon-core/src/main/java/org/apache/paimon/disk/IOManagerImpl.java
index 53f5090b37..a67dcfb8c4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/disk/IOManagerImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/disk/IOManagerImpl.java
@@ -41,7 +41,7 @@ public class IOManagerImpl implements IOManager {
 
     private final String[] tempDirs;
 
-    private final FileChannelManager fileChannelManager;
+    private volatile FileChannelManager lazyFileChannelManager;
 
     // 
-------------------------------------------------------------------------
     //               Constructors / Destructors
@@ -53,37 +53,51 @@ public class IOManagerImpl implements IOManager {
      * @param tempDirs The basic directories for files underlying anonymous 
channels.
      */
     public IOManagerImpl(String... tempDirs) {
+        Preconditions.checkNotNull(tempDirs);
         this.tempDirs = tempDirs;
-        this.fileChannelManager =
-                new 
FileChannelManagerImpl(Preconditions.checkNotNull(tempDirs), DIR_NAME_PREFIX);
-        if (LOG.isInfoEnabled()) {
-            LOG.info(
-                    "Created a new {} for spilling of task related data to 
disk (joins, sorting, ...). Used directories:\n\t{}",
-                    FileChannelManager.class.getSimpleName(),
-                    getSpillingDirectoriesPathsString());
+    }
+
+    private FileChannelManager fileChannelManager() {
+        if (lazyFileChannelManager == null) {
+            synchronized (this) {
+                if (lazyFileChannelManager == null) {
+                    this.lazyFileChannelManager =
+                            new FileChannelManagerImpl(tempDirs, 
DIR_NAME_PREFIX);
+                    if (LOG.isInfoEnabled()) {
+                        LOG.info(
+                                "Created a new {} for spilling of task related 
data to disk (joins, sorting, ...). Used directories:\n\t{}",
+                                FileChannelManager.class.getSimpleName(),
+                                getSpillingDirectoriesPathsString());
+                    }
+                }
+            }
         }
+
+        return lazyFileChannelManager;
     }
 
     /** Removes all temporary files. */
     @Override
     public void close() throws Exception {
-        fileChannelManager.close();
-        if (LOG.isInfoEnabled()) {
-            LOG.info(
-                    "Closed {} with directories:\n\t{}",
-                    FileChannelManager.class.getSimpleName(),
-                    getSpillingDirectoriesPathsString());
+        if (lazyFileChannelManager != null) {
+            lazyFileChannelManager.close();
+            if (LOG.isInfoEnabled()) {
+                LOG.info(
+                        "Closed {} with directories:\n\t{}",
+                        FileChannelManager.class.getSimpleName(),
+                        getSpillingDirectoriesPathsString());
+            }
         }
     }
 
     @Override
     public ID createChannel() {
-        return fileChannelManager.createChannel();
+        return fileChannelManager().createChannel();
     }
 
     @Override
     public ID createChannel(String prefix) {
-        return fileChannelManager.createChannel(prefix);
+        return fileChannelManager().createChannel(prefix);
     }
 
     @Override
@@ -93,7 +107,7 @@ public class IOManagerImpl implements IOManager {
 
     @Override
     public Enumerator createChannelEnumerator() {
-        return fileChannelManager.createChannelEnumerator();
+        return fileChannelManager().createChannelEnumerator();
     }
 
     /**
@@ -116,7 +130,7 @@ public class IOManagerImpl implements IOManager {
      * @return The directories that the I/O manager spills to.
      */
     public File[] getSpillingDirectories() {
-        return fileChannelManager.getPaths();
+        return fileChannelManager().getPaths();
     }
 
     /**
@@ -125,7 +139,7 @@ public class IOManagerImpl implements IOManager {
      * @return The directories that the I/O manager spills to, as path strings.
      */
     public String[] getSpillingDirectoriesPaths() {
-        File[] paths = fileChannelManager.getPaths();
+        File[] paths = fileChannelManager().getPaths();
         String[] strings = new String[paths.length];
         for (int i = 0; i < strings.length; i++) {
             strings[i] = paths[i].getAbsolutePath();
@@ -134,7 +148,7 @@ public class IOManagerImpl implements IOManager {
     }
 
     private String getSpillingDirectoriesPathsString() {
-        return Arrays.stream(fileChannelManager.getPaths())
+        return Arrays.stream(fileChannelManager().getPaths())
                 .map(File::getAbsolutePath)
                 .collect(Collectors.joining("\n\t"));
     }

Reply via email to