This is an automated email from the ASF dual-hosted git repository.

lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 01be66dfac [Fix][Connector-V2][Clickhouse] Fix ClickHouse catalog 
nullable column type and add tests (#10119)
01be66dfac is described below

commit 01be66dfac8a8053437fd92d5147b148e82b15c1
Author: Jast <[email protected]>
AuthorDate: Mon Dec 8 18:34:00 2025 +0800

    [Fix][Connector-V2][Clickhouse] Fix ClickHouse catalog nullable column type 
and add tests (#10119)
---
 .../clickhouse/util/ClickhouseCatalogUtil.java     | 40 ++++++++++++++++++++++
 .../clickhouse/ClickhouseCreateTableTest.java      | 10 +++---
 .../clickhouse/util/ClickhouseCatalogUtilTest.java | 40 ++++++++++++++++++++++
 .../seatunnel/clickhouse/ClickhouseIT.java         |  2 +-
 4 files changed, 87 insertions(+), 5 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseCatalogUtil.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseCatalogUtil.java
index 06078a62d1..394e1e81ca 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseCatalogUtil.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseCatalogUtil.java
@@ -21,15 +21,43 @@ import 
org.apache.seatunnel.shade.org.apache.commons.lang3.StringUtils;
 
 import org.apache.seatunnel.api.table.catalog.Column;
 import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
 import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.catalog.ClickhouseTypeConverter;
 import org.apache.seatunnel.connectors.seatunnel.common.util.CatalogUtil;
 
+import java.util.HashSet;
+import java.util.Set;
+
 import static 
org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull;
 
 public class ClickhouseCatalogUtil extends CatalogUtil {
 
+    private static final ThreadLocal<Set<String>> PRIMARY_KEY_COLUMNS =
+            ThreadLocal.withInitial(HashSet::new);
+
     public static final ClickhouseCatalogUtil INSTANCE = new 
ClickhouseCatalogUtil();
 
+    @Override
+    public String getCreateTableSql(
+            String template,
+            String database,
+            String table,
+            TableSchema tableSchema,
+            String comment,
+            String optionsKey) {
+        Set<String> pkColumns = PRIMARY_KEY_COLUMNS.get();
+        pkColumns.clear();
+        if (tableSchema.getPrimaryKey() != null) {
+            pkColumns.addAll(tableSchema.getPrimaryKey().getColumnNames());
+        }
+        try {
+            return super.getCreateTableSql(
+                    template, database, table, tableSchema, comment, 
optionsKey);
+        } finally {
+            pkColumns.clear();
+        }
+    }
+
     public String columnToConnectorType(Column column) {
         checkNotNull(column, "The column is required.");
         String columnType;
@@ -38,6 +66,14 @@ public class ClickhouseCatalogUtil extends CatalogUtil {
         } else {
             columnType = 
ClickhouseTypeConverter.INSTANCE.reconvert(column).getColumnType();
         }
+
+        Set<String> pkColumns = PRIMARY_KEY_COLUMNS.get();
+        boolean isPrimaryKeyColumn = pkColumns != null && 
pkColumns.contains(column.getName());
+
+        if (column.isNullable() && !isUnsupportedNullableType(columnType) && 
!isPrimaryKeyColumn) {
+            columnType = "Nullable(" + columnType + ")";
+        }
+
         return String.format(
                 "`%s` %s %s",
                 column.getName(),
@@ -49,6 +85,10 @@ public class ClickhouseCatalogUtil extends CatalogUtil {
                                 + "'");
     }
 
+    private static boolean isUnsupportedNullableType(String columnType) {
+        return columnType.startsWith("Map(") || 
columnType.startsWith("Array(");
+    }
+
     public String getDropTableSql(TablePath tablePath, boolean 
ignoreIfNotExists) {
         if (ignoreIfNotExists) {
             return "DROP TABLE IF EXISTS "
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseCreateTableTest.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseCreateTableTest.java
index 90122ebd64..b2a44e25a1 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseCreateTableTest.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseCreateTableTest.java
@@ -99,14 +99,16 @@ public class ClickhouseCreateTableTest {
                                 .build(),
                         "clickhouse test table",
                         ClickhouseSinkOptions.SAVE_MODE_CREATE_TEMPLATE.key());
+        // Primary key columns (id, age) should NOT be wrapped in Nullable
+        // because ClickHouse does not allow nullable columns in ORDER BY / 
PRIMARY KEY
         Assertions.assertEquals(
                 createTableSql,
                 "CREATE TABLE IF NOT EXISTS  `test1`.`test2` (\n"
                         + "    `id` Int64 ,`age` Int32 COMMENT 'test 
comment',\n"
-                        + "    `name` String ,\n"
-                        + "`score` Int32 COMMENT '''N''-N',\n"
-                        + "`gender` Int8 ,\n"
-                        + "`create_time` Int64 \n"
+                        + "    `name` Nullable(String) ,\n"
+                        + "`score` Nullable(Int32) COMMENT '\''N''-N',\n"
+                        + "`gender` Nullable(Int8) ,\n"
+                        + "`create_time` Nullable(Int64) \n"
                         + ") ENGINE = MergeTree()\n"
                         + "ORDER BY (`id`,`age`)\n"
                         + "PRIMARY KEY (`id`,`age`)\n"
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseCatalogUtilTest.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseCatalogUtilTest.java
index dd019b3d48..c36b1d652d 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseCatalogUtilTest.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseCatalogUtilTest.java
@@ -24,6 +24,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.junit.jupiter.api.Test;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -33,6 +34,8 @@ public class ClickhouseCatalogUtilTest {
         Column column = mock(Column.class);
         when(column.getName()).thenReturn("col1");
         when(column.getSinkType()).thenReturn("String");
+        when(column.isNullable()).thenReturn(false);
+        when(column.getComment()).thenReturn("");
 
         String result = 
ClickhouseCatalogUtil.INSTANCE.columnToConnectorType(column);
 
@@ -44,6 +47,8 @@ public class ClickhouseCatalogUtilTest {
         Column column = mock(Column.class);
         when(column.getName()).thenReturn("col1");
         when(column.getDataType()).thenReturn((SeaTunnelDataType) 
BasicType.INT_TYPE);
+        when(column.isNullable()).thenReturn(false);
+        when(column.getComment()).thenReturn("");
 
         String result = 
ClickhouseCatalogUtil.INSTANCE.columnToConnectorType(column);
 
@@ -56,9 +61,44 @@ public class ClickhouseCatalogUtilTest {
         when(column.getName()).thenReturn("col1");
         when(column.getDataType()).thenReturn((SeaTunnelDataType) 
BasicType.INT_TYPE);
         when(column.getSinkType()).thenReturn("String");
+        when(column.isNullable()).thenReturn(false);
+        when(column.getComment()).thenReturn("");
 
         String result = 
ClickhouseCatalogUtil.INSTANCE.columnToConnectorType(column);
 
         assertEquals("`col1` String ", result);
     }
+
+    @Test
+    void wrapsTypeWithNullableWhenColumnIsNullable() {
+        Column column = mock(Column.class);
+        when(column.getName()).thenReturn("col1");
+        when(column.getSinkType()).thenReturn("String");
+        when(column.isNullable()).thenReturn(true);
+        when(column.getComment()).thenReturn("");
+
+        String result = 
ClickhouseCatalogUtil.INSTANCE.columnToConnectorType(column);
+
+        assertEquals("`col1` Nullable(String) ", result);
+    }
+
+    @Test
+    void escapesSingleQuoteAndBackslashInComment() {
+        Column column = mock(Column.class);
+        when(column.getName()).thenReturn("col1");
+        when(column.getSinkType()).thenReturn("String");
+        when(column.isNullable()).thenReturn(false);
+        when(column.getComment()).thenReturn("O'Reilly \\ path");
+
+        String result = 
ClickhouseCatalogUtil.INSTANCE.columnToConnectorType(column);
+
+        assertEquals("`col1` String COMMENT 'O''Reilly \\\\ path'", result);
+    }
+
+    @Test
+    void throwsExceptionWhenColumnIsNull() {
+        assertThrows(
+                NullPointerException.class,
+                () -> 
ClickhouseCatalogUtil.INSTANCE.columnToConnectorType(null));
+    }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java
index 1e8b40ace0..092c7cd9ad 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java
@@ -206,7 +206,7 @@ public class ClickhouseIT extends TestSuiteBase implements 
TestResource {
         String tableName = "default.sink_table_for_schema";
         Container.ExecResult execResult =
                 
container.executeJob("/clickhouse_with_recreate_schema_and_custom.conf");
-        Assertions.assertEquals(0, execResult.getExitCode());
+        Assertions.assertEquals(0, execResult.getExitCode(), 
execResult.getStdout());
         Assertions.assertEquals(101, countData(tableName));
         dropTable(tableName);
     }

Reply via email to