This is an automated email from the ASF dual-hosted git repository.
lvyanquan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 7acc625f1 [FLINK-39718][pipeline][paimon] Paimon pipeline sink fails
with distributed source when target table does not exist. (#4406)
7acc625f1 is described below
commit 7acc625f1ed9ab974f74cc60449c75886c0ca583
Author: Thorne <[email protected]>
AuthorDate: Sun May 31 14:22:46 2026 +0800
[FLINK-39718][pipeline][paimon] Paimon pipeline sink fails with distributed
source when target table does not exist. (#4406)
---
.../cdc/connectors/paimon/sink/PaimonDataSink.java | 2 +-
.../connectors/paimon/sink/PaimonHashFunction.java | 56 ++++++++++++++--------
.../paimon/sink/PaimonHashFunctionProvider.java | 9 +---
.../paimon/sink/PaimonHashFunctionTest.java | 54 ++-------------------
4 files changed, 43 insertions(+), 78 deletions(-)
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSink.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSink.java
index 5a95f1efd..f48879605 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSink.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonDataSink.java
@@ -85,6 +85,6 @@ public class PaimonDataSink implements DataSink, Serializable
{
@Override
public HashFunctionProvider<DataChangeEvent>
getDataChangeEventHashFunctionProvider(
int parallelism) {
- return new PaimonHashFunctionProvider(options, zoneId, parallelism);
+ return new PaimonHashFunctionProvider(zoneId, parallelism);
}
}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonHashFunction.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonHashFunction.java
index 405a07c39..b17071942 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonHashFunction.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonHashFunction.java
@@ -19,28 +19,32 @@ package org.apache.flink.cdc.connectors.paimon.sink;
import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.event.DataChangeEvent;
-import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.function.HashFunction;
+import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.connectors.paimon.sink.utils.TypeUtils;
import org.apache.flink.cdc.connectors.paimon.sink.v2.PaimonWriterHelper;
-import org.apache.paimon.AppendOnlyFileStore;
-import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.GenericRow;
-import org.apache.paimon.flink.FlinkCatalogFactory;
import org.apache.paimon.flink.sink.RowAssignerChannelComputer;
-import org.apache.paimon.options.Options;
-import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.types.DataField;
import java.io.Serializable;
import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
/**
* A {@link HashFunction} implementation for {@link PaimonDataSink}. Shuffle {@link DataChangeEvent}
* by hash of PrimaryKey.
+ *
+ * <p>Table type (append-only vs. primary-key) is inferred directly from the CDC {@link Schema}
+ * instead of querying the Paimon catalog. This avoids a {@code TableNotExistException} when the
+ * target table has not yet been created by {@code MetadataApplier}, which can happen in distributed
+ * pipeline topologies where pre-partitioning precedes schema coordination.
*/
public class PaimonHashFunction implements HashFunction<DataChangeEvent>,
Serializable {
@@ -52,23 +56,17 @@ public class PaimonHashFunction implements
HashFunction<DataChangeEvent>, Serial
private final int parallelism;
- public PaimonHashFunction(
- Options options, TableId tableId, Schema schema, ZoneId zoneId,
int parallelism) {
+ public PaimonHashFunction(Schema schema, ZoneId zoneId, int parallelism) {
this.parallelism = parallelism;
- Catalog catalog = FlinkCatalogFactory.createPaimonCatalog(options);
- FileStoreTable table;
- try {
- table = (FileStoreTable)
catalog.getTable(Identifier.fromString(tableId.toString()));
- } catch (Catalog.TableNotExistException e) {
- throw new RuntimeException(e);
- }
- if (table instanceof AppendOnlyFileStore) {
+ if (schema.primaryKeys().isEmpty()) {
+ // Append-only table: spread events randomly across subtasks.
this.fieldGetters = null;
- channelComputer = null;
+ this.channelComputer = null;
} else {
this.fieldGetters = PaimonWriterHelper.createFieldGetters(schema,
zoneId);
- channelComputer = new RowAssignerChannelComputer(table.schema(),
parallelism);
- channelComputer.setup(parallelism);
+ TableSchema tableSchema = buildTableSchema(schema);
+ this.channelComputer = new RowAssignerChannelComputer(tableSchema,
parallelism);
+ this.channelComputer.setup(parallelism);
}
}
@@ -83,4 +81,22 @@ public class PaimonHashFunction implements
HashFunction<DataChangeEvent>, Serial
return ThreadLocalRandom.current().nextInt(parallelism);
}
}
+
+ private static TableSchema buildTableSchema(Schema schema) {
+ List<Column> columns = schema.getColumns();
+ List<DataField> dataFields = new ArrayList<>(columns.size());
+ for (int i = 0; i < columns.size(); i++) {
+ Column col = columns.get(i);
+ dataFields.add(
+ new DataField(i, col.getName(),
TypeUtils.toPaimonDataType(col.getType())));
+ }
+ return new TableSchema(
+ 0L,
+ dataFields,
+ dataFields.size() - 1,
+ schema.partitionKeys(),
+ schema.primaryKeys(),
+ Collections.emptyMap(),
+ "");
+ }
}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonHashFunctionProvider.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonHashFunctionProvider.java
index 5f641f409..eeaecb648 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonHashFunctionProvider.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonHashFunctionProvider.java
@@ -23,8 +23,6 @@ import org.apache.flink.cdc.common.function.HashFunction;
import org.apache.flink.cdc.common.function.HashFunctionProvider;
import org.apache.flink.cdc.common.schema.Schema;
-import org.apache.paimon.options.Options;
-
import javax.annotation.Nullable;
import java.time.ZoneId;
@@ -32,20 +30,17 @@ import java.time.ZoneId;
/** A {@link HashFunctionProvider} implementation for {@link PaimonDataSink}.
*/
public class PaimonHashFunctionProvider implements
HashFunctionProvider<DataChangeEvent> {
- private final Options options;
-
private final ZoneId zoneId;
private final int parallelism;
- public PaimonHashFunctionProvider(Options options, ZoneId zoneId, int
parallelism) {
- this.options = options;
+ public PaimonHashFunctionProvider(ZoneId zoneId, int parallelism) {
this.zoneId = zoneId;
this.parallelism = parallelism;
}
@Override
public HashFunction<DataChangeEvent> getHashFunction(@Nullable TableId
tableId, Schema schema) {
- return new PaimonHashFunction(options, tableId, schema, zoneId,
parallelism);
+ return new PaimonHashFunction(schema, zoneId, parallelism);
}
}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonHashFunctionTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonHashFunctionTest.java
index d600149d3..cfd09279f 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonHashFunctionTest.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonHashFunctionTest.java
@@ -18,67 +18,29 @@
package org.apache.flink.cdc.connectors.paimon.sink;
import org.apache.flink.cdc.common.data.binary.BinaryStringData;
-import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Schema;
-import org.apache.flink.cdc.common.sink.MetadataApplier;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.types.variant.BinaryVariantInternalBuilder;
import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
-import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.flink.FlinkCatalogFactory;
-import org.apache.paimon.options.Options;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.io.TempDir;
-import java.io.File;
import java.io.IOException;
-import java.nio.file.Path;
import java.time.ZoneId;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
import static org.assertj.core.api.Assertions.assertThat;
/** Tests for {@link PaimonHashFunction}. */
class PaimonHashFunctionTest {
- @TempDir public static Path temporaryFolder;
-
- private Catalog catalog;
-
- private Options catalogOptions;
-
private static final String TEST_DATABASE = "test";
- @BeforeEach
- public void beforeEach() throws Catalog.DatabaseAlreadyExistException {
- catalogOptions = new Options();
- String warehouse =
- new File(temporaryFolder.toFile(),
UUID.randomUUID().toString()).toString();
- catalogOptions.setString("warehouse", warehouse);
- catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
- catalog.createDatabase(TEST_DATABASE, true);
- }
-
- @AfterEach
- public void afterEach() throws Exception {
- catalog.dropDatabase(TEST_DATABASE, true, true);
- catalog.close();
- }
-
@Test
public void testHashCodeForAppendOnlyTable() throws IOException {
TableId tableId = TableId.tableId(TEST_DATABASE, "test_table");
- Map<String, String> tableOptions = new HashMap<>();
- MetadataApplier metadataApplier =
- new PaimonMetadataApplier(catalogOptions, tableOptions, new
HashMap<>());
Schema schema =
Schema.newBuilder()
.physicalColumn("col1", DataTypes.STRING().notNull())
@@ -86,12 +48,10 @@ class PaimonHashFunctionTest {
.physicalColumn("pt", DataTypes.STRING())
.physicalColumn("variantCol", DataTypes.VARIANT())
.build();
- CreateTableEvent createTableEvent = new CreateTableEvent(tableId,
schema);
- metadataApplier.applySchemaChange(createTableEvent);
BinaryRecordDataGenerator generator =
new
BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0]));
- PaimonHashFunction hashFunction =
- new PaimonHashFunction(catalogOptions, tableId, schema,
ZoneId.systemDefault(), 4);
+ // No primary keys: append-only table. No catalog access required.
+ PaimonHashFunction hashFunction = new PaimonHashFunction(schema,
ZoneId.systemDefault(), 4);
DataChangeEvent dataChangeEvent1 =
DataChangeEvent.insertEvent(
tableId,
@@ -138,10 +98,6 @@ class PaimonHashFunctionTest {
@Test
void testHashCodeForFixedBucketTable() {
TableId tableId = TableId.tableId(TEST_DATABASE, "test_table");
- Map<String, String> tableOptions = new HashMap<>();
- tableOptions.put("bucket", "10");
- MetadataApplier metadataApplier =
- new PaimonMetadataApplier(catalogOptions, tableOptions, new
HashMap<>());
Schema schema =
Schema.newBuilder()
.physicalColumn("col1", DataTypes.STRING().notNull())
@@ -150,12 +106,10 @@ class PaimonHashFunctionTest {
.primaryKey("col1", "pt")
.partitionKey("pt")
.build();
- CreateTableEvent createTableEvent = new CreateTableEvent(tableId,
schema);
- metadataApplier.applySchemaChange(createTableEvent);
BinaryRecordDataGenerator generator =
new
BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0]));
- PaimonHashFunction hashFunction =
- new PaimonHashFunction(catalogOptions, tableId, schema,
ZoneId.systemDefault(), 4);
+ // Primary keys present: table-aware hashing. No catalog access
required.
+ PaimonHashFunction hashFunction = new PaimonHashFunction(schema,
ZoneId.systemDefault(), 4);
DataChangeEvent dataChangeEvent1 =
DataChangeEvent.insertEvent(
tableId,