This is an automated email from the ASF dual-hosted git repository.
corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 5e9990afd5 [Improve][Connector-Clickhouse] improve ck batch parallel
read by using last batch row sorting value approach, instead of limit offset.
(#9801)
5e9990afd5 is described below
commit 5e9990afd55c4e78a35b2cb122b5c7c1f6bbc386
Author: JeremyXin <[email protected]>
AuthorDate: Mon Sep 8 18:34:08 2025 +0800
[Improve][Connector-Clickhouse] improve ck batch parallel read by using
last batch row sorting value approach, instead of limit offset. (#9801)
---
.../clickhouse/source/ClickhousePart.java | 20 +-
.../clickhouse/source/ClickhouseValueReader.java | 263 ++++++++++++++++++---
.../source/split/ClickhouseSourceSplit.java | 4 -
.../source/split/PartStrategySplitter.java | 1 -
.../source/split/SqlStrategySplitter.java | 2 -
.../source/ClickhouseValueReaderTest.java | 147 ++++++++----
6 files changed, 347 insertions(+), 90 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhousePart.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhousePart.java
index 1c51c0baf2..c0dbba5714 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhousePart.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhousePart.java
@@ -20,6 +20,7 @@ package
org.apache.seatunnel.connectors.seatunnel.clickhouse.source;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard;
import java.io.Serializable;
+import java.util.List;
import java.util.Objects;
public class ClickhousePart implements Serializable,
Comparable<ClickhousePart> {
@@ -31,7 +32,12 @@ public class ClickhousePart implements Serializable,
Comparable<ClickhousePart>
private final String database;
private final String table;
private final Shard shard;
- private int offset = 0;
+
+ /**
+ * Stores the last ordering key values fetched for keyset cursor
pagination. The order matches
+ * the table's sorting key columns.
+ */
+ private List<Object> lastOrderingKeyValues;
/** Flag indicating whether all data from this part has been completely
read. */
private boolean isEndOfPart = false;
@@ -67,12 +73,12 @@ public class ClickhousePart implements Serializable,
Comparable<ClickhousePart>
this.isEndOfPart = endOfPart;
}
- public void setOffset(int offset) {
- this.offset = offset;
+ public List<Object> getLastOrderingKeyValues() {
+ return lastOrderingKeyValues;
}
- public int getOffset() {
- return offset;
+ public void setLastOrderingKeyValues(List<Object> lastOrderingKeyValues) {
+ this.lastOrderingKeyValues = lastOrderingKeyValues;
}
@Override
@@ -114,10 +120,10 @@ public class ClickhousePart implements Serializable,
Comparable<ClickhousePart>
+ '\''
+ ", shard="
+ shard
- + ", offset="
- + offset
+ ", isEndOfPart="
+ isEndOfPart
+ + ", lastOrderingKeyValues="
+ + lastOrderingKeyValues
+ '}';
}
}
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseValueReader.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseValueReader.java
index 77da3a77dc..1a8f6c27ec 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseValueReader.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseValueReader.java
@@ -18,10 +18,12 @@
package org.apache.seatunnel.connectors.seatunnel.clickhouse.source;
import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import
org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorErrorCode;
import
org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException;
+import
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file.ClickhouseTable;
import
org.apache.seatunnel.connectors.seatunnel.clickhouse.source.split.ClickhouseSourceSplit;
import
org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseProxy;
import
org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseUtil;
@@ -33,28 +35,51 @@ import com.clickhouse.client.ClickHouseResponse;
import lombok.extern.slf4j.Slf4j;
import java.io.Serializable;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
+/**
+ * ClickhouseValueReader is responsible for reading data from ClickHouse
database. It supports two
+ * reading modes determined by {@link #shouldUseStreamReader()}:
+ *
+ * <p>1. Stream Mode: Used when the query is complex, no sorting key exists,
or not all sorting key
+ * columns are included in the query fields.
+ *
+ * <p>2. Batch Mode: Uses a keyset pagination approach by tracking the last
row's sorting key values
+ * from each batch. This mode requires {@link #isAllSortKeyInRowType()} to be
true, meaning all
+ * sorting key columns must be included in the query fields.
+ */
@Slf4j
public class ClickhouseValueReader implements Serializable {
private static final long serialVersionUID = 4588012013447713463L;
+ private static final DateTimeFormatter TS_FORMATTER =
+ DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+
private final ClickhouseSourceSplit clickhouseSourceSplit;
private final SeaTunnelRowType rowTypeInfo;
private final ClickhouseSourceTable clickhouseSourceTable;
private StreamValueReader streamValueReader;
private ClickhouseProxy proxy;
+ private final boolean shouldUseStreamReader;
protected int currentPartIndex = 0;
private List<SeaTunnelRow> rowBatch;
+ // SQL strategy keyset order values
+ private List<Object> sqlLastOrderingKeyValues;
+
public ClickhouseValueReader(
ClickhouseSourceSplit clickhouseSourceSplit,
SeaTunnelRowType seaTunnelRowType,
@@ -63,10 +88,11 @@ public class ClickhouseValueReader implements Serializable {
this.rowTypeInfo = seaTunnelRowType;
this.clickhouseSourceTable = clickhouseSourceTable;
this.proxy = new
ClickhouseProxy(clickhouseSourceSplit.getShard().getNode());
+ this.shouldUseStreamReader = shouldUseStreamReader();
}
public boolean hasNext() {
- if (shouldUseStreamReader()) {
+ if (shouldUseStreamReader) {
if (streamValueReader == null) {
streamValueReader = new StreamValueReader();
}
@@ -104,7 +130,7 @@ public class ClickhouseValueReader implements Serializable {
}
try {
- String query = buildPartQuery(currentPart);
+ String query = buildBatchPartQuery(currentPart);
rowBatch =
proxy.batchFetchRecords(
query, clickhouseSourceTable.getTablePath(),
rowTypeInfo);
@@ -121,8 +147,15 @@ public class ClickhouseValueReader implements Serializable
{
return currentPartIndex < partSize && partBatchStrategyRead();
}
- // update part offset
- currentPart.setOffset(currentPart.getOffset() + rowBatch.size());
+ // update Keyset cursor (last ordering key values)
+ String sortingKey =
clickhouseSourceTable.getClickhouseTable().getSortingKey();
+
+ SeaTunnelRow lastRow = rowBatch.get(rowBatch.size() - 1);
+ List<Object> keyValues = extractOrderingKeyValuesFromRow(lastRow,
sortingKey);
+ log.debug("lastRow: {}, extract ordering key values from row: {}",
lastRow, keyValues);
+
+ currentPart.setLastOrderingKeyValues(keyValues);
+
return true;
} catch (Exception e) {
throw new ClickhouseConnectorException(
@@ -138,15 +171,26 @@ public class ClickhouseValueReader implements
Serializable {
}
private boolean sqlBatchStrategyRead() {
- String query = buildSqlQuery();
+ String query = buildBatchSqlQuery();
try {
rowBatch =
proxy.batchFetchRecords(
query, clickhouseSourceTable.getTablePath(),
rowTypeInfo);
- clickhouseSourceSplit.setSqlOffset(
- clickhouseSourceSplit.getSqlOffset() + rowBatch.size());
+ String sortingKey =
clickhouseSourceTable.getClickhouseTable().getSortingKey();
+
+ if (rowBatch.isEmpty()) {
+ return false;
+ }
+ SeaTunnelRow lastRow = rowBatch.get(rowBatch.size() - 1);
+
+ sqlLastOrderingKeyValues =
extractOrderingKeyValuesFromRow(lastRow, sortingKey);
+
+ log.debug(
+ "lastRow: {}, extract ordering key values from row: {}",
+ lastRow,
+ sqlLastOrderingKeyValues);
return !rowBatch.isEmpty();
} catch (Exception e) {
@@ -173,10 +217,30 @@ public class ClickhouseValueReader implements
Serializable {
private boolean shouldUseStreamReader() {
return clickhouseSourceTable.isComplexSql()
- ||
StringUtils.isEmpty(clickhouseSourceTable.getClickhouseTable().getSortingKey());
+ ||
StringUtils.isEmpty(clickhouseSourceTable.getClickhouseTable().getSortingKey())
+ || !isAllSortKeyInRowType();
}
- private String buildPartQuery(ClickhousePart part) {
+ /** Verify that all sorting key columns exist in rowTypeInfo */
+ private boolean isAllSortKeyInRowType() {
+ ClickhouseTable clickhouseTable =
clickhouseSourceTable.getClickhouseTable();
+ if (clickhouseTable == null ||
StringUtils.isEmpty(clickhouseTable.getSortingKey())) {
+ return false;
+ }
+ String sortingKey = clickhouseTable.getSortingKey();
+ List<String> sortingKeyList =
+
Arrays.stream(sortingKey.split(",")).map(String::trim).collect(Collectors.toList());
+
+ // check all sort key exists in rowTypeInfo
+ Optional<String> sortKeyNotExistOpt =
+ sortingKeyList.stream()
+ .filter(key -> rowTypeInfo.indexOf(key, false) == -1)
+ .findAny();
+
+ return !sortKeyNotExistOpt.isPresent();
+ }
+
+ private String buildBatchPartQuery(ClickhousePart part) {
TablePath tablePath = TablePath.of(part.getDatabase(),
part.getTable());
String whereClause = String.format("_part = '%s'", part.getName());
@@ -184,55 +248,170 @@ public class ClickhouseValueReader implements
Serializable {
whereClause += " AND (" + clickhouseSourceTable.getFilterQuery() +
")";
}
- String orderByClause = "";
- if
(StringUtils.isNotEmpty(clickhouseSourceTable.getClickhouseTable().getSortingKey()))
{
- orderByClause =
- " ORDER BY " +
clickhouseSourceTable.getClickhouseTable().getSortingKey();
+ String sortingKey =
clickhouseSourceTable.getClickhouseTable().getSortingKey();
+
+ String orderByClause = " ORDER BY " + sortingKey;
+
+ String keysetWhere = "";
+ // Key cursor mode pagination: when sorting key exists, use tuple
comparison on
+ // lastOrderingKeyValues
+ if (part.getLastOrderingKeyValues() != null) {
+ keysetWhere = buildKeysetWhereCondition(sortingKey,
part.getLastOrderingKeyValues());
+ if (!keysetWhere.isEmpty()) {
+ whereClause += " AND (" + keysetWhere + ")";
+ }
}
String sql;
- if (StringUtils.isNotEmpty(orderByClause)) {
+
+ if (part.getLastOrderingKeyValues() != null) {
+ // key cursor mode: no OFFSET, only LIMIT
sql =
String.format(
- "SELECT * FROM %s.%s WHERE %s %s LIMIT %d, %d WITH
TIES",
+ "SELECT * FROM %s.%s WHERE %s %s LIMIT %d WITH
TIES",
tablePath.getDatabaseName(),
tablePath.getTableName(),
whereClause,
orderByClause,
- part.getOffset(),
clickhouseSourceTable.getBatchSize());
} else {
+ // for the first sql creation, lastOrderingKeyValues is null
sql =
String.format(
- "SELECT * FROM %s.%s WHERE %s",
- tablePath.getDatabaseName(),
tablePath.getTableName(), whereClause);
+ "SELECT * FROM %s.%s WHERE %s %s LIMIT %d, %d WITH
TIES",
+ tablePath.getDatabaseName(),
+ tablePath.getTableName(),
+ whereClause,
+ orderByClause,
+ 0,
+ clickhouseSourceTable.getBatchSize());
}
+ log.info("generate batch part sql: {}", sql);
+
return sql;
}
- private String buildSqlQuery() {
- String orderByClause = "";
- if
(StringUtils.isNotEmpty(clickhouseSourceTable.getClickhouseTable().getSortingKey()))
{
- orderByClause =
- " ORDER BY " +
clickhouseSourceTable.getClickhouseTable().getSortingKey();
+ private String buildBatchSqlQuery() {
+ String base =
+ String.format("SELECT * FROM (%s) AS t",
clickhouseSourceSplit.getSplitQuery());
+
+ String sortingKey =
clickhouseSourceTable.getClickhouseTable().getSortingKey();
+
+ String whereClause = "";
+ if (sqlLastOrderingKeyValues != null) {
+ String keyset = buildKeysetWhereCondition(sortingKey,
sqlLastOrderingKeyValues);
+ if (!keyset.isEmpty()) {
+ whereClause = " WHERE (" + keyset + ")";
+ }
}
- String executeSql;
- if (StringUtils.isNotEmpty(orderByClause)) {
- executeSql =
+ String orderByClause = " ORDER BY " + sortingKey;
+
+ String sql;
+ if (sqlLastOrderingKeyValues != null) {
+ // key cursor mode: no OFFSET, only LIMIT
+ sql =
String.format(
- "SELECT * FROM (%s) AS t %s LIMIT %d, %d WITH
TIES",
- clickhouseSourceSplit.getSplitQuery(),
- orderByClause,
- clickhouseSourceSplit.getSqlOffset(),
- clickhouseSourceTable.getBatchSize());
+ "%s %s %s LIMIT %d WITH TIES",
+ base, whereClause, orderByClause,
clickhouseSourceTable.getBatchSize());
} else {
- executeSql =
- String.format("SELECT * FROM (%s) AS t",
clickhouseSourceSplit.getSplitQuery());
+ // for the first sql creation, sqlLastOrderingKeyValues is null
+ sql =
+ String.format(
+ "%s %s LIMIT %d, %d WITH TIES",
+ base, orderByClause, 0,
clickhouseSourceTable.getBatchSize());
+ }
+
+ log.info("generate batch query sql: {}", sql);
+
+ return sql;
+ }
+
+ /**
+ * Build WHERE condition using the sorting key and last key values.
Supports single or composite
+ * keys, and generates lexicographic tuple comparison.
+ */
+ private String buildKeysetWhereCondition(String sortingKey, List<Object>
lastKeyValues) {
+ List<String> keyCols =
+
Arrays.stream(sortingKey.split(",")).map(String::trim).collect(Collectors.toList());
+ if (lastKeyValues == null
+ || lastKeyValues.isEmpty()
+ || keyCols.size() != lastKeyValues.size()) {
+ return "";
}
- return executeSql;
+ // Build tuple comparison (c1, c2, ...) > (v1, v2, ...)
+ String left = "(" + String.join(", ", keyCols) + ")";
+
+ // Convert lastKeyValues to SQL literals based on rowTypeInfo
+ String inlinedRight = "(" + buildSqlLiteralsForKeyValues(keyCols,
lastKeyValues) + ")";
+
+ return left + " > " + inlinedRight;
+ }
+
+ private String buildSqlLiteralsForKeyValues(List<String> keyCols,
List<Object> values) {
+ List<String> literals = new ArrayList<>();
+ for (int i = 0; i < keyCols.size(); i++) {
+ String col = keyCols.get(i);
+ Object v = values.get(i);
+ literals.add(toSqlLiteral(col, v));
+ }
+ return String.join(", ", literals);
+ }
+
+ private String toSqlLiteral(String column, Object value) {
+ if (value == null) {
+ return "NULL";
+ }
+ int idx = rowTypeInfo.indexOf(column, false);
+ if (idx < 0) {
+ // fallback: quote as string
+ return quoteString(value.toString());
+ }
+ SeaTunnelDataType<?> t = rowTypeInfo.getFieldType(idx);
+ switch (t.getSqlType()) {
+ case STRING:
+ return quoteString(value.toString());
+ case BOOLEAN:
+ return Boolean.TRUE.equals(value) ? "1" : "0";
+ case TINYINT:
+ case SMALLINT:
+ case INT:
+ case BIGINT:
+ case FLOAT:
+ case DOUBLE:
+ case DECIMAL:
+ return value.toString();
+ case DATE:
+ if (value instanceof LocalDate) {
+ return quoteString(value.toString());
+ }
+ return quoteString(String.valueOf(value));
+ case TIMESTAMP:
+ if (value instanceof LocalDateTime) {
+ return quoteString(TS_FORMATTER.format((LocalDateTime)
value));
+ }
+ return quoteString(String.valueOf(value));
+ default:
+ return quoteString(String.valueOf(value));
+ }
+ }
+
+ private List<Object> extractOrderingKeyValuesFromRow(SeaTunnelRow row,
String sortingKey) {
+ List<String> keyCols =
+
Arrays.stream(sortingKey.split(",")).map(String::trim).collect(Collectors.toList());
+ List<Object> keyValues = new ArrayList<>(keyCols.size());
+ for (String col : keyCols) {
+ int idx = rowTypeInfo.indexOf(col, false);
+ keyValues.add(row.getField(idx));
+ }
+ return keyValues;
+ }
+
+ private String quoteString(String s) {
+ String escaped = s.replace("\\", "\\\\").replace("'", "''");
+ return "'" + escaped + "'";
}
private class StreamValueReader implements Serializable {
@@ -259,6 +438,7 @@ public class ClickhouseValueReader implements Serializable {
try {
for (String sql : sqlList) {
executeSql = sql;
+ log.info("execute stream sql: {}",
executeSql);
try (ClickHouseResponse response =
proxy.getClickhouseConnection()
.query(sql)
@@ -335,11 +515,24 @@ public class ClickhouseValueReader implements
Serializable {
return
Collections.singletonList(clickhouseSourceSplit.getSplitQuery());
} else {
return clickhouseSourceSplit.getParts().stream()
- .map(ClickhouseValueReader.this::buildPartQuery)
+ .map(this::buildStreamPartQuery)
.collect(Collectors.toList());
}
}
+ private String buildStreamPartQuery(ClickhousePart part) {
+ TablePath tablePath = TablePath.of(part.getDatabase(),
part.getTable());
+
+ String whereClause = String.format("_part = '%s'", part.getName());
+ if
(StringUtils.isNotEmpty(clickhouseSourceTable.getFilterQuery())) {
+ whereClause += " AND (" +
clickhouseSourceTable.getFilterQuery() + ")";
+ }
+
+ return String.format(
+ "SELECT * FROM %s.%s WHERE %s",
+ tablePath.getDatabaseName(), tablePath.getTableName(),
whereClause);
+ }
+
public void close() {
if (rowQueue != null) {
rowQueue.clear();
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/split/ClickhouseSourceSplit.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/split/ClickhouseSourceSplit.java
index 445c5bb4c7..62ad3f15ce 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/split/ClickhouseSourceSplit.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/split/ClickhouseSourceSplit.java
@@ -24,7 +24,6 @@ import
org.apache.seatunnel.connectors.seatunnel.clickhouse.source.ClickhousePar
import lombok.AllArgsConstructor;
import lombok.Data;
-import lombok.Setter;
import java.util.List;
@@ -39,7 +38,6 @@ public class ClickhouseSourceSplit implements SourceSplit {
private final List<ClickhousePart> parts;
private final Shard shard;
private final String splitQuery;
- @Setter private int sqlOffset;
private final String splitId;
@@ -66,8 +64,6 @@ public class ClickhouseSourceSplit implements SourceSplit {
+ ", splitQuery='"
+ splitQuery
+ "'"
- + ", sqlOffset="
- + sqlOffset
+ ", splitId='"
+ splitId
+ "'"
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/split/PartStrategySplitter.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/split/PartStrategySplitter.java
index 5c5a4bd0ed..ca36fe6ae7 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/split/PartStrategySplitter.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/split/PartStrategySplitter.java
@@ -118,7 +118,6 @@ public class PartStrategySplitter implements Splitter,
AutoCloseable, Serializab
new ArrayList<>(partSplit),
shardPartsEntry.getKey(),
clickhouseSourceTable.getOriginQuery(),
- 0,
splitId);
splits.add(clickhouseSourceSplit);
}
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/split/SqlStrategySplitter.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/split/SqlStrategySplitter.java
index b9242fcd42..07b9fa8c77 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/split/SqlStrategySplitter.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/split/SqlStrategySplitter.java
@@ -64,7 +64,6 @@ public class SqlStrategySplitter implements Splitter,
AutoCloseable, Serializabl
new ArrayList<>(),
shard,
querySql,
- 0,
createSplitId(
clickhouseSourceTable.getTablePath(),
shard,
@@ -101,7 +100,6 @@ public class SqlStrategySplitter implements Splitter,
AutoCloseable, Serializabl
new ArrayList<>(),
clusterShardList.get(0),
clickhouseSourceTable.getOriginQuery(),
- 0,
createSplitId(
clickhouseSourceTable.getTablePath(),
clusterShardList.get(0), 0)));
}
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseValueReaderTest.java
b/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseValueReaderTest.java
index 57571b1dc7..22d433e459 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseValueReaderTest.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseValueReaderTest.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.utils.ReflectionUtils;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard;
import
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file.ClickhouseTable;
import
org.apache.seatunnel.connectors.seatunnel.clickhouse.source.split.ClickhouseSourceSplit;
@@ -46,9 +47,12 @@ import com.clickhouse.client.data.ClickHouseStringValue;
import lombok.extern.slf4j.Slf4j;
import java.lang.reflect.Field;
+import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
+import java.util.Optional;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
@@ -100,7 +104,6 @@ public class ClickhouseValueReaderTest {
new ArrayList<>(parts),
shard,
"",
- 0,
"split-1");
mockProxy = Mockito.mock(ClickhouseProxy.class,
Mockito.RETURNS_DEEP_STUBS);
@@ -108,13 +111,8 @@ public class ClickhouseValueReaderTest {
initStreamValueReaderMock();
reader = new ClickhouseValueReader(split, rowType, sourceTable);
- try {
- Field proxyField =
ClickhouseValueReader.class.getDeclaredField("proxy");
- proxyField.setAccessible(true);
- proxyField.set(reader, mockProxy);
- } catch (Exception e) {
- throw new RuntimeException("Failed to set mock proxy", e);
- }
+
+ ReflectionUtils.setField(reader, ClickhouseValueReader.class, "proxy",
mockProxy);
}
@Test
@@ -130,9 +128,11 @@ public class ClickhouseValueReaderTest {
Assertions.assertEquals(BATCH_SIZE, result.size());
Assertions.assertEquals(0, reader.currentPartIndex);
- // Make sure the offset has been updated but the part has not been
marked as end of part
+ // In keyset mode, lastOrderingKeyValues should be updated, offset
remains 0
List<ClickhousePart> parts = new ArrayList<>(split.getParts());
- Assertions.assertEquals(BATCH_SIZE, parts.get(0).getOffset());
+ Assertions.assertNotNull(parts.get(0).getLastOrderingKeyValues());
+ Assertions.assertEquals(
+ (long) (BATCH_SIZE - 1),
parts.get(0).getLastOrderingKeyValues().get(0));
Assertions.assertFalse(parts.get(0).isEndOfPart());
}
@@ -150,9 +150,11 @@ public class ClickhouseValueReaderTest {
List<SeaTunnelRow> result = reader.next();
Assertions.assertEquals(partialSize, result.size());
- // Make sure the offset has been updated
+ // In keyset mode, lastOrderingKeyValues should be updated to last row
id, and no EOS
List<ClickhousePart> parts = new ArrayList<>(split.getParts());
- Assertions.assertEquals(partialSize, parts.get(0).getOffset());
+ Assertions.assertNotNull(parts.get(0).getLastOrderingKeyValues());
+ Assertions.assertEquals(
+ (long) (partialSize - 1),
parts.get(0).getLastOrderingKeyValues().get(0));
Assertions.assertTrue(reader.hasNext());
}
@@ -193,9 +195,13 @@ public class ClickhouseValueReaderTest {
invocation -> {
ClickhousePart part =
parts.get(reader.currentPartIndex);
if ("part1".equals(part.getName())) {
- return part.getOffset() == 0 ? mockRows1 : new
ArrayList<>();
+ return part.getLastOrderingKeyValues() == null
+ ? mockRows1
+ : new ArrayList<>();
} else {
- return part.getOffset() == 0 ? mockRows2 : new
ArrayList<>();
+ return part.getLastOrderingKeyValues() == null
+ ? mockRows2
+ : new ArrayList<>();
}
});
@@ -220,7 +226,8 @@ public class ClickhouseValueReaderTest {
@Test
public void testPartStrategyReadWithNoSortingKey() {
- when(sourceTable.getClickhouseTable().getSortingKey()).thenReturn("");
+ ReflectionUtils.setField(
+ reader, ClickhouseValueReader.class, "shouldUseStreamReader",
true);
Assertions.assertTrue(reader.hasNext());
List<SeaTunnelRow> result = reader.next();
@@ -235,16 +242,10 @@ public class ClickhouseValueReaderTest {
@Test
public void testSqlStrategyReadWithNoSortingKey() {
- try {
- Field sqlStrategyField =
-
ClickhouseSourceTable.class.getDeclaredField("isSqlStrategyRead");
- sqlStrategyField.setAccessible(true);
- sqlStrategyField.set(sourceTable, true);
- } catch (Exception e) {
- Assertions.fail("Failed to set isSqlStrategyRead field", e);
- }
-
- when(sourceTable.getClickhouseTable().getSortingKey()).thenReturn("");
+ ReflectionUtils.setField(
+ sourceTable, ClickhouseSourceTable.class, "isSqlStrategyRead",
true);
+ ReflectionUtils.setField(
+ reader, ClickhouseValueReader.class, "shouldUseStreamReader",
true);
Assertions.assertTrue(reader.hasNext());
@@ -256,29 +257,23 @@ public class ClickhouseValueReaderTest {
@Test
public void testSqlStrategyReadWithSortingKey() {
- try {
- Field sqlStrategyField =
-
ClickhouseSourceTable.class.getDeclaredField("isSqlStrategyRead");
- sqlStrategyField.setAccessible(true);
- sqlStrategyField.set(sourceTable, true);
- } catch (Exception e) {
- Assertions.fail("Failed to set isSqlStrategyRead field", e);
- }
+ ReflectionUtils.setField(
+ sourceTable, ClickhouseSourceTable.class, "isSqlStrategyRead",
true);
when(sourceTable.getClickhouseTable().getSortingKey()).thenReturn("id");
+ // In Keyset mode, we expect multiple batches without relying on
sqlOffset
List<SeaTunnelRow> firstBatch = createMockRows(BATCH_SIZE);
List<SeaTunnelRow> secondBatch = createMockRows(5);
List<SeaTunnelRow> emptyBatch = new ArrayList<>();
- when(mockProxy.batchFetchRecords(any(),
eq(sourceTable.getTablePath()), eq(rowType)))
- .thenAnswer(
- x ->
- split.getSqlOffset() == 0
- ? firstBatch
- : split.getSqlOffset() == BATCH_SIZE
- ? secondBatch
- : emptyBatch);
+ // Simulate: first call returns firstBatch, second call returns
secondBatch, then empty
+ Mockito.when(
+ mockProxy.batchFetchRecords(
+ any(), eq(sourceTable.getTablePath()),
eq(rowType)))
+ .thenReturn(firstBatch)
+ .thenReturn(secondBatch)
+ .thenReturn(emptyBatch);
Assertions.assertTrue(reader.hasNext());
List<SeaTunnelRow> result1 = reader.next();
@@ -319,6 +314,76 @@ public class ClickhouseValueReaderTest {
}
}
+ @Test
+ public void testBuildKeysetWhereCondition() throws Exception {
+ Optional<Method> methodOpt =
+ ReflectionUtils.getDeclaredMethod(
+ ClickhouseValueReader.class,
+ "buildKeysetWhereCondition",
+ String.class,
+ List.class);
+ Assertions.assertTrue(methodOpt.isPresent());
+
+ Method buildKeysetWhereConditionMethod = methodOpt.get();
+
+ // Test a single sort key
+ String sortingKey = "id";
+ List<Object> keyValues = Collections.singletonList(100L);
+ Object result = buildKeysetWhereConditionMethod.invoke(reader,
sortingKey, keyValues);
+ Assertions.assertEquals("(id) > (100)", result);
+
+ // Test the composite sort key
+ sortingKey = "id, name";
+ keyValues = Arrays.asList(100L, "test");
+ result = buildKeysetWhereConditionMethod.invoke(reader, sortingKey,
keyValues);
+ Assertions.assertEquals("(id, name) > (100, 'test')", result);
+
+ // Test values containing special characters
+ sortingKey = "id, name";
+ keyValues = Arrays.asList(100L, "test'with quote");
+ result = buildKeysetWhereConditionMethod.invoke(reader, sortingKey,
keyValues);
+ Assertions.assertEquals("(id, name) > (100, 'test''with quote')",
result);
+
+ // Test the list of null key values
+ result = buildKeysetWhereConditionMethod.invoke(reader, sortingKey,
null);
+ Assertions.assertEquals("", result);
+
+ result = buildKeysetWhereConditionMethod.invoke(reader, sortingKey,
new ArrayList<>());
+ Assertions.assertEquals("", result);
+
+ // The number of test keys and values does not match
+ sortingKey = "id, name, age";
+ keyValues = Arrays.asList(100L, "test");
+ result = buildKeysetWhereConditionMethod.invoke(reader, sortingKey,
keyValues);
+ Assertions.assertEquals("", result);
+ }
+
+ @Test
+ public void testIsAllSortKeyInRowType() throws Exception {
+ Optional<Method> methodOpt =
+ ReflectionUtils.getDeclaredMethod(
+ ClickhouseValueReader.class, "isAllSortKeyInRowType");
+ Assertions.assertTrue(methodOpt.isPresent());
+
+ Method isAllSortKeyInRowTypeMethod = methodOpt.get();
+
+ // Test case 1: Valid composite sorting key
+ when(sourceTable.getClickhouseTable().getSortingKey()).thenReturn("id,
age");
+ boolean result = (boolean) isAllSortKeyInRowTypeMethod.invoke(reader);
+ Assertions.assertTrue(result);
+
+ // Test case 2: Empty sorting key
+ when(sourceTable.getClickhouseTable().getSortingKey()).thenReturn("");
+ result = (boolean) isAllSortKeyInRowTypeMethod.invoke(reader);
+ Assertions.assertFalse(result);
+
+ // Test case 3: row type not contains all sort key
+ when(sourceTable.getClickhouseTable().getSortingKey())
+ .thenReturn("id, name, age, non_existent_field");
+ result = (boolean) isAllSortKeyInRowTypeMethod.invoke(reader);
+ Assertions.assertFalse(result);
+ }
+
private void initStreamValueReaderMock() throws ClickHouseException {
mockClickhouseQueryAndResponse(mockProxy, null,
createMockClickHouseRecords());
}