This is an automated email from the ASF dual-hosted git repository.
shenghang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new f951b0a01c [Bug] [Connector-Kudu] Fix Doris auto table from Kudu
STRING being created as CHAR(16) (#10175)
f951b0a01c is described below
commit f951b0a01c2468d13397db495f8ee2f11c97d632
Author: yzeng1618 <[email protected]>
AuthorDate: Sat Dec 13 21:55:01 2025 +0800
[Bug] [Connector-Kudu] Fix Doris auto table from Kudu STRING being created
as CHAR(16) (#10175)
Co-authored-by: zengyi <[email protected]>
---
.../seatunnel/kudu/catalog/KuduCatalog.java | 25 +++++--
.../seatunnel/kudu/catalog/KuduCatalogTest.java | 78 ++++++++++++++++++++++
.../e2e/connector/doris/DorisCatalogIT.java | 49 ++++++++++++++
3 files changed, 147 insertions(+), 5 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/catalog/KuduCatalog.java
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/catalog/KuduCatalog.java
index 80ee7b7850..1ce7124edf 100644
---
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/catalog/KuduCatalog.java
+++
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/catalog/KuduCatalog.java
@@ -39,6 +39,7 @@ import
org.apache.seatunnel.connectors.seatunnel.kudu.util.KuduUtil;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduException;
import org.apache.kudu.client.KuduTable;
@@ -142,19 +143,33 @@ public class KuduCatalog implements Catalog {
kuduTable.getPartitionSchema();
List<ColumnSchema> columnSchemaList = schema.getColumns();
Optional<PrimaryKey> primaryKey =
getPrimaryKey(schema.getPrimaryKeyColumns());
+ PrimaryKey primaryKeyRef = primaryKey.orElse(null);
buildColumnsWithErrorCheck(
tablePath,
builder,
IntStream.range(0, columnSchemaList.size()).iterator(),
i -> {
+ ColumnSchema columnSchema = columnSchemaList.get(i);
SeaTunnelDataType<?> type =
KuduTypeMapper.mapping(columnSchemaList, i);
+ Long columnLength = null;
+ if (Type.STRING.equals(columnSchema.getType())
+ && PrimaryKey.isPrimaryKeyField(
+ primaryKeyRef,
columnSchema.getName())) {
+ // Doris does not allow STRING as key column type. For primary key
+ // string columns we provide a reasonable logical length
+ // so that downstream sinks (e.g. Doris) can map them to a supported
+ // CHAR / VARCHAR type instead of the invalid STRING type.
+ columnLength = 256L;
+ } else if
(!Type.STRING.equals(columnSchema.getType())) {
+ columnLength = (long) columnSchema.getTypeSize();
+ }
return PhysicalColumn.of(
- columnSchemaList.get(i).getName(),
+ columnSchema.getName(),
type,
- columnSchemaList.get(i).getTypeSize(),
- columnSchemaList.get(i).isNullable(),
- columnSchemaList.get(i).getDefaultValue(),
- columnSchemaList.get(i).getComment());
+ columnLength,
+ columnSchema.isNullable(),
+ columnSchema.getDefaultValue(),
+ columnSchema.getComment());
});
primaryKey.ifPresent(builder::primaryKey);
diff --git
a/seatunnel-connectors-v2/connector-kudu/src/test/java/org/apache/seatunnel/connectors/seatunnel/kudu/catalog/KuduCatalogTest.java
b/seatunnel-connectors-v2/connector-kudu/src/test/java/org/apache/seatunnel/connectors/seatunnel/kudu/catalog/KuduCatalogTest.java
new file mode 100644
index 0000000000..04b5a306ba
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-kudu/src/test/java/org/apache/seatunnel/connectors/seatunnel/kudu/catalog/KuduCatalogTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kudu.catalog;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.connectors.seatunnel.kudu.config.CommonConfig;
+
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduTable;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.lang.reflect.Field;
+import java.util.Arrays;
+
+class KuduCatalogTest {
+
+ @Test
+ void testStringColumnLengthShouldBeNull() throws Exception {
+ CommonConfig commonConfig = Mockito.mock(CommonConfig.class);
+ KuduCatalog kuduCatalog = new KuduCatalog("kudu", commonConfig);
+
+ KuduClient kuduClient = Mockito.mock(KuduClient.class);
+ Field clientField = KuduCatalog.class.getDeclaredField("kuduClient");
+ clientField.setAccessible(true);
+ clientField.set(kuduCatalog, kuduClient);
+
+ TablePath tablePath = TablePath.of("kudu_string_table");
+
Mockito.when(kuduClient.tableExists(tablePath.getFullName())).thenReturn(true);
+
+ ColumnSchema idColumn =
+ new ColumnSchema.ColumnSchemaBuilder("id",
Type.INT32).key(true).build();
+ ColumnSchema stringColumn =
+ new ColumnSchema.ColumnSchemaBuilder("val_string", Type.STRING)
+ .nullable(true)
+ .build();
+ Schema schema = new Schema(Arrays.asList(idColumn, stringColumn));
+
+ KuduTable kuduTable = Mockito.mock(KuduTable.class);
+
Mockito.when(kuduClient.openTable(tablePath.getFullName())).thenReturn(kuduTable);
+ Mockito.when(kuduTable.getSchema()).thenReturn(schema);
+ Mockito.when(kuduTable.getPartitionSchema()).thenReturn(null);
+
+ CatalogTable catalogTable = kuduCatalog.getTable(tablePath);
+ Column id = catalogTable.getTableSchema().getColumns().get(0);
+ Column valString = catalogTable.getTableSchema().getColumns().get(1);
+
+ // Non-STRING types should still keep the physical length from Kudu.
+ Assertions.assertEquals("id", id.getName());
+ Assertions.assertNotNull(id.getColumnLength());
+
+ // STRING columns must not use the internal typeSize (commonly 16) as logical length.
+ Assertions.assertEquals("val_string", valString.getName());
+ Assertions.assertNull(valString.getColumnLength());
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCatalogIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCatalogIT.java
index 50db297dbf..f26286e677 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCatalogIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCatalogIT.java
@@ -51,6 +51,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
import java.util.stream.Collectors;
@@ -254,6 +255,54 @@ public class DorisCatalogIT extends AbstractDorisIT {
BasicType.DOUBLE_TYPE,
newTable.getTableSchema().getColumns().get(4).getDataType());
}
+ @Test
+ void testCreateTableWithUnboundedStringColumn() {
+ TableSchema.Builder builder = TableSchema.builder();
+ builder.column(PhysicalColumn.of("k1", BasicType.INT_TYPE, 10L, false,
0, "k1"));
+ // Simulate upstream catalog (such as KuduCatalog) where string column has no logical
+ // length, so Doris should create it as STRING instead of CHAR(16).
+ builder.column(
+ PhysicalColumn.of(
+ "k2",
+ BasicType.STRING_TYPE,
+ (Long) null,
+ false,
+ null,
+ "k2 without length"));
+ builder.primaryKey(PrimaryKey.of("pk_k1",
Collections.singletonList("k1")));
+
+ CatalogTable upstreamTable =
+ CatalogTable.of(
+ TableIdentifier.of("doris",
TablePath.of("test.unbounded_string")),
+ builder.build(),
+ Collections.emptyMap(),
+ Collections.emptyList(),
+ null);
+
+ ReadonlyConfig config =
+ ReadonlyConfig.fromMap(
+ new HashMap<String, Object>() {
+ {
+ put(
+ DorisBaseOptions.FENODES.key(),
+ container.getHost() + ":" + HTTP_PORT);
+ put(DorisBaseOptions.DATABASE.key(), "test");
+ put(DorisBaseOptions.TABLE.key(),
"unbounded_string");
+ put(DorisBaseOptions.USERNAME.key(), USERNAME);
+ put(DorisBaseOptions.PASSWORD.key(), PASSWORD);
+ }
+ });
+
+ CatalogTable createdTable =
+ assertCreateTable(upstreamTable, config,
"test.unbounded_string");
+ Column createdStringColumn =
createdTable.getTableSchema().getColumns().get(1);
+ Assertions.assertEquals("k2", createdStringColumn.getName());
+ // Ensure that the target column is mapped to Doris STRING type
+ Assertions.assertEquals(BasicType.STRING_TYPE,
createdStringColumn.getDataType());
+ Assertions.assertEquals(
+ "string",
createdStringColumn.getSourceType().toLowerCase(Locale.ROOT));
+ }
+
private CatalogTable assertCreateTable(
CatalogTable upstreamTable, ReadonlyConfig config, String
fullName) {
DorisSinkFactory dorisSinkFactory = new DorisSinkFactory();