This is an automated email from the ASF dual-hosted git repository. leonard pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push: new ae986ab3f [FLINK-36609][pipeline-connector][paimon] Add partition columns to primary keys ae986ab3f is described below commit ae986ab3f607d788b4b82e2aabceb3e807a54139 Author: Kunni <lvyanquan....@alibaba-inc.com> AuthorDate: Tue Nov 12 20:00:12 2024 +0800 [FLINK-36609][pipeline-connector][paimon] Add partition columns to primary keys This closes #3641. --- .../paimon/sink/PaimonMetadataApplier.java | 18 +++++++---- .../paimon/sink/PaimonMetadataApplierTest.java | 35 ++++++++++++++++++++-- 2 files changed, 45 insertions(+), 8 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java index 5a349946b..928e7b6df 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java @@ -164,14 +164,22 @@ public class PaimonMetadataApplier implements MetadataApplier { LogicalTypeConversion.toDataType( DataTypeUtils.toFlinkDataType(column.getType()) .getLogicalType()))); - builder.primaryKey(schema.primaryKeys().toArray(new String[0])); + List<String> partitionKeys = new ArrayList<>(); + List<String> primaryKeys = schema.primaryKeys(); if (partitionMaps.containsKey(event.tableId())) { - builder.partitionKeys(partitionMaps.get(event.tableId())); + partitionKeys.addAll(partitionMaps.get(event.tableId())); } else if (schema.partitionKeys() != null && !schema.partitionKeys().isEmpty()) { - builder.partitionKeys(schema.partitionKeys()); + partitionKeys.addAll(schema.partitionKeys()); } - builder.options(tableOptions); - builder.options(schema.options()); + for (String partitionColumn : partitionKeys) { + if (!primaryKeys.contains(partitionColumn)) { + primaryKeys.add(partitionColumn); + } + } + builder.partitionKeys(partitionKeys) + .primaryKey(primaryKeys) + .options(tableOptions) + .options(schema.options()); catalog.createTable( new Identifier(event.tableId().getSchemaName(), event.tableId().getTableName()), builder.build(), diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java index 214f2051e..9f3cd806c 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java @@ -177,6 +177,35 @@ public class PaimonMetadataApplierTest { new DataField(2, "newcol3", DataTypes.STRING()))); Assertions.assertEquals( tableSchema, catalog.getTable(Identifier.fromString("test.table1")).rowType()); + + // Create table with partition column. + createTableEvent = + new CreateTableEvent( + TableId.parse("test.table_with_partition"), + org.apache.flink.cdc.common.schema.Schema.newBuilder() + .physicalColumn( + "col1", + org.apache.flink.cdc.common.types.DataTypes.STRING() + .notNull()) + .physicalColumn( + "col2", org.apache.flink.cdc.common.types.DataTypes.INT()) + .physicalColumn( + "dt", + org.apache.flink.cdc.common.types.DataTypes.INT().notNull()) + .primaryKey("col1") + .partitionKey("dt") + .build()); + metadataApplier.applySchemaChange(createTableEvent); + tableSchema = + new RowType( + Arrays.asList( + new DataField(0, "col1", DataTypes.STRING().notNull()), + new DataField(1, "col2", DataTypes.INT()), + new DataField(2, "dt", DataTypes.INT().notNull()))); + Table tableWithPartition = + catalog.getTable(Identifier.fromString("test.table_with_partition")); + Assertions.assertEquals(tableSchema, tableWithPartition.rowType()); + Assertions.assertEquals(Arrays.asList("col1", "dt"), tableWithPartition.primaryKeys()); } @ParameterizedTest @@ -217,10 +246,10 @@ public class PaimonMetadataApplierTest { Arrays.asList( new DataField(0, "col1", DataTypes.STRING().notNull()), new DataField(1, "col2", DataTypes.STRING()), - new DataField(2, "col3", DataTypes.STRING()), - new DataField(3, "col4", DataTypes.STRING()))); + new DataField(2, "col3", DataTypes.STRING().notNull()), + new DataField(3, "col4", DataTypes.STRING().notNull()))); Assertions.assertEquals(tableSchema, table.rowType()); - Assertions.assertEquals(Collections.singletonList("col1"), table.primaryKeys()); + Assertions.assertEquals(Arrays.asList("col1", "col3", "col4"), table.primaryKeys()); Assertions.assertEquals(Arrays.asList("col3", "col4"), table.partitionKeys()); Assertions.assertEquals("-1", table.options().get("bucket")); }