This is an automated email from the ASF dual-hosted git repository.
kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new aff60f7f1 [FLINK-38275][pipeline-connectors][doris]Fix table creation
when the upstream has no primary key and the first column is String.
aff60f7f1 is described below
commit aff60f7f1db28cacea1ab6001980d618248aa2a1
Author: wudi <[email protected]>
AuthorDate: Sat Sep 6 23:11:17 2025 +0800
[FLINK-38275][pipeline-connectors][doris]Fix table creation when the
upstream has no primary key and the first column is String.
---
.../cdc/connectors/doris/sink/DorisMetadataApplier.java | 17 ++++++++++-------
1 file changed, 10 insertions(+), 7 deletions(-)
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java
index f5d4adb93..fdf4b5366 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java
@@ -45,6 +45,7 @@ import org.apache.flink.util.CollectionUtil;
import org.apache.flink.shaded.guava31.com.google.common.collect.Sets;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.doris.flink.catalog.DorisTypeMapper;
import org.apache.doris.flink.catalog.doris.DataModel;
import org.apache.doris.flink.catalog.doris.FieldSchema;
@@ -149,17 +150,15 @@ public class DorisMetadataApplier implements
MetadataApplier {
TableSchema tableSchema = new TableSchema();
tableSchema.setTable(tableId.getTableName());
tableSchema.setDatabase(tableId.getSchemaName());
+ tableSchema.setModel(
+ CollectionUtils.isEmpty(schema.primaryKeys())
+ ? DataModel.DUPLICATE
+ : DataModel.UNIQUE);
tableSchema.setFields(buildFields(schema));
+ tableSchema.setKeys(buildKeys(schema));
tableSchema.setDistributeKeys(buildDistributeKeys(schema));
tableSchema.setTableComment(schema.comment());
- if (CollectionUtil.isNullOrEmpty(schema.primaryKeys())) {
- tableSchema.setModel(DataModel.DUPLICATE);
- } else {
- tableSchema.setKeys(schema.primaryKeys());
- tableSchema.setModel(DataModel.UNIQUE);
- }
-
Map<String, String> tableProperties =
DorisDataSinkOptions.getPropertiesByPrefix(
config, TABLE_CREATE_PROPERTIES_PREFIX);
@@ -207,6 +206,10 @@ public class DorisMetadataApplier implements
MetadataApplier {
return fieldSchemaMap;
}
+ private List<String> buildKeys(Schema schema) {
+ return buildDistributeKeys(schema);
+ }
+
private List<String> buildDistributeKeys(Schema schema) {
if (!CollectionUtil.isNullOrEmpty(schema.primaryKeys())) {
return schema.primaryKeys();