This is an automated email from the ASF dual-hosted git repository.

kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new aff60f7f1 [FLINK-38275][pipeline-connectors][doris]Fix table creation 
when the upstream has no primary key and the first column is String.
aff60f7f1 is described below

commit aff60f7f1db28cacea1ab6001980d618248aa2a1
Author: wudi <[email protected]>
AuthorDate: Sat Sep 6 23:11:17 2025 +0800

    [FLINK-38275][pipeline-connectors][doris]Fix table creation when the 
upstream has no primary key and the first column is String.
---
 .../cdc/connectors/doris/sink/DorisMetadataApplier.java | 17 ++++++++++-------
 1 file changed, 10 insertions(+), 7 deletions(-)

diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java
index f5d4adb93..fdf4b5366 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java
@@ -45,6 +45,7 @@ import org.apache.flink.util.CollectionUtil;
 
 import org.apache.flink.shaded.guava31.com.google.common.collect.Sets;
 
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.doris.flink.catalog.DorisTypeMapper;
 import org.apache.doris.flink.catalog.doris.DataModel;
 import org.apache.doris.flink.catalog.doris.FieldSchema;
@@ -149,17 +150,15 @@ public class DorisMetadataApplier implements 
MetadataApplier {
             TableSchema tableSchema = new TableSchema();
             tableSchema.setTable(tableId.getTableName());
             tableSchema.setDatabase(tableId.getSchemaName());
+            tableSchema.setModel(
+                    CollectionUtils.isEmpty(schema.primaryKeys())
+                            ? DataModel.DUPLICATE
+                            : DataModel.UNIQUE);
             tableSchema.setFields(buildFields(schema));
+            tableSchema.setKeys(buildKeys(schema));
             tableSchema.setDistributeKeys(buildDistributeKeys(schema));
             tableSchema.setTableComment(schema.comment());
 
-            if (CollectionUtil.isNullOrEmpty(schema.primaryKeys())) {
-                tableSchema.setModel(DataModel.DUPLICATE);
-            } else {
-                tableSchema.setKeys(schema.primaryKeys());
-                tableSchema.setModel(DataModel.UNIQUE);
-            }
-
             Map<String, String> tableProperties =
                     DorisDataSinkOptions.getPropertiesByPrefix(
                             config, TABLE_CREATE_PROPERTIES_PREFIX);
@@ -207,6 +206,10 @@ public class DorisMetadataApplier implements 
MetadataApplier {
         return fieldSchemaMap;
     }
 
+    private List<String> buildKeys(Schema schema) {
+        return buildDistributeKeys(schema);
+    }
+
     private List<String> buildDistributeKeys(Schema schema) {
         if (!CollectionUtil.isNullOrEmpty(schema.primaryKeys())) {
             return schema.primaryKeys();

Reply via email to