This is an automated email from the ASF dual-hosted git repository.

shenghang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new f951b0a01c [Bug] [Connector-Kudu] Fix Doris auto table from Kudu STRING being created as CHAR(16) (#10175)
f951b0a01c is described below

commit f951b0a01c2468d13397db495f8ee2f11c97d632
Author: yzeng1618 <[email protected]>
AuthorDate: Sat Dec 13 21:55:01 2025 +0800

    [Bug] [Connector-Kudu] Fix Doris auto table from Kudu STRING being created as CHAR(16) (#10175)
    
    Co-authored-by: zengyi <[email protected]>
---
 .../seatunnel/kudu/catalog/KuduCatalog.java        | 25 +++++--
 .../seatunnel/kudu/catalog/KuduCatalogTest.java    | 78 ++++++++++++++++++++++
 .../e2e/connector/doris/DorisCatalogIT.java        | 49 ++++++++++++++
 3 files changed, 147 insertions(+), 5 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/catalog/KuduCatalog.java
 
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/catalog/KuduCatalog.java
index 80ee7b7850..1ce7124edf 100644
--- 
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/catalog/KuduCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/catalog/KuduCatalog.java
@@ -39,6 +39,7 @@ import 
org.apache.seatunnel.connectors.seatunnel.kudu.util.KuduUtil;
 
 import org.apache.kudu.ColumnSchema;
 import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
 import org.apache.kudu.client.KuduClient;
 import org.apache.kudu.client.KuduException;
 import org.apache.kudu.client.KuduTable;
@@ -142,19 +143,33 @@ public class KuduCatalog implements Catalog {
             kuduTable.getPartitionSchema();
             List<ColumnSchema> columnSchemaList = schema.getColumns();
             Optional<PrimaryKey> primaryKey = 
getPrimaryKey(schema.getPrimaryKeyColumns());
+            PrimaryKey primaryKeyRef = primaryKey.orElse(null);
             buildColumnsWithErrorCheck(
                     tablePath,
                     builder,
                     IntStream.range(0, columnSchemaList.size()).iterator(),
                     i -> {
+                        ColumnSchema columnSchema = columnSchemaList.get(i);
                         SeaTunnelDataType<?> type = 
KuduTypeMapper.mapping(columnSchemaList, i);
+                        Long columnLength = null;
+                        if (Type.STRING.equals(columnSchema.getType())
+                                && PrimaryKey.isPrimaryKeyField(
+                                        primaryKeyRef, 
columnSchema.getName())) {
+                            // Doris does not allow STRING as key column type. For primary key
+                            // string columns we provide a reasonable logical length
+                            // so that downstream sinks (e.g. Doris) can map them to a supported
+                            // CHAR / VARCHAR type instead of the invalid STRING type.
+                            columnLength = 256L;
+                        } else if 
(!Type.STRING.equals(columnSchema.getType())) {
+                            columnLength = (long) columnSchema.getTypeSize();
+                        }
                         return PhysicalColumn.of(
-                                columnSchemaList.get(i).getName(),
+                                columnSchema.getName(),
                                 type,
-                                columnSchemaList.get(i).getTypeSize(),
-                                columnSchemaList.get(i).isNullable(),
-                                columnSchemaList.get(i).getDefaultValue(),
-                                columnSchemaList.get(i).getComment());
+                                columnLength,
+                                columnSchema.isNullable(),
+                                columnSchema.getDefaultValue(),
+                                columnSchema.getComment());
                     });
 
             primaryKey.ifPresent(builder::primaryKey);
diff --git 
a/seatunnel-connectors-v2/connector-kudu/src/test/java/org/apache/seatunnel/connectors/seatunnel/kudu/catalog/KuduCatalogTest.java
 
b/seatunnel-connectors-v2/connector-kudu/src/test/java/org/apache/seatunnel/connectors/seatunnel/kudu/catalog/KuduCatalogTest.java
new file mode 100644
index 0000000000..04b5a306ba
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-kudu/src/test/java/org/apache/seatunnel/connectors/seatunnel/kudu/catalog/KuduCatalogTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kudu.catalog;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.connectors.seatunnel.kudu.config.CommonConfig;
+
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduTable;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.lang.reflect.Field;
+import java.util.Arrays;
+
+class KuduCatalogTest {
+
+    @Test
+    void testStringColumnLengthShouldBeNull() throws Exception {
+        CommonConfig commonConfig = Mockito.mock(CommonConfig.class);
+        KuduCatalog kuduCatalog = new KuduCatalog("kudu", commonConfig);
+
+        KuduClient kuduClient = Mockito.mock(KuduClient.class);
+        Field clientField = KuduCatalog.class.getDeclaredField("kuduClient");
+        clientField.setAccessible(true);
+        clientField.set(kuduCatalog, kuduClient);
+
+        TablePath tablePath = TablePath.of("kudu_string_table");
+        
Mockito.when(kuduClient.tableExists(tablePath.getFullName())).thenReturn(true);
+
+        ColumnSchema idColumn =
+                new ColumnSchema.ColumnSchemaBuilder("id", 
Type.INT32).key(true).build();
+        ColumnSchema stringColumn =
+                new ColumnSchema.ColumnSchemaBuilder("val_string", Type.STRING)
+                        .nullable(true)
+                        .build();
+        Schema schema = new Schema(Arrays.asList(idColumn, stringColumn));
+
+        KuduTable kuduTable = Mockito.mock(KuduTable.class);
+        
Mockito.when(kuduClient.openTable(tablePath.getFullName())).thenReturn(kuduTable);
+        Mockito.when(kuduTable.getSchema()).thenReturn(schema);
+        Mockito.when(kuduTable.getPartitionSchema()).thenReturn(null);
+
+        CatalogTable catalogTable = kuduCatalog.getTable(tablePath);
+        Column id = catalogTable.getTableSchema().getColumns().get(0);
+        Column valString = catalogTable.getTableSchema().getColumns().get(1);
+
+        // Non-STRING types should still keep the physical length from Kudu.
+        Assertions.assertEquals("id", id.getName());
+        Assertions.assertNotNull(id.getColumnLength());
+
+        // STRING columns must not use the internal typeSize (commonly 16) as logical length.
+        Assertions.assertEquals("val_string", valString.getName());
+        Assertions.assertNull(valString.getColumnLength());
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCatalogIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCatalogIT.java
index 50db297dbf..f26286e677 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCatalogIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCatalogIT.java
@@ -51,6 +51,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.stream.Collectors;
 
@@ -254,6 +255,54 @@ public class DorisCatalogIT extends AbstractDorisIT {
                 BasicType.DOUBLE_TYPE, 
newTable.getTableSchema().getColumns().get(4).getDataType());
     }
 
+    @Test
+    void testCreateTableWithUnboundedStringColumn() {
+        TableSchema.Builder builder = TableSchema.builder();
+        builder.column(PhysicalColumn.of("k1", BasicType.INT_TYPE, 10L, false, 
0, "k1"));
+        // Simulate upstream catalog (such as KuduCatalog) where string column has no logical
+        // length, so Doris should create it as STRING instead of CHAR(16).
+        builder.column(
+                PhysicalColumn.of(
+                        "k2",
+                        BasicType.STRING_TYPE,
+                        (Long) null,
+                        false,
+                        null,
+                        "k2 without length"));
+        builder.primaryKey(PrimaryKey.of("pk_k1", 
Collections.singletonList("k1")));
+
+        CatalogTable upstreamTable =
+                CatalogTable.of(
+                        TableIdentifier.of("doris", 
TablePath.of("test.unbounded_string")),
+                        builder.build(),
+                        Collections.emptyMap(),
+                        Collections.emptyList(),
+                        null);
+
+        ReadonlyConfig config =
+                ReadonlyConfig.fromMap(
+                        new HashMap<String, Object>() {
+                            {
+                                put(
+                                        DorisBaseOptions.FENODES.key(),
+                                        container.getHost() + ":" + HTTP_PORT);
+                                put(DorisBaseOptions.DATABASE.key(), "test");
+                                put(DorisBaseOptions.TABLE.key(), 
"unbounded_string");
+                                put(DorisBaseOptions.USERNAME.key(), USERNAME);
+                                put(DorisBaseOptions.PASSWORD.key(), PASSWORD);
+                            }
+                        });
+
+        CatalogTable createdTable =
+                assertCreateTable(upstreamTable, config, 
"test.unbounded_string");
+        Column createdStringColumn = 
createdTable.getTableSchema().getColumns().get(1);
+        Assertions.assertEquals("k2", createdStringColumn.getName());
+        // Ensure that the target column is mapped to Doris STRING type
+        Assertions.assertEquals(BasicType.STRING_TYPE, 
createdStringColumn.getDataType());
+        Assertions.assertEquals(
+                "string", 
createdStringColumn.getSourceType().toLowerCase(Locale.ROOT));
+    }
+
     private CatalogTable assertCreateTable(
             CatalogTable upstreamTable, ReadonlyConfig config, String 
fullName) {
         DorisSinkFactory dorisSinkFactory = new DorisSinkFactory();

Reply via email to