This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new ae986ab3f [FLINK-36609][pipeline-connector][paimon] Add partition columns to primary keys
ae986ab3f is described below

commit ae986ab3f607d788b4b82e2aabceb3e807a54139
Author: Kunni <lvyanquan....@alibaba-inc.com>
AuthorDate: Tue Nov 12 20:00:12 2024 +0800

    [FLINK-36609][pipeline-connector][paimon] Add partition columns to primary keys
    
    This closes #3641.
---
 .../paimon/sink/PaimonMetadataApplier.java         | 18 +++++++----
 .../paimon/sink/PaimonMetadataApplierTest.java     | 35 ++++++++++++++++++++--
 2 files changed, 45 insertions(+), 8 deletions(-)

diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java
index 5a349946b..928e7b6df 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java
@@ -164,14 +164,22 @@ public class PaimonMetadataApplier implements 
MetadataApplier {
                                             LogicalTypeConversion.toDataType(
                                                     
DataTypeUtils.toFlinkDataType(column.getType())
                                                             
.getLogicalType())));
-            builder.primaryKey(schema.primaryKeys().toArray(new String[0]));
+            List<String> partitionKeys = new ArrayList<>();
+            List<String> primaryKeys = schema.primaryKeys();
             if (partitionMaps.containsKey(event.tableId())) {
-                builder.partitionKeys(partitionMaps.get(event.tableId()));
+                partitionKeys.addAll(partitionMaps.get(event.tableId()));
             } else if (schema.partitionKeys() != null && 
!schema.partitionKeys().isEmpty()) {
-                builder.partitionKeys(schema.partitionKeys());
+                partitionKeys.addAll(schema.partitionKeys());
             }
-            builder.options(tableOptions);
-            builder.options(schema.options());
+            for (String partitionColumn : partitionKeys) {
+                if (!primaryKeys.contains(partitionColumn)) {
+                    primaryKeys.add(partitionColumn);
+                }
+            }
+            builder.partitionKeys(partitionKeys)
+                    .primaryKey(primaryKeys)
+                    .options(tableOptions)
+                    .options(schema.options());
             catalog.createTable(
                     new Identifier(event.tableId().getSchemaName(), 
event.tableId().getTableName()),
                     builder.build(),
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java
index 214f2051e..9f3cd806c 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java
@@ -177,6 +177,35 @@ public class PaimonMetadataApplierTest {
                                 new DataField(2, "newcol3", 
DataTypes.STRING())));
         Assertions.assertEquals(
                 tableSchema, 
catalog.getTable(Identifier.fromString("test.table1")).rowType());
+
+        // Create table with partition column.
+        createTableEvent =
+                new CreateTableEvent(
+                        TableId.parse("test.table_with_partition"),
+                        org.apache.flink.cdc.common.schema.Schema.newBuilder()
+                                .physicalColumn(
+                                        "col1",
+                                        
org.apache.flink.cdc.common.types.DataTypes.STRING()
+                                                .notNull())
+                                .physicalColumn(
+                                        "col2", 
org.apache.flink.cdc.common.types.DataTypes.INT())
+                                .physicalColumn(
+                                        "dt",
+                                        
org.apache.flink.cdc.common.types.DataTypes.INT().notNull())
+                                .primaryKey("col1")
+                                .partitionKey("dt")
+                                .build());
+        metadataApplier.applySchemaChange(createTableEvent);
+        tableSchema =
+                new RowType(
+                        Arrays.asList(
+                                new DataField(0, "col1", 
DataTypes.STRING().notNull()),
+                                new DataField(1, "col2", DataTypes.INT()),
+                                new DataField(2, "dt", 
DataTypes.INT().notNull())));
+        Table tableWithPartition =
+                
catalog.getTable(Identifier.fromString("test.table_with_partition"));
+        Assertions.assertEquals(tableSchema, tableWithPartition.rowType());
+        Assertions.assertEquals(Arrays.asList("col1", "dt"), 
tableWithPartition.primaryKeys());
     }
 
     @ParameterizedTest
@@ -217,10 +246,10 @@ public class PaimonMetadataApplierTest {
                         Arrays.asList(
                                 new DataField(0, "col1", 
DataTypes.STRING().notNull()),
                                 new DataField(1, "col2", DataTypes.STRING()),
-                                new DataField(2, "col3", DataTypes.STRING()),
-                                new DataField(3, "col4", DataTypes.STRING())));
+                                new DataField(2, "col3", 
DataTypes.STRING().notNull()),
+                                new DataField(3, "col4", 
DataTypes.STRING().notNull())));
         Assertions.assertEquals(tableSchema, table.rowType());
-        Assertions.assertEquals(Collections.singletonList("col1"), 
table.primaryKeys());
+        Assertions.assertEquals(Arrays.asList("col1", "col3", "col4"), 
table.primaryKeys());
         Assertions.assertEquals(Arrays.asList("col3", "col4"), 
table.partitionKeys());
         Assertions.assertEquals("-1", table.options().get("bucket"));
     }

Reply via email to