This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 371fa748ce [hotfix] Pick one dir randomly in GlobalIndexAssigner
371fa748ce is described below

commit 371fa748ce7b03ab12012656c8e39e20062439bb
Author: JingsongLi <[email protected]>
AuthorDate: Tue May 13 23:13:04 2025 +0800

    [hotfix] Pick one dir randomly in GlobalIndexAssigner
---
 .../paimon/crosspartition/GlobalIndexAssigner.java | 13 ++----
 .../java/org/apache/paimon/disk/IOManagerImpl.java | 48 +++++++++++++---------
 2 files changed, 32 insertions(+), 29 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java b/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java
index fa9f0464ae..892e27e966 100644
--- a/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java
@@ -72,6 +72,7 @@ import java.util.function.Function;
 import java.util.stream.IntStream;
 
 import static org.apache.paimon.lookup.RocksDBOptions.BLOCK_CACHE_SIZE;
+import static org.apache.paimon.utils.ListUtils.pickRandomly;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** Assign UPDATE_BEFORE and bucket for the input record, output record with bucket. */
@@ -132,15 +133,9 @@ public class GlobalIndexAssigner implements Serializable, Closeable {
         this.extractor = new RowPartitionAllPrimaryKeyExtractor(table.schema());
         this.keyPartExtractor = new KeyPartPartitionKeyExtractor(table.schema());
 
-        for (String tmpDir : ioManager.tempDirs()) {
-            File rocksDBDir = new File(tmpDir, "rocksdb-" + UUID.randomUUID());
-            if (rocksDBDir.mkdirs()) {
-                this.path = rocksDBDir;
-                break;
-            }
-        }
-
-        if (path == null) {
+        String tmpDir = pickRandomly(Arrays.asList(ioManager.tempDirs()));
+        this.path = new File(tmpDir, "rocksdb-" + UUID.randomUUID());
+        if (!this.path.mkdirs()) {
             throw new RuntimeException(
                     "Failed to create RocksDB cache directory in temp dirs: "
                             + Arrays.toString(ioManager.tempDirs()));
diff --git a/paimon-core/src/main/java/org/apache/paimon/disk/IOManagerImpl.java b/paimon-core/src/main/java/org/apache/paimon/disk/IOManagerImpl.java
index a67dcfb8c4..c0858dbf28 100644
--- a/paimon-core/src/main/java/org/apache/paimon/disk/IOManagerImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/disk/IOManagerImpl.java
@@ -41,7 +41,7 @@ public class IOManagerImpl implements IOManager {
 
     private final String[] tempDirs;
 
-    private volatile FileChannelManager lazyFileChannelManager;
+    private volatile FileChannelManager lazyChannelManager;
 
     // -------------------------------------------------------------------------
     //               Constructors / Destructors
@@ -58,35 +58,43 @@ public class IOManagerImpl implements IOManager {
     }
 
     private FileChannelManager fileChannelManager() {
-        if (lazyFileChannelManager == null) {
+        if (lazyChannelManager == null) {
             synchronized (this) {
-                if (lazyFileChannelManager == null) {
-                    this.lazyFileChannelManager =
-                            new FileChannelManagerImpl(tempDirs, DIR_NAME_PREFIX);
-                    if (LOG.isInfoEnabled()) {
-                        LOG.info(
-                                "Created a new {} for spilling of task related data to disk (joins, sorting, ...). Used directories:\n\t{}",
-                                FileChannelManager.class.getSimpleName(),
-                                getSpillingDirectoriesPathsString());
-                    }
+                if (lazyChannelManager == null) {
+                    lazyChannelManager = createFileChannelManager();
                 }
             }
         }
 
-        return lazyFileChannelManager;
+        return lazyChannelManager;
     }
 
     /** Removes all temporary files. */
     @Override
     public void close() throws Exception {
-        if (lazyFileChannelManager != null) {
-            lazyFileChannelManager.close();
-            if (LOG.isInfoEnabled()) {
-                LOG.info(
-                        "Closed {} with directories:\n\t{}",
-                        FileChannelManager.class.getSimpleName(),
-                        getSpillingDirectoriesPathsString());
-            }
+        if (lazyChannelManager != null) {
+            closeFileChannelManager(lazyChannelManager);
+        }
+    }
+
+    private FileChannelManager createFileChannelManager() {
+        FileChannelManager channelManager = new FileChannelManagerImpl(tempDirs, DIR_NAME_PREFIX);
+        if (LOG.isInfoEnabled()) {
+            LOG.info(
+                    "Created a new {} for spilling of task related data to disk (joins, sorting, ...). Used directories:\n\t{}",
+                    FileChannelManager.class.getSimpleName(),
+                    getSpillingDirectoriesPathsString());
+        }
+        return channelManager;
+    }
+
+    private void closeFileChannelManager(FileChannelManager fileChannelManager) throws Exception {
+        fileChannelManager.close();
+        if (LOG.isInfoEnabled()) {
+            LOG.info(
+                    "Closed {} with directories:\n\t{}",
+                    FileChannelManager.class.getSimpleName(),
+                    getSpillingDirectoriesPathsString());
         }
     }
 

Reply via email to