This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 5f9a2733d5 [core] Fix one record cannot fit into a single page in ParallelExecution (#5353)
5f9a2733d5 is described below

commit 5f9a2733d5227af913d157c0f0268b4b53f6e5e7
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Mar 26 20:33:27 2025 +0800

    [core] Fix one record cannot fit into a single page in ParallelExecution (#5353)
---
 .../java/org/apache/paimon/utils/ParallelExecution.java    | 10 ++++++++++
 .../org/apache/paimon/utils/ParallelExecutionTest.java     | 14 ++++++++++++++
 2 files changed, 24 insertions(+)

diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/ParallelExecution.java b/paimon-common/src/main/java/org/apache/paimon/utils/ParallelExecution.java
index 9939b70c25..f3aabb8bac 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/ParallelExecution.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/ParallelExecution.java
@@ -23,6 +23,7 @@ import org.apache.paimon.data.SimpleCollectingOutputView;
 import org.apache.paimon.data.serializer.Serializer;
 import org.apache.paimon.memory.ArraySegmentPool;
 import org.apache.paimon.memory.MemorySegment;
+import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.reader.RecordReader;
 
 import javax.annotation.Nullable;
@@ -52,6 +53,7 @@ import java.util.function.Supplier;
 public class ParallelExecution<T, E> implements Closeable {
 
     private final Serializer<T> serializer;
+    private final int pageSize;
     private final BlockingQueue<MemorySegment> idlePages;
     private final BlockingQueue<ParallelBatch<T, E>> results;
     private final ExecutorService executorService;
@@ -66,6 +68,7 @@ public class ParallelExecution<T, E> implements Closeable {
             int parallelism,
             List<Supplier<Pair<RecordReader<T>, E>>> readers) {
         this.serializer = serializer;
+        this.pageSize = pageSize;
         int totalPages = parallelism * 2;
         this.idlePages = new ArrayBlockingQueue<>(totalPages);
         for (int i = 0; i < totalPages; i++) {
@@ -127,6 +130,13 @@ public class ParallelExecution<T, E> implements Closeable {
                         count++;
                         break;
                     } catch (EOFException e) {
+                        if (count == 0) {
+                            throw new RuntimeException(
+                                    String.format(
+                                            "Current page size %s is too 
small, one record cannot fit into a single page. "
+                                                    + "Please increase the 
'page-size' table option.",
+                                            new 
MemorySize(pageSize).toHumanReadableString()));
+                        }
                         sendToResults(outputView, count, pair.getRight());
                         outputView = null;
                     }
diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/ParallelExecutionTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/ParallelExecutionTest.java
index 4ec4755d62..4e3feab578 100644
--- a/paimon-core/src/test/java/org/apache/paimon/utils/ParallelExecutionTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/utils/ParallelExecutionTest.java
@@ -34,6 +34,7 @@ import java.util.List;
 import java.util.Queue;
 import java.util.function.Supplier;
 
+import static java.util.Collections.singletonList;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
@@ -128,6 +129,19 @@ public class ParallelExecutionTest {
         assertThatThrownBy(() -> 
collect(execution)).hasMessageContaining(message);
     }
 
+    @Test
+    public void testTooBigRecord() {
+        Supplier<Pair<RecordReader<Integer>, Integer>> supplier =
+                () -> Pair.of(create(new 
LinkedList<>(singletonList(singletonList(1)))), 1);
+
+        ParallelExecution<Integer, Integer> execution =
+                new ParallelExecution<>(new IntSerializer(), 2, 2, 
singletonList(supplier));
+        assertThatThrownBy(() -> collect(execution))
+                .hasMessageContaining(
+                        "Current page size 2 bytes is too small, one record 
cannot fit into a single page."
+                                + " Please increase the 'page-size' table 
option.");
+    }
+
     private RecordReader<Integer> create(Queue<List<Integer>> queue) {
         return new RecordReader<Integer>() {
             @Nullable

Reply via email to