This is an automated email from the ASF dual-hosted git repository.
junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 414b410063 [flink] Introduce 'sink.writer-coordinator.page-size' to
control RPC size (#5960)
414b410063 is described below
commit 414b410063c00ef040b51db17955ce391fa07a82
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Jul 28 13:53:24 2025 +0800
[flink] Introduce 'sink.writer-coordinator.page-size' to control RPC size
(#5960)
---
.../shortcodes/generated/core_configuration.html | 12 +--
.../generated/flink_connector_configuration.html | 6 ++
.../java/org/apache/paimon/utils/ArrayUtils.java | 22 ++++++
.../apache/paimon/flink/FlinkConnectorOptions.java | 7 ++
.../sink/coordinator/CoordinatedWriteRestore.java | 37 +++++++--
.../sink/coordinator/PagedCoordinationRequest.java | 50 ++++++++++++
.../coordinator/PagedCoordinationResponse.java | 44 +++++++++++
.../sink/coordinator/TableWriteCoordinator.java | 91 ++++++++++++++++++++++
.../sink/coordinator/WriteOperatorCoordinator.java | 4 +-
.../apache/paimon/flink/BatchFileStoreITCase.java | 13 ++++
10 files changed, 273 insertions(+), 13 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html
b/docs/layouts/shortcodes/generated/core_configuration.html
index 592986935a..e079a253e1 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -212,12 +212,6 @@ under the License.
<td>String</td>
<td>Specifies the commit user prefix.</td>
</tr>
- <tr>
- <td><h5>snapshot.ignore-empty-commit</h5></td>
- <td style="word-wrap: break-word;">(none)</td>
- <td>String</td>
- <td>Whether ignore empty commit.</td>
- </tr>
<tr>
<td><h5>compaction.delete-ratio-threshold</h5></td>
<td style="word-wrap: break-word;">0.2</td>
@@ -1007,6 +1001,12 @@ This config option does not affect the default
filesystem metastore.</td>
<td>Integer</td>
<td>The maximum number of snapshots allowed to expire at a
time.</td>
</tr>
+ <tr>
+ <td><h5>snapshot.ignore-empty-commit</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>Boolean</td>
+ <td>Whether ignore empty commit.</td>
+ </tr>
<tr>
<td><h5>snapshot.num-retained.max</h5></td>
<td style="word-wrap: break-word;">infinite</td>
diff --git
a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index 12aded973f..55f4d1ddb1 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -308,6 +308,12 @@ under the License.
<td>Boolean</td>
<td>Enable sink writer coordinator to plan data files in Job
Manager.</td>
</tr>
+ <tr>
+ <td><h5>sink.writer-coordinator.page-size</h5></td>
+ <td style="word-wrap: break-word;">32 kb</td>
+ <td>MemorySize</td>
+ <td>Controls the page size for one RPC request of writer
coordinator.</td>
+ </tr>
<tr>
<td><h5>sink.writer-cpu</h5></td>
<td style="word-wrap: break-word;">1.0</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/utils/ArrayUtils.java
b/paimon-api/src/main/java/org/apache/paimon/utils/ArrayUtils.java
index f107d462e7..40cc33ab9d 100644
--- a/paimon-api/src/main/java/org/apache/paimon/utils/ArrayUtils.java
+++ b/paimon-api/src/main/java/org/apache/paimon/utils/ArrayUtils.java
@@ -18,6 +18,8 @@
package org.apache.paimon.utils;
+import java.util.List;
+
/** Utils for array. */
public class ArrayUtils {
@@ -355,4 +357,24 @@ public class ArrayUtils {
}
return result;
}
+
+ public static byte[] mergeByteArrays(List<byte[]> byteList) { // Concatenates the arrays in byteList, in list order, into a single array.
+ if (byteList.isEmpty()) { // empty list: a fresh zero-length array
+ return new byte[0];
+ }
+ // Single element: that same array instance is returned (no copy), so the result aliases the input.
+ if (byteList.size() == 1) {
+ return byteList.get(0);
+ }
+ // NOTE(review): IntStream.sum() wraps on overflow, so more than Integer.MAX_VALUE total bytes yields a wrong (possibly negative) length; Math.addExact over a loop would fail fast instead.
+ int totalLength = byteList.stream().mapToInt(arr -> arr.length).sum();
+ byte[] result = new byte[totalLength];
+ int offset = 0;
+ // Copy each array directly after the previous one.
+ for (byte[] arr : byteList) {
+ System.arraycopy(arr, 0, result, offset, arr.length);
+ offset += arr.length;
+ }
+ return result;
+ }
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index 08eb0f9e2e..62973763bf 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -517,6 +517,13 @@ public class FlinkConnectorOptions {
.withDescription(
"Controls the cache memory of writer coordinator
to cache manifest files in Job Manager.");
+ public static final ConfigOption<MemorySize>
SINK_WRITER_COORDINATOR_PAGE_SIZE =
+ key("sink.writer-coordinator.page-size") // read by TableWriteCoordinator, which casts getBytes() to int and uses it as the page length
+ .memoryType()
+ .defaultValue(MemorySize.ofKibiBytes(32)) // NOTE(review): no lower bound is enforced; a 0-byte page would never advance the coordinator's paging, and values >= 2 GiB overflow the int cast
+ .withDescription(
+ "Controls the page size for one RPC request of
writer coordinator.");
+
public static final ConfigOption<Boolean>
FILESYSTEM_JOB_LEVEL_SETTINGS_ENABLED =
key("filesystem.job-level-settings.enabled")
.booleanType()
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/CoordinatedWriteRestore.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/CoordinatedWriteRestore.java
index 155da9e223..31438cfbde 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/CoordinatedWriteRestore.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/CoordinatedWriteRestore.java
@@ -21,6 +21,8 @@ package org.apache.paimon.flink.sink.coordinator;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.operation.RestoreFiles;
import org.apache.paimon.operation.WriteRestore;
+import org.apache.paimon.utils.ArrayUtils;
+import org.apache.paimon.utils.InstantiationUtil;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
@@ -28,8 +30,12 @@ import
org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.util.SerializedValue;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
import java.util.concurrent.ExecutionException;
+import static org.apache.paimon.utils.InstantiationUtil.serializeObject;
import static org.apache.paimon.utils.SerializationUtils.serializeBinaryRow;
/**
@@ -66,24 +72,45 @@ public class CoordinatedWriteRestore implements
WriteRestore {
int bucket,
boolean scanDynamicBucketIndex,
boolean scanDeleteVectorsIndex) {
- ScanCoordinationRequest request =
+ ScanCoordinationRequest coordinationRequest =
new ScanCoordinationRequest(
serializeBinaryRow(partition),
bucket,
scanDynamicBucketIndex,
scanDeleteVectorsIndex);
try {
- SerializedValue<CoordinationRequest> serializedRequest = new
SerializedValue<>(request);
+ byte[] requestContent = serializeObject(coordinationRequest);
+ Integer nextPageToken = null;
+ List<byte[]> result = new ArrayList<>();
+ String uuid = UUID.randomUUID().toString();
+
+ do {
+ PagedCoordinationRequest request =
+ new PagedCoordinationRequest(requestContent, uuid,
nextPageToken);
+ SerializedValue<CoordinationRequest> serializedRequest =
+ new SerializedValue<>(request);
+ PagedCoordinationResponse response =
+ CoordinationResponseUtils.unwrap(
+ gateway.sendRequestToCoordinator(operatorID,
serializedRequest)
+ .get());
+ result.add(response.content());
+ nextPageToken = response.nextPageToken();
+ } while (nextPageToken != null);
+
+ byte[] responseContent = ArrayUtils.mergeByteArrays(result);
ScanCoordinationResponse response =
- CoordinationResponseUtils.unwrap(
- gateway.sendRequestToCoordinator(operatorID,
serializedRequest).get());
+ InstantiationUtil.deserializeObject(
+ responseContent, getClass().getClassLoader());
return new RestoreFiles(
response.snapshot(),
response.totalBuckets(),
response.extractDataFiles(),
response.extractDynamicBucketIndex(),
response.extractDeletionVectorsIndex());
- } catch (IOException | ExecutionException | InterruptedException e) {
+ } catch (IOException
+ | ExecutionException
+ | InterruptedException
+ | ClassNotFoundException e) {
throw new RuntimeException(e);
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/PagedCoordinationRequest.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/PagedCoordinationRequest.java
new file mode 100644
index 0000000000..2588802955
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/PagedCoordinationRequest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.coordinator;
+
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+
+import javax.annotation.Nullable;
+
+/** Paged {@link CoordinationRequest} with page token: an opaque serialized request, a caller-chosen request id, and the byte offset of the page to fetch ({@code null} for the first page). */
+public class PagedCoordinationRequest implements CoordinationRequest {
+ // NOTE(review): instances are wrapped in Flink's SerializedValue by the client in this commit; consider declaring a serialVersionUID.
+ private final byte[] content; // serialized request payload; the client in this commit sends a serialized ScanCoordinationRequest
+ private final String requestId; // identifies one paged exchange; together with content it forms the coordinator's cache key
+ private final @Nullable Integer pageToken; // byte offset into the full serialized response; null asks for the first page
+
+ public PagedCoordinationRequest(byte[] content, String requestId,
@Nullable Integer pageToken) {
+ this.content = content;
+ this.requestId = requestId;
+ this.pageToken = pageToken;
+ }
+
+ public byte[] content() { // returns the stored array itself, not a copy
+ return content;
+ }
+
+ public String requestId() {
+ return requestId;
+ }
+
+ @Nullable
+ public Integer pageToken() {
+ return pageToken;
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/PagedCoordinationResponse.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/PagedCoordinationResponse.java
new file mode 100644
index 0000000000..d1bcffe7c2
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/PagedCoordinationResponse.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.coordinator;
+
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
+
+import javax.annotation.Nullable;
+
+/** Paged {@link CoordinationResponse} with next page token: {@code content} holds one slice of the serialized result, and a {@code null} next page token marks the last slice. */
+public class PagedCoordinationResponse implements CoordinationResponse {
+ // NOTE(review): presumably serialized by Flink's coordinator RPC like the request (confirm); consider declaring a serialVersionUID.
+ private final byte[] content; // this page's bytes; the client concatenates all pages before deserializing
+ private final @Nullable Integer nextPageToken; // byte offset of the next page to request; null when no pages remain
+
+ public PagedCoordinationResponse(byte[] content, @Nullable Integer
nextPageToken) {
+ this.content = content;
+ this.nextPageToken = nextPageToken;
+ }
+
+ public byte[] content() { // returns the stored array itself, not a copy
+ return content;
+ }
+
+ @Nullable
+ public Integer nextPageToken() {
+ return nextPageToken;
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/TableWriteCoordinator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/TableWriteCoordinator.java
index fd5f057ed8..0f315880e9 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/TableWriteCoordinator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/TableWriteCoordinator.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.sink.coordinator;
import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.index.IndexFileHandler;
import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.io.DataFileMeta;
@@ -28,14 +29,22 @@ import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.operation.WriteRestore;
import org.apache.paimon.table.FileStoreTable;
+import
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
+import
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine;
+
import java.io.IOException;
+import java.time.Duration;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import static
org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
+import static org.apache.paimon.utils.InstantiationUtil.deserializeObject;
+import static org.apache.paimon.utils.InstantiationUtil.serializeObject;
import static org.apache.paimon.utils.Preconditions.checkNotNull;
import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow;
@@ -49,6 +58,8 @@ public class TableWriteCoordinator {
private final Map<String, Long> latestCommittedIdentifiers;
private final FileStoreScan scan;
private final IndexFileHandler indexFileHandler;
+ private final int pageSize;
+ private final Cache<CoordinationKey, byte[]> pagedCoordination;
private volatile Snapshot snapshot;
@@ -61,6 +72,17 @@ public class TableWriteCoordinator {
scan.dropStats();
}
this.indexFileHandler = table.store().newIndexFileHandler();
+ this.pageSize =
+ (int)
+ table.coreOptions()
+ .toConfiguration()
+
.get(FlinkConnectorOptions.SINK_WRITER_COORDINATOR_PAGE_SIZE)
+ .getBytes();
+ this.pagedCoordination =
+ Caffeine.newBuilder()
+ .executor(Runnable::run)
+ .expireAfterAccess(Duration.ofMinutes(30))
+ .build();
refresh();
}
@@ -73,6 +95,50 @@ public class TableWriteCoordinator {
this.scan.withSnapshot(snapshot);
}
+ public synchronized PagedCoordinationResponse
scan(PagedCoordinationRequest request)
+ throws IOException { // Returns one page of the serialized ScanCoordinationResponse: a request without a page token runs the scan; requests with a token slice the cached bytes.
+ if (snapshot == null) { // no snapshot yet: answer in a single page with an all-null response
+ return new PagedCoordinationResponse(
+ serializeObject(new ScanCoordinationResponse(null, null,
null, null, null)),
+ null);
+ }
+ // Requests carrying a page token are follow-ups that slice the full bytes cached by the first request of the same exchange.
+ Integer pageToken = request.pageToken();
+ CoordinationKey requestKey = new CoordinationKey(request.content(),
request.requestId());
+ if (pageToken != null) {
+ byte[] full = pagedCoordination.getIfPresent(requestKey);
+ if (full == null) { // never cached, already fully read, or expired (30 minutes after last access)
+ throw new RuntimeException(
+ "This is a bug for write coordinator, request non
existence content.");
+ }
+ int len = Math.min(full.length - pageToken, pageSize); // NOTE(review): pageToken is not range-checked, and this assumes pageSize > 0; with 0 the offset never advances and the client's paging loop never ends.
+ byte[] content = Arrays.copyOfRange(full, pageToken, pageToken +
len);
+ Integer nextPageToken = pageToken + len;
+ if (nextPageToken >= full.length) { // last page: signal completion and drop the cache entry
+ nextPageToken = null;
+ pagedCoordination.invalidate(requestKey);
+ }
+ return new PagedCoordinationResponse(content, nextPageToken);
+ }
+ // First page: decode the wrapped ScanCoordinationRequest.
+ ScanCoordinationRequest coordination;
+ try {
+ coordination = deserializeObject(request.content(),
getClass().getClassLoader());
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException(e);
+ }
+ // Run the unpaged scan and serialize the whole result.
+ ScanCoordinationResponse response = scan(coordination);
+ byte[] full = serializeObject(response);
+ if (full.length <= pageSize) { // fits in one page: nothing to cache
+ return new PagedCoordinationResponse(full, null);
+ }
+ // Larger than one page: cache the full bytes for follow-up requests and return the first pageSize bytes; the next token is the offset pageSize.
+ pagedCoordination.put(requestKey, full);
+ byte[] content = Arrays.copyOfRange(full, 0, pageSize);
+ return new PagedCoordinationResponse(content, pageSize);
+ }
+
public synchronized ScanCoordinationResponse scan(ScanCoordinationRequest
request)
throws IOException {
if (snapshot == null) {
@@ -126,4 +192,29 @@ public class TableWriteCoordinator {
// refresh latest committed identifiers for all users
latestCommittedIdentifiers.clear();
}
+
+ private static class CoordinationKey { // Cache key for one paged scan: request bytes plus request id, compared by value.
+ // NOTE(review): CoordinatedWriteRestore in this commit sends a fresh random UUID per scan, so uuid alone likely identifies the entry; including content makes every lookup hash and compare the whole request.
+ private final byte[] content; // kept by reference; mutating it would change equals/hashCode while the key is cached
+ private final String uuid; // the request id (a random UUID string from the client in this commit)
+
+ private CoordinationKey(byte[] content, String uuid) {
+ this.content = content;
+ this.uuid = uuid;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ CoordinationKey that = (CoordinationKey) o;
+ return Objects.deepEquals(content, that.content) &&
Objects.equals(uuid, that.uuid);
+ }
+
+ @Override
+ public int hashCode() { // content-based, consistent with equals
+ return Objects.hash(Arrays.hashCode(content), uuid);
+ }
+ }
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/WriteOperatorCoordinator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/WriteOperatorCoordinator.java
index 944628ae0e..334333fe76 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/WriteOperatorCoordinator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/coordinator/WriteOperatorCoordinator.java
@@ -79,8 +79,8 @@ public class WriteOperatorCoordinator implements
OperatorCoordinator, Coordinati
() -> {
try {
CoordinationResponse response;
- if (request instanceof ScanCoordinationRequest) {
- response =
coordinator.scan((ScanCoordinationRequest) request);
+ if (request instanceof PagedCoordinationRequest) {
+ response =
coordinator.scan((PagedCoordinationRequest) request);
} else if (request instanceof LatestIdentifierRequest)
{
response =
new LatestIdentifierResponse(
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index 2d968697d2..560a8fd8aa 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -71,6 +71,19 @@ public class BatchFileStoreITCase extends CatalogITCaseBase {
Row.of(1, 11, 111), Row.of(2, 22, 222), Row.of(3, 33,
333));
}
+ @Test
+ public void testWriteRestoreCoordinatorWithPageSize() { // Writes through the writer coordinator with a 10-byte page size, so coordinator scan responses larger than 10 bytes are served in multiple pages.
+ batchSql(
+ "CREATE TABLE IF NOT EXISTS PK (a INT PRIMARY KEY NOT
ENFORCED, b INT, c INT) WITH ('bucket' = '2')");
+ batchSql("ALTER TABLE PK SET ('sink.writer-coordinator.enabled' =
'true')");
+ batchSql("ALTER TABLE PK SET ('sink.writer-coordinator.page-size' =
'10 b')");
+ batchSql("INSERT INTO PK VALUES (1, 11, 111), (2, 22, 222), (3, 33,
333)");
+ batchSql("INSERT INTO PK VALUES (1, 11, 111), (2, 22, 222), (3, 33,
333)");
+ assertThat(batchSql("SELECT * FROM PK")) // same keys inserted twice into a primary-key table: one row per key expected
+ .containsExactlyInAnyOrder(
+ Row.of(1, 11, 111), Row.of(2, 22, 222), Row.of(3, 33,
333));
+ }
+
@Test
public void testWriteRestoreCoordinatorDv() {
batchSql(