This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new f201bbf4d6 [core] Should not close channelManager in BinaryExternalSortBuffer (#5466)
f201bbf4d6 is described below
commit f201bbf4d69c866771d2de2f870a64bd2518e892
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Apr 14 20:13:10 2025 +0800
[core] Should not close channelManager in BinaryExternalSortBuffer (#5466)
---
.../apache/paimon/sort/BinaryExternalSortBuffer.java | 5 ++---
.../org/apache/paimon/sort/SpillChannelManager.java | 20 ++------------------
.../paimon/sort/BinaryExternalSortBufferTest.java | 9 ++++++++-
3 files changed, 12 insertions(+), 22 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/sort/BinaryExternalSortBuffer.java b/paimon-core/src/main/java/org/apache/paimon/sort/BinaryExternalSortBuffer.java
index 4bfbcd5ec7..c6495811de 100644
--- a/paimon-core/src/main/java/org/apache/paimon/sort/BinaryExternalSortBuffer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/sort/BinaryExternalSortBuffer.java
@@ -50,7 +50,7 @@ public class BinaryExternalSortBuffer implements SortBuffer {
private final BinaryRowSerializer serializer;
private final BinaryInMemorySortBuffer inMemorySortBuffer;
private final IOManager ioManager;
- private SpillChannelManager channelManager;
+ private final SpillChannelManager channelManager;
private final int maxNumFileHandles;
private final BlockCompressionFactory compressionCodecFactory;
private final int compressionBlockSize;
@@ -154,8 +154,7 @@ public class BinaryExternalSortBuffer implements SortBuffer {
inMemorySortBuffer.clear();
spillChannelIDs.clear();
// delete files
- channelManager.close();
- channelManager = new SpillChannelManager();
+ channelManager.reset();
}
@Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/sort/SpillChannelManager.java b/paimon-core/src/main/java/org/apache/paimon/sort/SpillChannelManager.java
index ee21427cac..5118eb2aac 100644
--- a/paimon-core/src/main/java/org/apache/paimon/sort/SpillChannelManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/sort/SpillChannelManager.java
@@ -20,22 +20,17 @@ package org.apache.paimon.sort;
import org.apache.paimon.disk.FileIOChannel;
-import java.io.Closeable;
import java.io.File;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
-import static org.apache.paimon.utils.Preconditions.checkArgument;
-
/** Channel manager to manage the life cycle of spill channels. */
-public class SpillChannelManager implements Closeable {
+public class SpillChannelManager {
private final HashSet<FileIOChannel.ID> channels;
private final HashSet<FileIOChannel> openChannels;
- private volatile boolean closed;
-
public SpillChannelManager() {
this.channels = new HashSet<>(64);
this.openChannels = new HashSet<>(64);
@@ -43,13 +38,11 @@ public class SpillChannelManager implements Closeable {
/** Add a new File channel. */
public synchronized void addChannel(FileIOChannel.ID id) {
- checkArgument(!closed);
channels.add(id);
}
/** Open File channels. */
public synchronized void addOpenChannels(List<FileIOChannel> toOpen) {
- checkArgument(!closed);
for (FileIOChannel channel : toOpen) {
openChannels.add(channel);
channels.remove(channel.getChannelID());
@@ -57,19 +50,10 @@ public class SpillChannelManager implements Closeable {
}
public synchronized void removeChannel(FileIOChannel.ID id) {
- checkArgument(!closed);
channels.remove(id);
}
- @Override
- public synchronized void close() {
-
- if (this.closed) {
- return;
- }
-
- this.closed = true;
-
+ public synchronized void reset() {
for (Iterator<FileIOChannel> channels = this.openChannels.iterator();
channels.hasNext(); ) {
final FileIOChannel channel = channels.next();
diff --git a/paimon-core/src/test/java/org/apache/paimon/sort/BinaryExternalSortBufferTest.java b/paimon-core/src/test/java/org/apache/paimon/sort/BinaryExternalSortBufferTest.java
index 59ef802a1c..0ff35a0606 100644
--- a/paimon-core/src/test/java/org/apache/paimon/sort/BinaryExternalSortBufferTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/sort/BinaryExternalSortBufferTest.java
@@ -188,8 +188,15 @@ public class BinaryExternalSortBufferTest {
innerTestSpilling(createBuffer());
}
+ @Test
+ public void testSpillingAndClearWithMaxFanIn() throws Exception {
+ BinaryExternalSortBuffer buffer = createBuffer(2);
+ innerTestSpilling(buffer);
+ innerTestSpilling(buffer);
+ }
+
private void innerTestSpilling(BinaryExternalSortBuffer sorter) throws Exception {
- int size = 1000_000;
+ int size = 2000_000;
MockBinaryRowReader reader = new MockBinaryRowReader(size);
sorter.write(reader);