This is an automated email from the ASF dual-hosted git repository.
lvyanquan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new c34765143 [FLINK-37731][pipeline-paimon]Support Postpone Bucket (#4386)
c34765143 is described below
commit c347651437b42b175b1461b4d2ffa1d0e2ab4574
Author: Thorne <[email protected]>
AuthorDate: Thu May 14 17:21:44 2026 +0800
[FLINK-37731][pipeline-paimon]Support Postpone Bucket (#4386)
---
.../connectors/paimon/sink/v2/PaimonWriter.java | 7 +-
.../sink/v2/bucket/BucketAssignOperator.java | 12 +
.../sink/v2/bucket/PaimonPostponeBucketTest.java | 252 +++++++++++++++++++++
3 files changed, 270 insertions(+), 1 deletion(-)
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java
index 9468464c8..2e1f9ea15 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java
@@ -36,6 +36,7 @@ import org.apache.paimon.flink.sink.StoreSinkWrite;
import org.apache.paimon.memory.HeapMemorySegmentPool;
import org.apache.paimon.memory.MemoryPoolFactory;
import org.apache.paimon.options.Options;
+import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.ExecutorThreadFactory;
import org.slf4j.Logger;
@@ -190,8 +191,12 @@ public class PaimonWriter<InputT>
return storeSinkWrite;
});
try {
+ int bucket =
+ table.bucketMode() == BucketMode.POSTPONE_MODE
+ ? BucketMode.POSTPONE_BUCKET
+ : paimonEvent.getBucket();
for (GenericRow genericRow : paimonEvent.getGenericRows()) {
- write.write(genericRow, paimonEvent.getBucket());
+ write.write(genericRow, bucket);
}
} catch (Exception e) {
throw new IOException(e);
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java
index b2a14a2c4..d523feb72 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java
@@ -188,6 +188,18 @@ public class BucketAssignOperator extends AbstractStreamOperatorAdapter<Event>
partition = 0;
break;
}
+ case POSTPONE_MODE:
+ {
+ // Postpone bucket tables: the actual bucket written to Paimon is -2
+ // (assigned by a downstream compaction job). However, using -2 as the
+ // shuffle key here would route all postpone events to the same writer
+ // subtask and cause severe data skew. Use currentTaskNumber so events
+ // are evenly spread across writer subtasks; PaimonWriter will rewrite
+ // the bucket back to POSTPONE_BUCKET (-2) when persisting records.
+ bucket = currentTaskNumber;
+ partition = tuple4.f3.partition(genericRow).hashCode();
+ break;
+ }
case KEY_DYNAMIC:
default:
{
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/PaimonPostponeBucketTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/PaimonPostponeBucketTest.java
new file mode 100644
index 000000000..3621f4d7a
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/PaimonPostponeBucketTest.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.paimon.sink.v2.bucket;
+
+import org.apache.flink.api.common.TaskInfo;
+import org.apache.flink.api.common.TaskInfoImpl;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
+import org.apache.flink.cdc.common.data.binary.BinaryStringData;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.RowType;
+import org.apache.flink.cdc.connectors.paimon.sink.PaimonMetadataApplier;
+import
org.apache.flink.cdc.connectors.paimon.sink.v2.PaimonRecordEventSerializer;
+import org.apache.flink.cdc.connectors.paimon.sink.v2.PaimonRecordSerializer;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
+import org.apache.flink.util.OutputTag;
+
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.flink.FlinkCatalogFactory;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.table.FileStoreTable;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.mockito.MockedConstruction;
+import org.mockito.Mockito;
+
+import java.io.File;
+import java.lang.reflect.Field;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+
+import static org.apache.flink.cdc.common.types.DataTypes.STRING;
+
+/** Tests for postpone bucket mode in Paimon bucket assignment. */
+class PaimonPostponeBucketTest {
+
+ @TempDir public static java.nio.file.Path temporaryFolder;
+
+ private static final String TEST_DATABASE = "test";
+ private static final TableId TABLE_ID = TableId.tableId(TEST_DATABASE,
"table1");
+
+ @Test
+ void testBucketAssignOperatorUsesCurrentSubtaskForPostponeBucketTable()
throws Exception {
+ Options catalogOptions = createCatalogOptions();
+ Schema schema = createPostponeBucketSchema();
+ new PaimonMetadataApplier(catalogOptions)
+ .applySchemaChange(new CreateTableEvent(TABLE_ID, schema));
+
+ BucketAssignOperator bucketAssignOperator =
+ new BucketAssignOperator(catalogOptions, null,
ZoneId.systemDefault(), null);
+ CollectingOutput output = new CollectingOutput();
+ setField(bucketAssignOperator, "output", output);
+ int currentSubtask = 2;
+ bucketAssignOperator.open(createTaskInfo(4, currentSubtask));
+ bucketAssignOperator.convertSchemaChangeEvent(new
CreateTableEvent(TABLE_ID, schema));
+
+ bucketAssignOperator.processElement(new
StreamRecord<>(createInsertEvent("1", "Alice")));
+
+ Assertions.assertThat(output.records).hasSize(1);
+ Event event = output.records.get(0).getValue();
+
Assertions.assertThat(event).isInstanceOf(BucketWrapperChangeEvent.class);
+ BucketWrapperChangeEvent bucketWrapperChangeEvent =
(BucketWrapperChangeEvent) event;
+
Assertions.assertThat(bucketWrapperChangeEvent.getBucket()).isEqualTo(currentSubtask);
+ Assertions.assertThat(bucketWrapperChangeEvent.getBucket())
+ .isNotEqualTo(BucketMode.POSTPONE_BUCKET);
+ Assertions.assertThat(bucketWrapperChangeEvent.getInnerEvent())
+ .isInstanceOf(DataChangeEvent.class);
+ }
+
+ @Test
+ void testSerializerKeepsAssignedBucketBeforeWriterRewrite() throws
Exception {
+ Schema schema = createPostponeBucketSchema();
+ PaimonRecordSerializer<Event> serializer =
+ new PaimonRecordEventSerializer(ZoneId.systemDefault());
+ serializer.serialize(new CreateTableEvent(TABLE_ID, schema));
+ int assignedBucket = 2;
+ BucketWrapperChangeEvent bucketWrapperChangeEvent =
+ new BucketWrapperChangeEvent(assignedBucket, 0,
createInsertEvent("1", "Alice"));
+
+
Assertions.assertThat(serializer.serialize(bucketWrapperChangeEvent).getBucket())
+ .isEqualTo(assignedBucket);
+ }
+
+ @Test
+ void testPostponeBucketTableRewritesAssignedBucketToMinusTwoWhenWriting()
throws Exception {
+ Options catalogOptions = createCatalogOptions();
+ Schema schema = createPostponeBucketSchema();
+ new PaimonMetadataApplier(catalogOptions)
+ .applySchemaChange(new CreateTableEvent(TABLE_ID, schema));
+ FileStoreTable table =
+ (FileStoreTable)
+ FlinkCatalogFactory.createPaimonCatalog(catalogOptions)
+ .getTable(
+
org.apache.paimon.catalog.Identifier.fromString(
+ TABLE_ID.toString()));
+
+ int assignedBucket = 2;
+ List<Integer> writtenBuckets = new ArrayList<>();
+ try
(MockedConstruction<org.apache.flink.cdc.connectors.paimon.sink.v2.StoreSinkWriteImpl>
+ ignored =
+ Mockito.mockConstruction(
+
org.apache.flink.cdc.connectors.paimon.sink.v2.StoreSinkWriteImpl
+ .class,
+ (mock, context) -> {
+ Mockito.doAnswer(
+ invocation -> {
+ writtenBuckets.add(
+
invocation.getArgument(1));
+ return null;
+ })
+ .when(mock)
+
.write(Mockito.any(GenericRow.class), Mockito.anyInt());
+ })) {
+ org.apache.flink.cdc.connectors.paimon.sink.v2.PaimonWriter<Event>
writer =
+ new
org.apache.flink.cdc.connectors.paimon.sink.v2.PaimonWriter<>(
+ catalogOptions,
+
UnregisteredMetricsGroup.createSinkWriterMetricGroup(),
+ "test-user",
+ new
PaimonRecordEventSerializer(ZoneId.systemDefault()),
+ 0);
+
+ writer.write(new CreateTableEvent(TABLE_ID, schema), null);
+ writer.write(
+ new BucketWrapperChangeEvent(
+ assignedBucket, 0, createInsertEvent("1",
"Alice")),
+ null);
+ writer.close();
+ }
+
+
Assertions.assertThat(table.bucketMode()).isEqualTo(BucketMode.POSTPONE_MODE);
+
Assertions.assertThat(writtenBuckets).containsExactly(BucketMode.POSTPONE_BUCKET);
+ }
+
+ private Options createCatalogOptions() {
+ Options catalogOptions = new Options();
+ String warehouse =
+ new File(temporaryFolder.toFile(),
UUID.randomUUID().toString()).toString();
+ catalogOptions.setString("warehouse", warehouse);
+ catalogOptions.setString("cache-enabled", "false");
+ return catalogOptions;
+ }
+
+ private Schema createPostponeBucketSchema() {
+ return Schema.newBuilder()
+ .physicalColumn("col1", STRING().notNull())
+ .physicalColumn("col2", STRING())
+ .primaryKey("col1")
+ .option("bucket", String.valueOf(BucketMode.POSTPONE_BUCKET))
+ .build();
+ }
+
+ private DataChangeEvent createInsertEvent(String col1, String col2) {
+ return DataChangeEvent.insertEvent(
+ TABLE_ID,
+ generate(Arrays.asList(Tuple2.of(STRING(), col1),
Tuple2.of(STRING(), col2))));
+ }
+
+ private BinaryRecordData generate(List<Tuple2<DataType, Object>> elements)
{
+ org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator
generator =
+ new
org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator(
+ RowType.of(elements.stream().map(e ->
e.f0).toArray(DataType[]::new)));
+ return generator.generate(
+ elements.stream()
+ .map(e -> e.f1)
+ .map(o -> o instanceof String ?
BinaryStringData.fromString((String) o) : o)
+ .toArray(Object[]::new));
+ }
+
+ private TaskInfo createTaskInfo(int numberOfParallelSubtasks, int
indexOfThisSubtask) {
+ return new TaskInfoImpl(
+ "test-task",
+ numberOfParallelSubtasks,
+ indexOfThisSubtask,
+ numberOfParallelSubtasks,
+ 0);
+ }
+
+ private void setField(Object target, String fieldName, Object value)
throws Exception {
+ Class<?> current = target.getClass();
+ while (current != null) {
+ try {
+ Field field = current.getDeclaredField(fieldName);
+ field.setAccessible(true);
+ field.set(target, value);
+ return;
+ } catch (NoSuchFieldException e) {
+ current = current.getSuperclass();
+ }
+ }
+ throw new NoSuchFieldException(fieldName);
+ }
+
+ private static class CollectingOutput implements
Output<StreamRecord<Event>> {
+ private final List<StreamRecord<Event>> records = new ArrayList<>();
+
+ public void
emitWatermark(org.apache.flink.runtime.event.WatermarkEvent watermark) {}
+
+ @Override
+ public void emitWatermark(Watermark mark) {}
+
+ @Override
+ public void emitWatermarkStatus(WatermarkStatus watermarkStatus) {}
+
+ @Override
+ public <X> void collect(OutputTag<X> outputTag, StreamRecord<X>
streamRecord) {}
+
+ @Override
+ public void emitLatencyMarker(LatencyMarker latencyMarker) {}
+
+ @Override
+ public void emitRecordAttributes(RecordAttributes recordAttributes) {}
+
+ @Override
+ public void collect(StreamRecord<Event> record) {
+ records.add(record);
+ }
+
+ @Override
+ public void close() {}
+ }
+}