This is an automated email from the ASF dual-hosted git repository.

corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 5e9990afd5 [Improve][Connector-Clickhouse] improve ck batch parallel 
read by using last batch row sorting value approach, instead of limit offset. 
(#9801)
5e9990afd5 is described below

commit 5e9990afd55c4e78a35b2cb122b5c7c1f6bbc386
Author: JeremyXin <[email protected]>
AuthorDate: Mon Sep 8 18:34:08 2025 +0800

    [Improve][Connector-Clickhouse] improve ck batch parallel read by using 
last batch row sorting value approach, instead of limit offset. (#9801)
---
 .../clickhouse/source/ClickhousePart.java          |  20 +-
 .../clickhouse/source/ClickhouseValueReader.java   | 263 ++++++++++++++++++---
 .../source/split/ClickhouseSourceSplit.java        |   4 -
 .../source/split/PartStrategySplitter.java         |   1 -
 .../source/split/SqlStrategySplitter.java          |   2 -
 .../source/ClickhouseValueReaderTest.java          | 147 ++++++++----
 6 files changed, 347 insertions(+), 90 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhousePart.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhousePart.java
index 1c51c0baf2..c0dbba5714 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhousePart.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhousePart.java
@@ -20,6 +20,7 @@ package 
org.apache.seatunnel.connectors.seatunnel.clickhouse.source;
 import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard;
 
 import java.io.Serializable;
+import java.util.List;
 import java.util.Objects;
 
 public class ClickhousePart implements Serializable, 
Comparable<ClickhousePart> {
@@ -31,7 +32,12 @@ public class ClickhousePart implements Serializable, 
Comparable<ClickhousePart>
     private final String database;
     private final String table;
     private final Shard shard;
-    private int offset = 0;
+
+    /**
+     * Stores the last ordering key values fetched for Keyset cursor 
pagination. The order matches
+     * the table's sorting key columns.
+     */
+    private List<Object> lastOrderingKeyValues;
 
     /** Flag indicating whether all data from this part has been completely 
read. */
     private boolean isEndOfPart = false;
@@ -67,12 +73,12 @@ public class ClickhousePart implements Serializable, 
Comparable<ClickhousePart>
         this.isEndOfPart = endOfPart;
     }
 
-    public void setOffset(int offset) {
-        this.offset = offset;
+    public List<Object> getLastOrderingKeyValues() {
+        return lastOrderingKeyValues;
     }
 
-    public int getOffset() {
-        return offset;
+    public void setLastOrderingKeyValues(List<Object> lastOrderingKeyValues) {
+        this.lastOrderingKeyValues = lastOrderingKeyValues;
     }
 
     @Override
@@ -114,10 +120,10 @@ public class ClickhousePart implements Serializable, 
Comparable<ClickhousePart>
                 + '\''
                 + ", shard="
                 + shard
-                + ", offset="
-                + offset
                 + ", isEndOfPart="
                 + isEndOfPart
+                + ", lastOrderingKeyValues="
+                + lastOrderingKeyValues
                 + '}';
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseValueReader.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseValueReader.java
index 77da3a77dc..1a8f6c27ec 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseValueReader.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseValueReader.java
@@ -18,10 +18,12 @@
 package org.apache.seatunnel.connectors.seatunnel.clickhouse.source;
 
 import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorErrorCode;
 import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file.ClickhouseTable;
 import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.source.split.ClickhouseSourceSplit;
 import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseProxy;
 import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseUtil;
@@ -33,28 +35,51 @@ import com.clickhouse.client.ClickHouseResponse;
 import lombok.extern.slf4j.Slf4j;
 
 import java.io.Serializable;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
+/**
+ * ClickhouseValueReader is responsible for reading data from ClickHouse 
database. It supports two
+ * reading modes determined by {@link #shouldUseStreamReader()}:
+ *
+ * <p>1. Stream Mode: Used when the query is complex, no sorting key exists, 
or not all sorting key
+ * columns are included in the query fields.
+ *
+ * <p>2. Batch Mode: Uses a keyset pagination approach by tracking the last 
row's sorting key values
+ * from each batch. This mode requires {@link #isAllSortKeyInRowType()} to be 
true, meaning all
+ * sorting key columns must be included in the query fields.
+ */
 @Slf4j
 public class ClickhouseValueReader implements Serializable {
     private static final long serialVersionUID = 4588012013447713463L;
 
+    private static final DateTimeFormatter TS_FORMATTER =
+            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+
     private final ClickhouseSourceSplit clickhouseSourceSplit;
     private final SeaTunnelRowType rowTypeInfo;
     private final ClickhouseSourceTable clickhouseSourceTable;
     private StreamValueReader streamValueReader;
     private ClickhouseProxy proxy;
+    private final boolean shouldUseStreamReader;
 
     protected int currentPartIndex = 0;
 
     private List<SeaTunnelRow> rowBatch;
 
+    // SQL strategy keyset order values
+    private List<Object> sqlLastOrderingKeyValues;
+
     public ClickhouseValueReader(
             ClickhouseSourceSplit clickhouseSourceSplit,
             SeaTunnelRowType seaTunnelRowType,
@@ -63,10 +88,11 @@ public class ClickhouseValueReader implements Serializable {
         this.rowTypeInfo = seaTunnelRowType;
         this.clickhouseSourceTable = clickhouseSourceTable;
         this.proxy = new 
ClickhouseProxy(clickhouseSourceSplit.getShard().getNode());
+        this.shouldUseStreamReader = shouldUseStreamReader();
     }
 
     public boolean hasNext() {
-        if (shouldUseStreamReader()) {
+        if (shouldUseStreamReader) {
             if (streamValueReader == null) {
                 streamValueReader = new StreamValueReader();
             }
@@ -104,7 +130,7 @@ public class ClickhouseValueReader implements Serializable {
         }
 
         try {
-            String query = buildPartQuery(currentPart);
+            String query = buildBatchPartQuery(currentPart);
             rowBatch =
                     proxy.batchFetchRecords(
                             query, clickhouseSourceTable.getTablePath(), 
rowTypeInfo);
@@ -121,8 +147,15 @@ public class ClickhouseValueReader implements Serializable 
{
                 return currentPartIndex < partSize && partBatchStrategyRead();
             }
 
-            // update part offset
-            currentPart.setOffset(currentPart.getOffset() + rowBatch.size());
+            // update Keyset cursor (last ordering key values)
+            String sortingKey = 
clickhouseSourceTable.getClickhouseTable().getSortingKey();
+
+            SeaTunnelRow lastRow = rowBatch.get(rowBatch.size() - 1);
+            List<Object> keyValues = extractOrderingKeyValuesFromRow(lastRow, 
sortingKey);
+            log.debug("lastRow: {}, extract ordering key values from row: {}", 
lastRow, keyValues);
+
+            currentPart.setLastOrderingKeyValues(keyValues);
+
             return true;
         } catch (Exception e) {
             throw new ClickhouseConnectorException(
@@ -138,15 +171,26 @@ public class ClickhouseValueReader implements 
Serializable {
     }
 
     private boolean sqlBatchStrategyRead() {
-        String query = buildSqlQuery();
+        String query = buildBatchSqlQuery();
 
         try {
             rowBatch =
                     proxy.batchFetchRecords(
                             query, clickhouseSourceTable.getTablePath(), 
rowTypeInfo);
 
-            clickhouseSourceSplit.setSqlOffset(
-                    clickhouseSourceSplit.getSqlOffset() + rowBatch.size());
+            String sortingKey = 
clickhouseSourceTable.getClickhouseTable().getSortingKey();
+
+            if (rowBatch.isEmpty()) {
+                return false;
+            }
+            SeaTunnelRow lastRow = rowBatch.get(rowBatch.size() - 1);
+
+            sqlLastOrderingKeyValues = 
extractOrderingKeyValuesFromRow(lastRow, sortingKey);
+
+            log.debug(
+                    "lastRow: {}, extract ordering key values from row: {}",
+                    lastRow,
+                    sqlLastOrderingKeyValues);
 
             return !rowBatch.isEmpty();
         } catch (Exception e) {
@@ -173,10 +217,30 @@ public class ClickhouseValueReader implements 
Serializable {
 
     private boolean shouldUseStreamReader() {
         return clickhouseSourceTable.isComplexSql()
-                || 
StringUtils.isEmpty(clickhouseSourceTable.getClickhouseTable().getSortingKey());
+                || 
StringUtils.isEmpty(clickhouseSourceTable.getClickhouseTable().getSortingKey())
+                || !isAllSortKeyInRowType();
     }
 
-    private String buildPartQuery(ClickhousePart part) {
+    /** Verify that all sorting key columns exist in rowTypeInfo. */
+    private boolean isAllSortKeyInRowType() {
+        ClickhouseTable clickhouseTable = 
clickhouseSourceTable.getClickhouseTable();
+        if (clickhouseTable == null || 
StringUtils.isEmpty(clickhouseTable.getSortingKey())) {
+            return false;
+        }
+        String sortingKey = clickhouseTable.getSortingKey();
+        List<String> sortingKeyList =
+                
Arrays.stream(sortingKey.split(",")).map(String::trim).collect(Collectors.toList());
+
+        // check all sort key exists in rowTypeInfo
+        Optional<String> sortKeyNotExistOpt =
+                sortingKeyList.stream()
+                        .filter(key -> rowTypeInfo.indexOf(key, false) == -1)
+                        .findAny();
+
+        return !sortKeyNotExistOpt.isPresent();
+    }
+
+    private String buildBatchPartQuery(ClickhousePart part) {
         TablePath tablePath = TablePath.of(part.getDatabase(), 
part.getTable());
 
         String whereClause = String.format("_part = '%s'", part.getName());
@@ -184,55 +248,170 @@ public class ClickhouseValueReader implements 
Serializable {
             whereClause += " AND (" + clickhouseSourceTable.getFilterQuery() + 
")";
         }
 
-        String orderByClause = "";
-        if 
(StringUtils.isNotEmpty(clickhouseSourceTable.getClickhouseTable().getSortingKey()))
 {
-            orderByClause =
-                    " ORDER BY " + 
clickhouseSourceTable.getClickhouseTable().getSortingKey();
+        String sortingKey = 
clickhouseSourceTable.getClickhouseTable().getSortingKey();
+
+        String orderByClause = " ORDER BY " + sortingKey;
+
+        String keysetWhere = "";
+        // Key cursor mode pagination: when sorting key exists, use tuple 
comparison on
+        // lastOrderingKeyValues
+        if (part.getLastOrderingKeyValues() != null) {
+            keysetWhere = buildKeysetWhereCondition(sortingKey, 
part.getLastOrderingKeyValues());
+            if (!keysetWhere.isEmpty()) {
+                whereClause += " AND (" + keysetWhere + ")";
+            }
         }
 
         String sql;
-        if (StringUtils.isNotEmpty(orderByClause)) {
+
+        if (part.getLastOrderingKeyValues() != null) {
+            // key cursor mode: no OFFSET, only LIMIT
             sql =
                     String.format(
-                            "SELECT * FROM %s.%s WHERE %s %s LIMIT %d, %d WITH 
TIES",
+                            "SELECT * FROM %s.%s WHERE %s %s LIMIT %d WITH 
TIES",
                             tablePath.getDatabaseName(),
                             tablePath.getTableName(),
                             whereClause,
                             orderByClause,
-                            part.getOffset(),
                             clickhouseSourceTable.getBatchSize());
         } else {
+            // for the first sql creation, lastOrderingKeyValues is null
             sql =
                     String.format(
-                            "SELECT * FROM %s.%s WHERE %s",
-                            tablePath.getDatabaseName(), 
tablePath.getTableName(), whereClause);
+                            "SELECT * FROM %s.%s WHERE %s %s LIMIT %d, %d WITH 
TIES",
+                            tablePath.getDatabaseName(),
+                            tablePath.getTableName(),
+                            whereClause,
+                            orderByClause,
+                            0,
+                            clickhouseSourceTable.getBatchSize());
         }
 
+        log.info("generate batch part sql: {}", sql);
+
         return sql;
     }
 
-    private String buildSqlQuery() {
-        String orderByClause = "";
-        if 
(StringUtils.isNotEmpty(clickhouseSourceTable.getClickhouseTable().getSortingKey()))
 {
-            orderByClause =
-                    " ORDER BY " + 
clickhouseSourceTable.getClickhouseTable().getSortingKey();
+    private String buildBatchSqlQuery() {
+        String base =
+                String.format("SELECT * FROM (%s) AS t", 
clickhouseSourceSplit.getSplitQuery());
+
+        String sortingKey = 
clickhouseSourceTable.getClickhouseTable().getSortingKey();
+
+        String whereClause = "";
+        if (sqlLastOrderingKeyValues != null) {
+            String keyset = buildKeysetWhereCondition(sortingKey, 
sqlLastOrderingKeyValues);
+            if (!keyset.isEmpty()) {
+                whereClause = " WHERE (" + keyset + ")";
+            }
         }
 
-        String executeSql;
-        if (StringUtils.isNotEmpty(orderByClause)) {
-            executeSql =
+        String orderByClause = " ORDER BY " + sortingKey;
+
+        String sql;
+        if (sqlLastOrderingKeyValues != null) {
+            // key cursor mode: no OFFSET, only LIMIT
+            sql =
                     String.format(
-                            "SELECT * FROM (%s) AS t %s LIMIT %d, %d WITH 
TIES",
-                            clickhouseSourceSplit.getSplitQuery(),
-                            orderByClause,
-                            clickhouseSourceSplit.getSqlOffset(),
-                            clickhouseSourceTable.getBatchSize());
+                            "%s %s %s LIMIT %d WITH TIES",
+                            base, whereClause, orderByClause, 
clickhouseSourceTable.getBatchSize());
         } else {
-            executeSql =
-                    String.format("SELECT * FROM (%s) AS t", 
clickhouseSourceSplit.getSplitQuery());
+            // for the first sql creation, sqlLastOrderingKeyValues is null
+            sql =
+                    String.format(
+                            "%s %s LIMIT %d, %d WITH TIES",
+                            base, orderByClause, 0, 
clickhouseSourceTable.getBatchSize());
+        }
+
+        log.info("generate batch query sql: {}", sql);
+
+        return sql;
+    }
+
+    /**
+     * Build WHERE condition using the sorting key and last key values. 
Supports single or composite
+     * keys, and generates lexicographic tuple comparison.
+     */
+    private String buildKeysetWhereCondition(String sortingKey, List<Object> 
lastKeyValues) {
+        List<String> keyCols =
+                
Arrays.stream(sortingKey.split(",")).map(String::trim).collect(Collectors.toList());
+        if (lastKeyValues == null
+                || lastKeyValues.isEmpty()
+                || keyCols.size() != lastKeyValues.size()) {
+            return "";
         }
 
-        return executeSql;
+        // Build tuple comparison (c1, c2, ...) > (v1, v2, ...)
+        String left = "(" + String.join(", ", keyCols) + ")";
+
+        // Convert lastKeyValues to SQL literals based on rowTypeInfo
+        String inlinedRight = "(" + buildSqlLiteralsForKeyValues(keyCols, 
lastKeyValues) + ")";
+
+        return left + " > " + inlinedRight;
+    }
+
+    private String buildSqlLiteralsForKeyValues(List<String> keyCols, 
List<Object> values) {
+        List<String> literals = new ArrayList<>();
+        for (int i = 0; i < keyCols.size(); i++) {
+            String col = keyCols.get(i);
+            Object v = values.get(i);
+            literals.add(toSqlLiteral(col, v));
+        }
+        return String.join(", ", literals);
+    }
+
+    private String toSqlLiteral(String column, Object value) {
+        if (value == null) {
+            return "NULL";
+        }
+        int idx = rowTypeInfo.indexOf(column, false);
+        if (idx < 0) {
+            // fallback: quote as string
+            return quoteString(value.toString());
+        }
+        SeaTunnelDataType<?> t = rowTypeInfo.getFieldType(idx);
+        switch (t.getSqlType()) {
+            case STRING:
+                return quoteString(value.toString());
+            case BOOLEAN:
+                return Boolean.TRUE.equals(value) ? "1" : "0";
+            case TINYINT:
+            case SMALLINT:
+            case INT:
+            case BIGINT:
+            case FLOAT:
+            case DOUBLE:
+            case DECIMAL:
+                return value.toString();
+            case DATE:
+                if (value instanceof LocalDate) {
+                    return quoteString(value.toString());
+                }
+                return quoteString(String.valueOf(value));
+            case TIMESTAMP:
+                if (value instanceof LocalDateTime) {
+                    return quoteString(TS_FORMATTER.format((LocalDateTime) 
value));
+                }
+                return quoteString(String.valueOf(value));
+            default:
+                return quoteString(String.valueOf(value));
+        }
+    }
+
+    private List<Object> extractOrderingKeyValuesFromRow(SeaTunnelRow row, 
String sortingKey) {
+        List<String> keyCols =
+                
Arrays.stream(sortingKey.split(",")).map(String::trim).collect(Collectors.toList());
+        List<Object> keyValues = new ArrayList<>(keyCols.size());
+        for (String col : keyCols) {
+            int idx = rowTypeInfo.indexOf(col, false);
+            keyValues.add(row.getField(idx));
+        }
+        return keyValues;
+    }
+
+    private String quoteString(String s) {
+        String escaped = s.replace("\\", "\\\\").replace("'", "''");
+        return "'" + escaped + "'";
     }
 
     private class StreamValueReader implements Serializable {
@@ -259,6 +438,7 @@ public class ClickhouseValueReader implements Serializable {
                                 try {
                                     for (String sql : sqlList) {
                                         executeSql = sql;
+                                        log.info("execute stream sql: {}", 
executeSql);
                                         try (ClickHouseResponse response =
                                                 proxy.getClickhouseConnection()
                                                         .query(sql)
@@ -335,11 +515,24 @@ public class ClickhouseValueReader implements 
Serializable {
                 return 
Collections.singletonList(clickhouseSourceSplit.getSplitQuery());
             } else {
                 return clickhouseSourceSplit.getParts().stream()
-                        .map(ClickhouseValueReader.this::buildPartQuery)
+                        .map(this::buildStreamPartQuery)
                         .collect(Collectors.toList());
             }
         }
 
+        private String buildStreamPartQuery(ClickhousePart part) {
+            TablePath tablePath = TablePath.of(part.getDatabase(), 
part.getTable());
+
+            String whereClause = String.format("_part = '%s'", part.getName());
+            if 
(StringUtils.isNotEmpty(clickhouseSourceTable.getFilterQuery())) {
+                whereClause += " AND (" + 
clickhouseSourceTable.getFilterQuery() + ")";
+            }
+
+            return String.format(
+                    "SELECT * FROM %s.%s WHERE %s",
+                    tablePath.getDatabaseName(), tablePath.getTableName(), 
whereClause);
+        }
+
         public void close() {
             if (rowQueue != null) {
                 rowQueue.clear();
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/split/ClickhouseSourceSplit.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/split/ClickhouseSourceSplit.java
index 445c5bb4c7..62ad3f15ce 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/split/ClickhouseSourceSplit.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/split/ClickhouseSourceSplit.java
@@ -24,7 +24,6 @@ import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.source.ClickhousePar
 
 import lombok.AllArgsConstructor;
 import lombok.Data;
-import lombok.Setter;
 
 import java.util.List;
 
@@ -39,7 +38,6 @@ public class ClickhouseSourceSplit implements SourceSplit {
     private final List<ClickhousePart> parts;
     private final Shard shard;
     private final String splitQuery;
-    @Setter private int sqlOffset;
 
     private final String splitId;
 
@@ -66,8 +64,6 @@ public class ClickhouseSourceSplit implements SourceSplit {
                 + ", splitQuery='"
                 + splitQuery
                 + "'"
-                + ", sqlOffset="
-                + sqlOffset
                 + ", splitId='"
                 + splitId
                 + "'"
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/split/PartStrategySplitter.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/split/PartStrategySplitter.java
index 5c5a4bd0ed..ca36fe6ae7 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/split/PartStrategySplitter.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/split/PartStrategySplitter.java
@@ -118,7 +118,6 @@ public class PartStrategySplitter implements Splitter, 
AutoCloseable, Serializab
                                 new ArrayList<>(partSplit),
                                 shardPartsEntry.getKey(),
                                 clickhouseSourceTable.getOriginQuery(),
-                                0,
                                 splitId);
                 splits.add(clickhouseSourceSplit);
             }
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/split/SqlStrategySplitter.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/split/SqlStrategySplitter.java
index b9242fcd42..07b9fa8c77 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/split/SqlStrategySplitter.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/split/SqlStrategySplitter.java
@@ -64,7 +64,6 @@ public class SqlStrategySplitter implements Splitter, 
AutoCloseable, Serializabl
                                         new ArrayList<>(),
                                         shard,
                                         querySql,
-                                        0,
                                         createSplitId(
                                                 
clickhouseSourceTable.getTablePath(),
                                                 shard,
@@ -101,7 +100,6 @@ public class SqlStrategySplitter implements Splitter, 
AutoCloseable, Serializabl
                         new ArrayList<>(),
                         clusterShardList.get(0),
                         clickhouseSourceTable.getOriginQuery(),
-                        0,
                         createSplitId(
                                 clickhouseSourceTable.getTablePath(), 
clusterShardList.get(0), 0)));
     }
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseValueReaderTest.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseValueReaderTest.java
index 57571b1dc7..22d433e459 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseValueReaderTest.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseValueReaderTest.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.api.table.type.BasicType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.utils.ReflectionUtils;
 import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard;
 import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file.ClickhouseTable;
 import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.source.split.ClickhouseSourceSplit;
@@ -46,9 +47,12 @@ import com.clickhouse.client.data.ClickHouseStringValue;
 import lombok.extern.slf4j.Slf4j;
 
 import java.lang.reflect.Field;
+import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
@@ -100,7 +104,6 @@ public class ClickhouseValueReaderTest {
                         new ArrayList<>(parts),
                         shard,
                         "",
-                        0,
                         "split-1");
 
         mockProxy = Mockito.mock(ClickhouseProxy.class, 
Mockito.RETURNS_DEEP_STUBS);
@@ -108,13 +111,8 @@ public class ClickhouseValueReaderTest {
         initStreamValueReaderMock();
 
         reader = new ClickhouseValueReader(split, rowType, sourceTable);
-        try {
-            Field proxyField = 
ClickhouseValueReader.class.getDeclaredField("proxy");
-            proxyField.setAccessible(true);
-            proxyField.set(reader, mockProxy);
-        } catch (Exception e) {
-            throw new RuntimeException("Failed to set mock proxy", e);
-        }
+
+        ReflectionUtils.setField(reader, ClickhouseValueReader.class, "proxy", 
mockProxy);
     }
 
     @Test
@@ -130,9 +128,11 @@ public class ClickhouseValueReaderTest {
         Assertions.assertEquals(BATCH_SIZE, result.size());
         Assertions.assertEquals(0, reader.currentPartIndex);
 
-        // Make sure the offset has been updated but the part has not been 
marked as end of part
+        // In keyset mode, lastOrderingKeyValues should be updated, offset 
remains 0
         List<ClickhousePart> parts = new ArrayList<>(split.getParts());
-        Assertions.assertEquals(BATCH_SIZE, parts.get(0).getOffset());
+        Assertions.assertNotNull(parts.get(0).getLastOrderingKeyValues());
+        Assertions.assertEquals(
+                (long) (BATCH_SIZE - 1), 
parts.get(0).getLastOrderingKeyValues().get(0));
         Assertions.assertFalse(parts.get(0).isEndOfPart());
     }
 
@@ -150,9 +150,11 @@ public class ClickhouseValueReaderTest {
         List<SeaTunnelRow> result = reader.next();
         Assertions.assertEquals(partialSize, result.size());
 
-        // Make sure the offset has been updated
+        // In keyset mode, lastOrderingKeyValues should be updated to last row 
id, and no EOS
         List<ClickhousePart> parts = new ArrayList<>(split.getParts());
-        Assertions.assertEquals(partialSize, parts.get(0).getOffset());
+        Assertions.assertNotNull(parts.get(0).getLastOrderingKeyValues());
+        Assertions.assertEquals(
+                (long) (partialSize - 1), 
parts.get(0).getLastOrderingKeyValues().get(0));
 
         Assertions.assertTrue(reader.hasNext());
     }
@@ -193,9 +195,13 @@ public class ClickhouseValueReaderTest {
                         invocation -> {
                             ClickhousePart part = 
parts.get(reader.currentPartIndex);
                             if ("part1".equals(part.getName())) {
-                                return part.getOffset() == 0 ? mockRows1 : new 
ArrayList<>();
+                                return part.getLastOrderingKeyValues() == null
+                                        ? mockRows1
+                                        : new ArrayList<>();
                             } else {
-                                return part.getOffset() == 0 ? mockRows2 : new 
ArrayList<>();
+                                return part.getLastOrderingKeyValues() == null
+                                        ? mockRows2
+                                        : new ArrayList<>();
                             }
                         });
 
@@ -220,7 +226,8 @@ public class ClickhouseValueReaderTest {
 
     @Test
     public void testPartStrategyReadWithNoSortingKey() {
-        when(sourceTable.getClickhouseTable().getSortingKey()).thenReturn("");
+        ReflectionUtils.setField(
+                reader, ClickhouseValueReader.class, "shouldUseStreamReader", 
true);
 
         Assertions.assertTrue(reader.hasNext());
         List<SeaTunnelRow> result = reader.next();
@@ -235,16 +242,10 @@ public class ClickhouseValueReaderTest {
 
     @Test
     public void testSqlStrategyReadWithNoSortingKey() {
-        try {
-            Field sqlStrategyField =
-                    
ClickhouseSourceTable.class.getDeclaredField("isSqlStrategyRead");
-            sqlStrategyField.setAccessible(true);
-            sqlStrategyField.set(sourceTable, true);
-        } catch (Exception e) {
-            Assertions.fail("Failed to set isSqlStrategyRead field", e);
-        }
-
-        when(sourceTable.getClickhouseTable().getSortingKey()).thenReturn("");
+        ReflectionUtils.setField(
+                sourceTable, ClickhouseSourceTable.class, "isSqlStrategyRead", 
true);
+        ReflectionUtils.setField(
+                reader, ClickhouseValueReader.class, "shouldUseStreamReader", 
true);
 
         Assertions.assertTrue(reader.hasNext());
 
@@ -256,29 +257,23 @@ public class ClickhouseValueReaderTest {
 
     @Test
     public void testSqlStrategyReadWithSortingKey() {
-        try {
-            Field sqlStrategyField =
-                    
ClickhouseSourceTable.class.getDeclaredField("isSqlStrategyRead");
-            sqlStrategyField.setAccessible(true);
-            sqlStrategyField.set(sourceTable, true);
-        } catch (Exception e) {
-            Assertions.fail("Failed to set isSqlStrategyRead field", e);
-        }
+        ReflectionUtils.setField(
+                sourceTable, ClickhouseSourceTable.class, "isSqlStrategyRead", 
true);
 
         
when(sourceTable.getClickhouseTable().getSortingKey()).thenReturn("id");
 
+        // In Keyset mode, we expect multiple batches without relying on sqlOffset
         List<SeaTunnelRow> firstBatch = createMockRows(BATCH_SIZE);
         List<SeaTunnelRow> secondBatch = createMockRows(5);
         List<SeaTunnelRow> emptyBatch = new ArrayList<>();
 
-        when(mockProxy.batchFetchRecords(any(), 
eq(sourceTable.getTablePath()), eq(rowType)))
-                .thenAnswer(
-                        x ->
-                                split.getSqlOffset() == 0
-                                        ? firstBatch
-                                        : split.getSqlOffset() == BATCH_SIZE
-                                                ? secondBatch
-                                                : emptyBatch);
+        // Simulate: first call returns firstBatch, second call returns secondBatch, then empty
+        Mockito.when(
+                        mockProxy.batchFetchRecords(
+                                any(), eq(sourceTable.getTablePath()), 
eq(rowType)))
+                .thenReturn(firstBatch)
+                .thenReturn(secondBatch)
+                .thenReturn(emptyBatch);
 
         Assertions.assertTrue(reader.hasNext());
         List<SeaTunnelRow> result1 = reader.next();
@@ -319,6 +314,76 @@ public class ClickhouseValueReaderTest {
         }
     }
 
+    @Test
+    public void testBuildKeysetWhereCondition() throws Exception {
+        Optional<Method> methodOpt =
+                ReflectionUtils.getDeclaredMethod(
+                        ClickhouseValueReader.class,
+                        "buildKeysetWhereCondition",
+                        String.class,
+                        List.class);
+        Assertions.assertTrue(methodOpt.isPresent());
+
+        Method buildKeysetWhereConditionMethod = methodOpt.get();
+
+        // Test a single sort key
+        String sortingKey = "id";
+        List<Object> keyValues = Collections.singletonList(100L);
+        Object result = buildKeysetWhereConditionMethod.invoke(reader, 
sortingKey, keyValues);
+        Assertions.assertEquals("(id) > (100)", result);
+
+        // Test the composite sort key
+        sortingKey = "id, name";
+        keyValues = Arrays.asList(100L, "test");
+        result = buildKeysetWhereConditionMethod.invoke(reader, sortingKey, 
keyValues);
+        Assertions.assertEquals("(id, name) > (100, 'test')", result);
+
+        // Test values containing special characters
+        sortingKey = "id, name";
+        keyValues = Arrays.asList(100L, "test'with quote");
+        result = buildKeysetWhereConditionMethod.invoke(reader, sortingKey, 
keyValues);
+        Assertions.assertEquals("(id, name) > (100, 'test''with quote')", 
result);
+
+        // Test null and empty key value lists
+        result = buildKeysetWhereConditionMethod.invoke(reader, sortingKey, 
null);
+        Assertions.assertEquals("", result);
+
+        result = buildKeysetWhereConditionMethod.invoke(reader, sortingKey, 
new ArrayList<>());
+        Assertions.assertEquals("", result);
+
+        // Test that a mismatched number of keys and values yields an empty condition
+        sortingKey = "id, name, age";
+        keyValues = Arrays.asList(100L, "test");
+        result = buildKeysetWhereConditionMethod.invoke(reader, sortingKey, 
keyValues);
+        Assertions.assertEquals("", result);
+    }
+
+    @Test
+    public void testIsAllSortKeyInRowType() throws Exception {
+        Optional<Method> methodOpt =
+                ReflectionUtils.getDeclaredMethod(
+                        ClickhouseValueReader.class, "isAllSortKeyInRowType");
+        Assertions.assertTrue(methodOpt.isPresent());
+
+        Method isAllSortKeyInRowTypeMethod = methodOpt.get();
+
+        // Test case 1: Valid composite sorting key
+        when(sourceTable.getClickhouseTable().getSortingKey()).thenReturn("id, 
age");
+        boolean result = (boolean) isAllSortKeyInRowTypeMethod.invoke(reader);
+        Assertions.assertTrue(result);
+
+        // Test case 2: Empty sorting key
+        when(sourceTable.getClickhouseTable().getSortingKey()).thenReturn("");
+        result = (boolean) isAllSortKeyInRowTypeMethod.invoke(reader);
+        Assertions.assertFalse(result);
+
+        // Test case 3: row type does not contain all sort keys
+        when(sourceTable.getClickhouseTable().getSortingKey())
+                .thenReturn("id, name, age, non_existent_field");
+        result = (boolean) isAllSortKeyInRowTypeMethod.invoke(reader);
+        Assertions.assertFalse(result);
+    }
+
     private void initStreamValueReaderMock() throws ClickHouseException {
         mockClickhouseQueryAndResponse(mockProxy, null, 
createMockClickHouseRecords());
     }


Reply via email to