This is an automated email from the ASF dual-hosted git repository.

junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 414b410063 [flink] Introduce 'sink.writer-coordinator.page-size' to control RPC size (#5960)
414b410063 is described below

commit 414b410063c00ef040b51db17955ce391fa07a82
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Jul 28 13:53:24 2025 +0800

    [flink] Introduce 'sink.writer-coordinator.page-size' to control RPC size (#5960)
---
 .../shortcodes/generated/core_configuration.html   | 12 +--
 .../generated/flink_connector_configuration.html   |  6 ++
 .../java/org/apache/paimon/utils/ArrayUtils.java   | 22 ++++++
 .../apache/paimon/flink/FlinkConnectorOptions.java |  7 ++
 .../sink/coordinator/CoordinatedWriteRestore.java  | 37 +++++++--
 .../sink/coordinator/PagedCoordinationRequest.java | 50 ++++++++++++
 .../coordinator/PagedCoordinationResponse.java     | 44 +++++++++++
 .../sink/coordinator/TableWriteCoordinator.java    | 91 ++++++++++++++++++++++
 .../sink/coordinator/WriteOperatorCoordinator.java |  4 +-
 .../apache/paimon/flink/BatchFileStoreITCase.java  | 13 ++++
 10 files changed, 273 insertions(+), 13 deletions(-)
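
A minimal usage sketch of the new option (not part of the diff below): only the
two option keys come from this commit; the Flink batch TableEnvironment and the
table name 't' are illustrative.

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class PageSizeUsageSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
            // Let the writer coordinator plan data files in the Job Manager ...
            tEnv.executeSql("ALTER TABLE t SET ('sink.writer-coordinator.enabled' = 'true')");
            // ... and cap each coordinator RPC page at 64 kb (default: 32 kb).
            tEnv.executeSql("ALTER TABLE t SET ('sink.writer-coordinator.page-size' = '64 kb')");
        }
    }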

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 592986935a..e079a253e1 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -212,12 +212,6 @@ under the License.
             <td>String</td>
             <td>Specifies the commit user prefix.</td>
         </tr>
-        <tr>
-            <td><h5>snapshot.ignore-empty-commit</h5></td>
-            <td style="word-wrap: break-word;">(none)</td>
-            <td>String</td>
-            <td>Whether ignore empty commit.</td>
-        </tr>
         <tr>
             <td><h5>compaction.delete-ratio-threshold</h5></td>
             <td style="word-wrap: break-word;">0.2</td>
@@ -1007,6 +1001,12 @@ This config option does not affect the default filesystem metastore.</td>
             <td>Integer</td>
             <td>The maximum number of snapshots allowed to expire at a time.</td>
         </tr>
+        <tr>
+            <td><h5>snapshot.ignore-empty-commit</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>Boolean</td>
+            <td>Whether ignore empty commit.</td>
+        </tr>
         <tr>
             <td><h5>snapshot.num-retained.max</h5></td>
             <td style="word-wrap: break-word;">infinite</td>
diff --git a/docs/layouts/shortcodes/generated/flink_connector_configuration.html b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index 12aded973f..55f4d1ddb1 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -308,6 +308,12 @@ under the License.
             <td>Boolean</td>
             <td>Enable sink writer coordinator to plan data files in Job Manager.</td>
         </tr>
+        <tr>
+            <td><h5>sink.writer-coordinator.page-size</h5></td>
+            <td style="word-wrap: break-word;">32 kb</td>
+            <td>MemorySize</td>
+            <td>Controls the page size for one RPC request of writer coordinator.</td>
+        </tr>
         <tr>
             <td><h5>sink.writer-cpu</h5></td>
             <td style="word-wrap: break-word;">1.0</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/utils/ArrayUtils.java b/paimon-api/src/main/java/org/apache/paimon/utils/ArrayUtils.java
index f107d462e7..40cc33ab9d 100644
--- a/paimon-api/src/main/java/org/apache/paimon/utils/ArrayUtils.java
+++ b/paimon-api/src/main/java/org/apache/paimon/utils/ArrayUtils.java
@@ -18,6 +18,8 @@
 
 package org.apache.paimon.utils;
 
+import java.util.List;
+
 /** Utils for array. */
 public class ArrayUtils {
 
@@ -355,4 +357,24 @@ public class ArrayUtils {
         }
         return result;
     }
+
+    public static byte[] mergeByteArrays(List<byte[]> byteList) {
+        if (byteList.isEmpty()) {
+            return new byte[0];
+        }
+
+        if (byteList.size() == 1) {
+            return byteList.get(0);
+        }
+
+        int totalLength = byteList.stream().mapToInt(arr -> arr.length).sum();
+        byte[] result = new byte[totalLength];
+        int offset = 0;
+
+        for (byte[] arr : byteList) {
+            System.arraycopy(arr, 0, result, offset, arr.length);
+            offset += arr.length;
+        }
+        return result;
+    }
 }
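
The new ArrayUtils.mergeByteArrays above is what the client uses to reassemble
paged RPC payloads. A minimal usage sketch, with hypothetical page contents:

    import java.util.Arrays;
    import java.util.List;

    import org.apache.paimon.utils.ArrayUtils;

    public class MergeByteArraysSketch {
        public static void main(String[] args) {
            // Pages as they might arrive from successive paged RPC responses.
            List<byte[]> pages =
                    Arrays.asList(new byte[] {1, 2}, new byte[] {3}, new byte[] {4, 5});
            byte[] merged = ArrayUtils.mergeByteArrays(pages);
            System.out.println(Arrays.toString(merged)); // [1, 2, 3, 4, 5]
        }
    }
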
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index 08eb0f9e2e..62973763bf 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -517,6 +517,13 @@ public class FlinkConnectorOptions {
                     .withDescription(
                             "Controls the cache memory of writer coordinator to cache manifest files in Job Manager.");
 
+    public static final ConfigOption<MemorySize> SINK_WRITER_COORDINATOR_PAGE_SIZE =
+            key("sink.writer-coordinator.page-size")
+                    .memoryType()
+                    .defaultValue(MemorySize.ofKibiBytes(32))
+                    .withDescription(
+                            "Controls the page size for one RPC request of writer coordinator.");
+
     public static final ConfigOption<Boolean> FILESYSTEM_JOB_LEVEL_SETTINGS_ENABLED =
             key("filesystem.job-level-settings.enabled")
                     .booleanType()
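
As a worked example of what the page size controls: a serialized scan response
of N bytes is fetched in ceil(N / page-size) RPC round trips, so the 32 kb
default turns a hypothetical 1 MiB response into 32 requests. A sketch of that
arithmetic, assuming Paimon's MemorySize utility:

    import org.apache.paimon.options.MemorySize;

    public class PageCountSketch {
        public static void main(String[] args) {
            // Default of sink.writer-coordinator.page-size.
            long pageSize = MemorySize.ofKibiBytes(32).getBytes();
            long payload = 1_048_576; // hypothetical 1 MiB serialized response
            long pages = (payload + pageSize - 1) / pageSize; // ceiling division
            System.out.println(pages + " RPC round trips"); // 32
        }
    }
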
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/CoordinatedWriteRestore.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/CoordinatedWriteRestore.java
index 155da9e223..31438cfbde 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/CoordinatedWriteRestore.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/CoordinatedWriteRestore.java
@@ -21,6 +21,8 @@ package org.apache.paimon.flink.sink.coordinator;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.operation.RestoreFiles;
 import org.apache.paimon.operation.WriteRestore;
+import org.apache.paimon.utils.ArrayUtils;
+import org.apache.paimon.utils.InstantiationUtil;
 
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
@@ -28,8 +30,12 @@ import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
 import org.apache.flink.util.SerializedValue;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 
+import static org.apache.paimon.utils.InstantiationUtil.serializeObject;
 import static org.apache.paimon.utils.SerializationUtils.serializeBinaryRow;
 
 /**
@@ -66,24 +72,45 @@ public class CoordinatedWriteRestore implements WriteRestore {
             int bucket,
             boolean scanDynamicBucketIndex,
             boolean scanDeleteVectorsIndex) {
-        ScanCoordinationRequest request =
+        ScanCoordinationRequest coordinationRequest =
                 new ScanCoordinationRequest(
                         serializeBinaryRow(partition),
                         bucket,
                         scanDynamicBucketIndex,
                         scanDeleteVectorsIndex);
         try {
-            SerializedValue<CoordinationRequest> serializedRequest = new SerializedValue<>(request);
+            byte[] requestContent = serializeObject(coordinationRequest);
+            Integer nextPageToken = null;
+            List<byte[]> result = new ArrayList<>();
+            String uuid = UUID.randomUUID().toString();
+
+            do {
+                PagedCoordinationRequest request =
+                        new PagedCoordinationRequest(requestContent, uuid, nextPageToken);
+                SerializedValue<CoordinationRequest> serializedRequest =
+                        new SerializedValue<>(request);
+                PagedCoordinationResponse response =
+                        CoordinationResponseUtils.unwrap(
+                                gateway.sendRequestToCoordinator(operatorID, serializedRequest)
+                                        .get());
+                result.add(response.content());
+                nextPageToken = response.nextPageToken();
+            } while (nextPageToken != null);
+
+            byte[] responseContent = ArrayUtils.mergeByteArrays(result);
             ScanCoordinationResponse response =
-                    CoordinationResponseUtils.unwrap(
-                            gateway.sendRequestToCoordinator(operatorID, serializedRequest).get());
+                    InstantiationUtil.deserializeObject(
+                            responseContent, getClass().getClassLoader());
             return new RestoreFiles(
                     response.snapshot(),
                     response.totalBuckets(),
                     response.extractDataFiles(),
                     response.extractDynamicBucketIndex(),
                     response.extractDeletionVectorsIndex());
-        } catch (IOException | ExecutionException | InterruptedException e) {
+        } catch (IOException
+                | ExecutionException
+                | InterruptedException
+                | ClassNotFoundException e) {
             throw new RuntimeException(e);
         }
     }
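
The loop above implements a simple pull-based paging protocol: the first
request carries a null page token, every response returns either the next byte
offset or null on the last page, and the client concatenates all pages before
deserializing. The same protocol in isolation, sketched against a hypothetical
fetchPage function (Page stands in for PagedCoordinationResponse):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.Function;

    public class PagingClientSketch {

        /** Stand-in for PagedCoordinationResponse. */
        static class Page {
            final byte[] content;
            final Integer nextToken; // null on the last page
            Page(byte[] content, Integer nextToken) {
                this.content = content;
                this.nextToken = nextToken;
            }
        }

        /** Fetches every page and concatenates them, as the restore path does. */
        static byte[] fetchAll(Function<Integer, Page> fetchPage) {
            List<byte[]> pages = new ArrayList<>();
            Integer token = null; // null token means "start from the beginning"
            do {
                Page page = fetchPage.apply(token);
                pages.add(page.content);
                token = page.nextToken;
            } while (token != null);
            int total = pages.stream().mapToInt(p -> p.length).sum();
            byte[] merged = new byte[total];
            int offset = 0;
            for (byte[] p : pages) {
                System.arraycopy(p, 0, merged, offset, p.length);
                offset += p.length;
            }
            return merged;
        }
    }
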
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/PagedCoordinationRequest.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/PagedCoordinationRequest.java
new file mode 100644
index 0000000000..2588802955
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/PagedCoordinationRequest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.coordinator;
+
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+
+import javax.annotation.Nullable;
+
+/** Paged {@link CoordinationRequest} with page token. */
+public class PagedCoordinationRequest implements CoordinationRequest {
+
+    private final byte[] content;
+    private final String requestId;
+    private final @Nullable Integer pageToken;
+
+    public PagedCoordinationRequest(byte[] content, String requestId, @Nullable Integer pageToken) {
+        this.content = content;
+        this.requestId = requestId;
+        this.pageToken = pageToken;
+    }
+
+    public byte[] content() {
+        return content;
+    }
+
+    public String requestId() {
+        return requestId;
+    }
+
+    @Nullable
+    public Integer pageToken() {
+        return pageToken;
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/PagedCoordinationResponse.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/PagedCoordinationResponse.java
new file mode 100644
index 0000000000..d1bcffe7c2
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/PagedCoordinationResponse.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.coordinator;
+
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
+
+import javax.annotation.Nullable;
+
+/** Paged {@link CoordinationResponse} with next page token. */
+public class PagedCoordinationResponse implements CoordinationResponse {
+
+    private final byte[] content;
+    private final @Nullable Integer nextPageToken;
+
+    public PagedCoordinationResponse(byte[] content, @Nullable Integer nextPageToken) {
+        this.content = content;
+        this.nextPageToken = nextPageToken;
+    }
+
+    public byte[] content() {
+        return content;
+    }
+
+    @Nullable
+    public Integer nextPageToken() {
+        return nextPageToken;
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/TableWriteCoordinator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/TableWriteCoordinator.java
index fd5f057ed8..0f315880e9 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/TableWriteCoordinator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/TableWriteCoordinator.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.sink.coordinator;
 
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.index.IndexFileHandler;
 import org.apache.paimon.index.IndexFileMeta;
 import org.apache.paimon.io.DataFileMeta;
@@ -28,14 +29,22 @@ import org.apache.paimon.operation.FileStoreScan;
 import org.apache.paimon.operation.WriteRestore;
 import org.apache.paimon.table.FileStoreTable;
 
+import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
+import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine;
+
 import java.io.IOException;
+import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 
 import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
+import static org.apache.paimon.utils.InstantiationUtil.deserializeObject;
+import static org.apache.paimon.utils.InstantiationUtil.serializeObject;
 import static org.apache.paimon.utils.Preconditions.checkNotNull;
 import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow;
 
@@ -49,6 +58,8 @@ public class TableWriteCoordinator {
     private final Map<String, Long> latestCommittedIdentifiers;
     private final FileStoreScan scan;
     private final IndexFileHandler indexFileHandler;
+    private final int pageSize;
+    private final Cache<CoordinationKey, byte[]> pagedCoordination;
 
     private volatile Snapshot snapshot;
 
@@ -61,6 +72,17 @@ public class TableWriteCoordinator {
             scan.dropStats();
         }
         this.indexFileHandler = table.store().newIndexFileHandler();
+        this.pageSize =
+                (int)
+                        table.coreOptions()
+                                .toConfiguration()
+                                .get(FlinkConnectorOptions.SINK_WRITER_COORDINATOR_PAGE_SIZE)
+                                .getBytes();
+        this.pagedCoordination =
+                Caffeine.newBuilder()
+                        .executor(Runnable::run)
+                        .expireAfterAccess(Duration.ofMinutes(30))
+                        .build();
         refresh();
     }
 
@@ -73,6 +95,50 @@ public class TableWriteCoordinator {
         this.scan.withSnapshot(snapshot);
     }
 
+    public synchronized PagedCoordinationResponse scan(PagedCoordinationRequest request)
+            throws IOException {
+        if (snapshot == null) {
+            return new PagedCoordinationResponse(
+                    serializeObject(new ScanCoordinationResponse(null, null, null, null, null)),
+                    null);
+        }
+
+        Integer pageToken = request.pageToken();
+        CoordinationKey requestKey = new CoordinationKey(request.content(), request.requestId());
+        if (pageToken != null) {
+            byte[] full = pagedCoordination.getIfPresent(requestKey);
+            if (full == null) {
+                throw new RuntimeException(
+                        "This is a bug for write coordinator, request non existence content.");
+            }
+            int len = Math.min(full.length - pageToken, pageSize);
+            byte[] content = Arrays.copyOfRange(full, pageToken, pageToken + len);
+            Integer nextPageToken = pageToken + len;
+            if (nextPageToken >= full.length) {
+                nextPageToken = null;
+                pagedCoordination.invalidate(requestKey);
+            }
+            return new PagedCoordinationResponse(content, nextPageToken);
+        }
+
+        ScanCoordinationRequest coordination;
+        try {
+            coordination = deserializeObject(request.content(), getClass().getClassLoader());
+        } catch (ClassNotFoundException e) {
+            throw new RuntimeException(e);
+        }
+
+        ScanCoordinationResponse response = scan(coordination);
+        byte[] full = serializeObject(response);
+        if (full.length <= pageSize) {
+            return new PagedCoordinationResponse(full, null);
+        }
+
+        pagedCoordination.put(requestKey, full);
+        byte[] content = Arrays.copyOfRange(full, 0, pageSize);
+        return new PagedCoordinationResponse(content, pageSize);
+    }
+
     public synchronized ScanCoordinationResponse scan(ScanCoordinationRequest request)
             throws IOException {
         if (snapshot == null) {
@@ -126,4 +192,29 @@ public class TableWriteCoordinator {
         // refresh latest committed identifiers for all users
         latestCommittedIdentifiers.clear();
     }
+
+    private static class CoordinationKey {
+
+        private final byte[] content;
+        private final String uuid;
+
+        private CoordinationKey(byte[] content, String uuid) {
+            this.content = content;
+            this.uuid = uuid;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            CoordinationKey that = (CoordinationKey) o;
+            return Objects.deepEquals(content, that.content) && Objects.equals(uuid, that.uuid);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(Arrays.hashCode(content), uuid);
+        }
+    }
 }
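
On the coordinator side, scan(PagedCoordinationRequest) above serializes the
full ScanCoordinationResponse once, caches it under (request content, request
id), and then serves slices of at most pageSize bytes; the page token is simply
the byte offset of the next slice, and the cache entry is invalidated when the
last slice is read. The token arithmetic in isolation (the Slice class and the
70-byte payload are illustrative):

    import java.util.Arrays;

    public class PagingServerSketch {

        /** Stand-in for PagedCoordinationResponse: one slice plus the next offset. */
        static class Slice {
            final byte[] content;
            final Integer nextToken; // null on the last page
            Slice(byte[] content, Integer nextToken) {
                this.content = content;
                this.nextToken = nextToken;
            }
        }

        /** Mirrors the token arithmetic in TableWriteCoordinator.scan. */
        static Slice slice(byte[] full, int offset, int pageSize) {
            int len = Math.min(full.length - offset, pageSize);
            byte[] content = Arrays.copyOfRange(full, offset, offset + len);
            Integer nextToken = offset + len >= full.length ? null : offset + len;
            return new Slice(content, nextToken);
        }

        public static void main(String[] args) {
            byte[] full = new byte[70]; // hypothetical 70-byte serialized response
            Integer token = 0;
            while (token != null) {
                Slice s = slice(full, token, 32); // 32-byte pages: offsets 0, 32, 64
                token = s.nextToken;
                System.out.println("next token: " + token); // 32, 64, null
            }
        }
    }
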
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/WriteOperatorCoordinator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/WriteOperatorCoordinator.java
index 944628ae0e..334333fe76 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/WriteOperatorCoordinator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/WriteOperatorCoordinator.java
@@ -79,8 +79,8 @@ public class WriteOperatorCoordinator implements OperatorCoordinator, Coordinati
                 () -> {
                     try {
                         CoordinationResponse response;
-                        if (request instanceof ScanCoordinationRequest) {
-                            response = coordinator.scan((ScanCoordinationRequest) request);
+                        if (request instanceof PagedCoordinationRequest) {
+                            response = coordinator.scan((PagedCoordinationRequest) request);
                         } else if (request instanceof LatestIdentifierRequest) {
                             response =
                                     new LatestIdentifierResponse(
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index 2d968697d2..560a8fd8aa 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -71,6 +71,19 @@ public class BatchFileStoreITCase extends CatalogITCaseBase {
                         Row.of(1, 11, 111), Row.of(2, 22, 222), Row.of(3, 33, 333));
     }
 
+    @Test
+    public void testWriteRestoreCoordinatorWithPageSize() {
+        batchSql(
+                "CREATE TABLE IF NOT EXISTS PK (a INT PRIMARY KEY NOT ENFORCED, b INT, c INT) WITH ('bucket' = '2')");
+        batchSql("ALTER TABLE PK SET ('sink.writer-coordinator.enabled' = 'true')");
+        batchSql("ALTER TABLE PK SET ('sink.writer-coordinator.page-size' = '10 b')");
+        batchSql("INSERT INTO PK VALUES (1, 11, 111), (2, 22, 222), (3, 33, 333)");
+        batchSql("INSERT INTO PK VALUES (1, 11, 111), (2, 22, 222), (3, 33, 333)");
+        assertThat(batchSql("SELECT * FROM PK"))
+                .containsExactlyInAnyOrder(
+                        Row.of(1, 11, 111), Row.of(2, 22, 222), Row.of(3, 33, 333));
+    }
+
     @Test
     public void testWriteRestoreCoordinatorDv() {
         batchSql(
