This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new f201bbf4d6 [core] Should not close channelManager in BinaryExternalSortBuffer (#5466)
f201bbf4d6 is described below

commit f201bbf4d69c866771d2de2f870a64bd2518e892
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Apr 14 20:13:10 2025 +0800

    [core] Should not close channelManager in BinaryExternalSortBuffer (#5466)
---
 .../apache/paimon/sort/BinaryExternalSortBuffer.java |  5 ++---
 .../org/apache/paimon/sort/SpillChannelManager.java  | 20 ++------------------
 .../paimon/sort/BinaryExternalSortBufferTest.java    |  9 ++++++++-
 3 files changed, 12 insertions(+), 22 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/sort/BinaryExternalSortBuffer.java b/paimon-core/src/main/java/org/apache/paimon/sort/BinaryExternalSortBuffer.java
index 4bfbcd5ec7..c6495811de 100644
--- a/paimon-core/src/main/java/org/apache/paimon/sort/BinaryExternalSortBuffer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/sort/BinaryExternalSortBuffer.java
@@ -50,7 +50,7 @@ public class BinaryExternalSortBuffer implements SortBuffer {
     private final BinaryRowSerializer serializer;
     private final BinaryInMemorySortBuffer inMemorySortBuffer;
     private final IOManager ioManager;
-    private SpillChannelManager channelManager;
+    private final SpillChannelManager channelManager;
     private final int maxNumFileHandles;
     private final BlockCompressionFactory compressionCodecFactory;
     private final int compressionBlockSize;
@@ -154,8 +154,7 @@ public class BinaryExternalSortBuffer implements SortBuffer {
         inMemorySortBuffer.clear();
         spillChannelIDs.clear();
         // delete files
-        channelManager.close();
-        channelManager = new SpillChannelManager();
+        channelManager.reset();
     }
 
     @Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/sort/SpillChannelManager.java b/paimon-core/src/main/java/org/apache/paimon/sort/SpillChannelManager.java
index ee21427cac..5118eb2aac 100644
--- a/paimon-core/src/main/java/org/apache/paimon/sort/SpillChannelManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/sort/SpillChannelManager.java
@@ -20,22 +20,17 @@ package org.apache.paimon.sort;
 
 import org.apache.paimon.disk.FileIOChannel;
 
-import java.io.Closeable;
 import java.io.File;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 
-import static org.apache.paimon.utils.Preconditions.checkArgument;
-
 /** Channel manager to manage the life cycle of spill channels. */
-public class SpillChannelManager implements Closeable {
+public class SpillChannelManager {
 
     private final HashSet<FileIOChannel.ID> channels;
     private final HashSet<FileIOChannel> openChannels;
 
-    private volatile boolean closed;
-
     public SpillChannelManager() {
         this.channels = new HashSet<>(64);
         this.openChannels = new HashSet<>(64);
@@ -43,13 +38,11 @@ public class SpillChannelManager implements Closeable {
 
     /** Add a new File channel. */
     public synchronized void addChannel(FileIOChannel.ID id) {
-        checkArgument(!closed);
         channels.add(id);
     }
 
     /** Open File channels. */
     public synchronized void addOpenChannels(List<FileIOChannel> toOpen) {
-        checkArgument(!closed);
         for (FileIOChannel channel : toOpen) {
             openChannels.add(channel);
             channels.remove(channel.getChannelID());
@@ -57,19 +50,10 @@ public class SpillChannelManager implements Closeable {
     }
 
     public synchronized void removeChannel(FileIOChannel.ID id) {
-        checkArgument(!closed);
         channels.remove(id);
     }
 
-    @Override
-    public synchronized void close() {
-
-        if (this.closed) {
-            return;
-        }
-
-        this.closed = true;
-
+    public synchronized void reset() {
         for (Iterator<FileIOChannel> channels = this.openChannels.iterator();
                 channels.hasNext(); ) {
             final FileIOChannel channel = channels.next();
diff --git a/paimon-core/src/test/java/org/apache/paimon/sort/BinaryExternalSortBufferTest.java b/paimon-core/src/test/java/org/apache/paimon/sort/BinaryExternalSortBufferTest.java
index 59ef802a1c..0ff35a0606 100644
--- a/paimon-core/src/test/java/org/apache/paimon/sort/BinaryExternalSortBufferTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/sort/BinaryExternalSortBufferTest.java
@@ -188,8 +188,15 @@ public class BinaryExternalSortBufferTest {
         innerTestSpilling(createBuffer());
     }
 
+    @Test
+    public void testSpillingAndClearWithMaxFanIn() throws Exception {
+        BinaryExternalSortBuffer buffer = createBuffer(2);
+        innerTestSpilling(buffer);
+        innerTestSpilling(buffer);
+    }
+
     private void innerTestSpilling(BinaryExternalSortBuffer sorter) throws Exception {
-        int size = 1000_000;
+        int size = 2000_000;
 
         MockBinaryRowReader reader = new MockBinaryRowReader(size);
         sorter.write(reader);

Reply via email to