This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 34901da4449 Extract PipelineInventoryCalculateSQLBuilder from
PipelineDataConsistencyCalculateSQLBuilder (#36840)
34901da4449 is described below
commit 34901da44496ee87e2267238d34fa816b85f72f4
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Fri Oct 10 17:31:23 2025 +0800
Extract PipelineInventoryCalculateSQLBuilder from
PipelineDataConsistencyCalculateSQLBuilder (#36840)
---
.../AbstractRecordTableInventoryCalculator.java | 4 +-
...PipelineDataConsistencyCalculateSQLBuilder.java | 78 ----------------------
...a => PipelineInventoryCalculateSQLBuilder.java} | 19 +-----
...lineDataConsistencyCalculateSQLBuilderTest.java | 59 ----------------
... PipelineInventoryCalculateSQLBuilderTest.java} | 11 +--
5 files changed, 7 insertions(+), 164 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/calculator/AbstractRecordTableInventoryCalculator.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/calculator/AbstractRecordTableInventoryCalculator.java
index 71e9e184373..f27622728e6 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/calculator/AbstractRecordTableInventoryCalculator.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/calculator/AbstractRecordTableInventoryCalculator.java
@@ -28,7 +28,7 @@ import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.quer
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.QueryType;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.StreamingRangeType;
import
org.apache.shardingsphere.data.pipeline.core.query.JDBCStreamQueryBuilder;
-import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelineDataConsistencyCalculateSQLBuilder;
+import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelineInventoryCalculateSQLBuilder;
import org.apache.shardingsphere.infra.annotation.HighFrequencyInvocation;
import org.apache.shardingsphere.infra.exception.ShardingSpherePreconditions;
import
org.apache.shardingsphere.infra.exception.external.sql.type.kernel.category.PipelineSQLException;
@@ -262,7 +262,7 @@ public abstract class
AbstractRecordTableInventoryCalculator<S, C> extends Abstr
private String getQuerySQL(final TableInventoryCalculateParameter param) {
ShardingSpherePreconditions.checkState(null != param.getUniqueKeys()
&& !param.getUniqueKeys().isEmpty() && null != param.getFirstUniqueKey(),
() -> new UnsupportedOperationException("Record inventory
calculator does not support table without unique key and primary key now."));
- PipelineDataConsistencyCalculateSQLBuilder pipelineSQLBuilder = new
PipelineDataConsistencyCalculateSQLBuilder(param.getDatabaseType());
+ PipelineInventoryCalculateSQLBuilder pipelineSQLBuilder = new
PipelineInventoryCalculateSQLBuilder(param.getDatabaseType());
Collection<String> columnNames = param.getColumnNames().isEmpty() ?
Collections.singleton("*") : param.getColumnNames();
switch (param.getQueryType()) {
case RANGE_QUERY:
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineDataConsistencyCalculateSQLBuilder.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineDataConsistencyCalculateSQLBuilder.java
index 2a9a3815e05..18f0d4e9e85 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineDataConsistencyCalculateSQLBuilder.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineDataConsistencyCalculateSQLBuilder.java
@@ -17,18 +17,13 @@
package org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.QueryRange;
import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.dialect.DialectPipelineSQLBuilder;
import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.segment.PipelineSQLSegmentBuilder;
import
org.apache.shardingsphere.database.connector.core.spi.DatabaseTypedSPILoader;
import org.apache.shardingsphere.database.connector.core.type.DatabaseType;
import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedTable;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
import java.util.Optional;
-import java.util.stream.Collectors;
/**
* Pipeline data consistency calculate SQL builder.
@@ -44,79 +39,6 @@ public final class
PipelineDataConsistencyCalculateSQLBuilder {
sqlSegmentBuilder = new PipelineSQLSegmentBuilder(databaseType);
}
- /**
- * Build query range ordering SQL.
- *
- * @param qualifiedTable qualified table
- * @param columnNames column names
- * @param uniqueKeys unique keys, it may be primary key, not null
- * @param queryRange query range
- * @param pageQuery whether it is page query
- * @param shardingColumnsNames sharding columns names
- * @return built SQL
- */
- public String buildQueryRangeOrderingSQL(final QualifiedTable
qualifiedTable, final Collection<String> columnNames, final List<String>
uniqueKeys, final QueryRange queryRange,
- final boolean pageQuery, final
List<String> shardingColumnsNames) {
- String result = buildQueryRangeOrderingSQL0(qualifiedTable,
columnNames, uniqueKeys, queryRange, shardingColumnsNames);
- return pageQuery ? dialectSQLBuilder.wrapWithPageQuery(result) :
result;
- }
-
- private String buildQueryRangeOrderingSQL0(final QualifiedTable
qualifiedTable, final Collection<String> columnNames, final List<String>
uniqueKeys, final QueryRange queryRange,
- final List<String>
shardingColumnsNames) {
- String qualifiedTableName =
sqlSegmentBuilder.getQualifiedTableName(qualifiedTable);
- String queryColumns =
columnNames.stream().map(sqlSegmentBuilder::getEscapedIdentifier).collect(Collectors.joining(","));
- String firstUniqueKey = uniqueKeys.get(0);
- String orderByColumns = joinColumns(uniqueKeys,
shardingColumnsNames).stream().map(each ->
sqlSegmentBuilder.getEscapedIdentifier(each) + "
ASC").collect(Collectors.joining(", "));
- if (null != queryRange.getLower() && null != queryRange.getUpper()) {
- return String.format("SELECT %s FROM %s WHERE %s AND %s ORDER BY
%s", queryColumns, qualifiedTableName,
-
buildLowerQueryRangeCondition(queryRange.isLowerInclusive(), firstUniqueKey),
- buildUpperQueryRangeCondition(firstUniqueKey),
orderByColumns);
- } else if (null != queryRange.getLower()) {
- return String.format("SELECT %s FROM %s WHERE %s ORDER BY %s",
queryColumns, qualifiedTableName,
-
buildLowerQueryRangeCondition(queryRange.isLowerInclusive(), firstUniqueKey),
orderByColumns);
- } else if (null != queryRange.getUpper()) {
- return String.format("SELECT %s FROM %s WHERE %s ORDER BY %s",
queryColumns, qualifiedTableName,
- buildUpperQueryRangeCondition(firstUniqueKey),
orderByColumns);
- } else {
- return String.format("SELECT %s FROM %s ORDER BY %s",
queryColumns, qualifiedTableName, orderByColumns);
- }
- }
-
- private String buildLowerQueryRangeCondition(final boolean inclusive,
final String firstUniqueKey) {
- String delimiter = inclusive ? ">=?" : ">?";
- return sqlSegmentBuilder.getEscapedIdentifier(firstUniqueKey) +
delimiter;
- }
-
- private String buildUpperQueryRangeCondition(final String firstUniqueKey) {
- return sqlSegmentBuilder.getEscapedIdentifier(firstUniqueKey) + "<=?";
- }
-
- /**
- * Build point query SQL.
- *
- * @param qualifiedTable qualified table
- * @param columnNames column names
- * @param uniqueKeys unique keys, it may be primary key, not null
- * @param shardingColumnsNames sharding columns names, nullable
- * @return built SQL
- */
- public String buildPointQuerySQL(final QualifiedTable qualifiedTable,
final Collection<String> columnNames, final List<String> uniqueKeys, final
List<String> shardingColumnsNames) {
- String qualifiedTableName =
sqlSegmentBuilder.getQualifiedTableName(qualifiedTable);
- String queryColumns =
columnNames.stream().map(sqlSegmentBuilder::getEscapedIdentifier).collect(Collectors.joining(","));
- String equalsConditions = joinColumns(uniqueKeys,
shardingColumnsNames).stream().map(each ->
sqlSegmentBuilder.getEscapedIdentifier(each) +
"=?").collect(Collectors.joining(" AND "));
- return String.format("SELECT %s FROM %s WHERE %s", queryColumns,
qualifiedTableName, equalsConditions);
- }
-
- private List<String> joinColumns(final List<String> uniqueKeys, final
List<String> shardingColumnsNames) {
- if (shardingColumnsNames.isEmpty()) {
- return uniqueKeys;
- }
- List<String> result = new ArrayList<>(uniqueKeys.size() +
shardingColumnsNames.size());
- result.addAll(uniqueKeys);
- result.addAll(shardingColumnsNames);
- return result;
- }
-
/**
* Build CRC32 SQL.
*
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineDataConsistencyCalculateSQLBuilder.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineInventoryCalculateSQLBuilder.java
similarity index 90%
copy from
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineDataConsistencyCalculateSQLBuilder.java
copy to
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineInventoryCalculateSQLBuilder.java
index 2a9a3815e05..711647fb5ab 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineDataConsistencyCalculateSQLBuilder.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineInventoryCalculateSQLBuilder.java
@@ -27,19 +27,18 @@ import
org.apache.shardingsphere.infra.metadata.database.schema.QualifiedTable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
-import java.util.Optional;
import java.util.stream.Collectors;
/**
- * Pipeline data consistency calculate SQL builder.
+ * Pipeline inventory calculate SQL builder.
*/
-public final class PipelineDataConsistencyCalculateSQLBuilder {
+public final class PipelineInventoryCalculateSQLBuilder {
private final DialectPipelineSQLBuilder dialectSQLBuilder;
private final PipelineSQLSegmentBuilder sqlSegmentBuilder;
- public PipelineDataConsistencyCalculateSQLBuilder(final DatabaseType
databaseType) {
+ public PipelineInventoryCalculateSQLBuilder(final DatabaseType
databaseType) {
dialectSQLBuilder =
DatabaseTypedSPILoader.getService(DialectPipelineSQLBuilder.class,
databaseType);
sqlSegmentBuilder = new PipelineSQLSegmentBuilder(databaseType);
}
@@ -116,16 +115,4 @@ public final class
PipelineDataConsistencyCalculateSQLBuilder {
result.addAll(shardingColumnsNames);
return result;
}
-
- /**
- * Build CRC32 SQL.
- *
- * @param qualifiedTable qualified table
- * @param columnName column name
- * @return built SQL
- */
- public Optional<String> buildCRC32SQL(final QualifiedTable qualifiedTable,
final String columnName) {
- return dialectSQLBuilder.buildCRC32SQL(
- sqlSegmentBuilder.getQualifiedTableName(qualifiedTable),
sqlSegmentBuilder.getEscapedIdentifier(columnName));
- }
}
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineDataConsistencyCalculateSQLBuilderTest.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineDataConsistencyCalculateSQLBuilderTest.java
index 5715c3cacab..1737e147209 100644
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineDataConsistencyCalculateSQLBuilderTest.java
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineDataConsistencyCalculateSQLBuilderTest.java
@@ -17,16 +17,11 @@
package org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.QueryRange;
import org.apache.shardingsphere.database.connector.core.type.DatabaseType;
import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedTable;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.junit.jupiter.api.Test;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
import java.util.Optional;
import static org.hamcrest.CoreMatchers.is;
@@ -34,62 +29,8 @@ import static org.hamcrest.MatcherAssert.assertThat;
class PipelineDataConsistencyCalculateSQLBuilderTest {
- private static final Collection<String> COLUMN_NAMES =
Arrays.asList("order_id", "user_id", "status");
-
- private static final List<String> UNIQUE_KEYS = Arrays.asList("order_id",
"status");
-
- private static final List<String> SHARDING_COLUMNS_NAMES =
Collections.singletonList("user_id");
-
private final PipelineDataConsistencyCalculateSQLBuilder sqlBuilder = new
PipelineDataConsistencyCalculateSQLBuilder(TypedSPILoader.getService(DatabaseType.class,
"FIXTURE"));
- @Test
- void assertBuildQueryRangeOrderingSQLPageQuery() {
- String actual = sqlBuilder.buildQueryRangeOrderingSQL(new
QualifiedTable(null, "t_order"), COLUMN_NAMES, UNIQUE_KEYS,
- new QueryRange(1, true, 5), true, SHARDING_COLUMNS_NAMES);
- assertThat(actual, is("SELECT order_id,user_id,status FROM t_order
WHERE order_id>=? AND order_id<=? ORDER BY order_id ASC, status ASC, user_id
ASC LIMIT ?"));
- actual = sqlBuilder.buildQueryRangeOrderingSQL(new
QualifiedTable(null, "t_order"), COLUMN_NAMES, UNIQUE_KEYS,
- new QueryRange(1, false, 5), true, SHARDING_COLUMNS_NAMES);
- assertThat(actual, is("SELECT order_id,user_id,status FROM t_order
WHERE order_id>? AND order_id<=? ORDER BY order_id ASC, status ASC, user_id ASC
LIMIT ?"));
- actual = sqlBuilder.buildQueryRangeOrderingSQL(new
QualifiedTable(null, "t_order"), COLUMN_NAMES, UNIQUE_KEYS,
- new QueryRange(1, false, null), true, SHARDING_COLUMNS_NAMES);
- assertThat(actual, is("SELECT order_id,user_id,status FROM t_order
WHERE order_id>? ORDER BY order_id ASC, status ASC, user_id ASC LIMIT ?"));
- actual = sqlBuilder.buildQueryRangeOrderingSQL(new
QualifiedTable(null, "t_order"), COLUMN_NAMES, UNIQUE_KEYS,
- new QueryRange(null, false, 5), true, SHARDING_COLUMNS_NAMES);
- assertThat(actual, is("SELECT order_id,user_id,status FROM t_order
WHERE order_id<=? ORDER BY order_id ASC, status ASC, user_id ASC LIMIT ?"));
- actual = sqlBuilder.buildQueryRangeOrderingSQL(new
QualifiedTable(null, "t_order"), COLUMN_NAMES, UNIQUE_KEYS,
- new QueryRange(null, false, null), true,
SHARDING_COLUMNS_NAMES);
- assertThat(actual, is("SELECT order_id,user_id,status FROM t_order
ORDER BY order_id ASC, status ASC, user_id ASC LIMIT ?"));
- }
-
- @Test
- void assertBuildQueryRangeOrderingSQLNotPageQuery() {
- String actual = sqlBuilder.buildQueryRangeOrderingSQL(new
QualifiedTable(null, "t_order"), COLUMN_NAMES, UNIQUE_KEYS,
- new QueryRange(1, true, 5), false, SHARDING_COLUMNS_NAMES);
- assertThat(actual, is("SELECT order_id,user_id,status FROM t_order
WHERE order_id>=? AND order_id<=? ORDER BY order_id ASC, status ASC, user_id
ASC"));
- actual = sqlBuilder.buildQueryRangeOrderingSQL(new
QualifiedTable(null, "t_order"), COLUMN_NAMES, UNIQUE_KEYS,
- new QueryRange(1, false, 5), false, SHARDING_COLUMNS_NAMES);
- assertThat(actual, is("SELECT order_id,user_id,status FROM t_order
WHERE order_id>? AND order_id<=? ORDER BY order_id ASC, status ASC, user_id
ASC"));
- actual = sqlBuilder.buildQueryRangeOrderingSQL(new
QualifiedTable(null, "t_order"), COLUMN_NAMES, UNIQUE_KEYS,
- new QueryRange(1, false, null), false, SHARDING_COLUMNS_NAMES);
- assertThat(actual, is("SELECT order_id,user_id,status FROM t_order
WHERE order_id>? ORDER BY order_id ASC, status ASC, user_id ASC"));
- actual = sqlBuilder.buildQueryRangeOrderingSQL(new
QualifiedTable(null, "t_order"), COLUMN_NAMES, UNIQUE_KEYS,
- new QueryRange(null, false, 5), false, SHARDING_COLUMNS_NAMES);
- assertThat(actual, is("SELECT order_id,user_id,status FROM t_order
WHERE order_id<=? ORDER BY order_id ASC, status ASC, user_id ASC"));
- actual = sqlBuilder.buildQueryRangeOrderingSQL(new
QualifiedTable(null, "t_order"), COLUMN_NAMES, UNIQUE_KEYS,
- new QueryRange(null, false, null), false,
SHARDING_COLUMNS_NAMES);
- assertThat(actual, is("SELECT order_id,user_id,status FROM t_order
ORDER BY order_id ASC, status ASC, user_id ASC"));
- }
-
- @Test
- void assertBuildPointQuerySQLWithoutQueryCondition() {
- String actual = sqlBuilder.buildPointQuerySQL(new QualifiedTable(null,
"t_order"), COLUMN_NAMES, UNIQUE_KEYS, Collections.emptyList());
- assertThat(actual, is("SELECT order_id,user_id,status FROM t_order
WHERE order_id=? AND status=?"));
- actual = sqlBuilder.buildPointQuerySQL(new QualifiedTable(null,
"t_order"), COLUMN_NAMES, UNIQUE_KEYS, Collections.emptyList());
- assertThat(actual, is("SELECT order_id,user_id,status FROM t_order
WHERE order_id=? AND status=?"));
- actual = sqlBuilder.buildPointQuerySQL(new QualifiedTable(null,
"t_order"), COLUMN_NAMES, UNIQUE_KEYS, Collections.singletonList("user_id"));
- assertThat(actual, is("SELECT order_id,user_id,status FROM t_order
WHERE order_id=? AND status=? AND user_id=?"));
- }
-
@Test
void assertBuildCRC32SQL() {
Optional<String> actual = sqlBuilder.buildCRC32SQL(new
QualifiedTable("foo_schema", "foo_tbl"), "foo_col");
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineDataConsistencyCalculateSQLBuilderTest.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineInventoryCalculateSQLBuilderTest.java
similarity index 92%
copy from
kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineDataConsistencyCalculateSQLBuilderTest.java
copy to
kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineInventoryCalculateSQLBuilderTest.java
index 5715c3cacab..026ded60638 100644
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineDataConsistencyCalculateSQLBuilderTest.java
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineInventoryCalculateSQLBuilderTest.java
@@ -27,12 +27,11 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
-import java.util.Optional;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
-class PipelineDataConsistencyCalculateSQLBuilderTest {
+class PipelineInventoryCalculateSQLBuilderTest {
private static final Collection<String> COLUMN_NAMES =
Arrays.asList("order_id", "user_id", "status");
@@ -40,7 +39,7 @@ class PipelineDataConsistencyCalculateSQLBuilderTest {
private static final List<String> SHARDING_COLUMNS_NAMES =
Collections.singletonList("user_id");
- private final PipelineDataConsistencyCalculateSQLBuilder sqlBuilder = new
PipelineDataConsistencyCalculateSQLBuilder(TypedSPILoader.getService(DatabaseType.class,
"FIXTURE"));
+ private final PipelineInventoryCalculateSQLBuilder sqlBuilder = new
PipelineInventoryCalculateSQLBuilder(TypedSPILoader.getService(DatabaseType.class,
"FIXTURE"));
@Test
void assertBuildQueryRangeOrderingSQLPageQuery() {
@@ -89,10 +88,4 @@ class PipelineDataConsistencyCalculateSQLBuilderTest {
actual = sqlBuilder.buildPointQuerySQL(new QualifiedTable(null,
"t_order"), COLUMN_NAMES, UNIQUE_KEYS, Collections.singletonList("user_id"));
assertThat(actual, is("SELECT order_id,user_id,status FROM t_order
WHERE order_id=? AND status=? AND user_id=?"));
}
-
- @Test
- void assertBuildCRC32SQL() {
- Optional<String> actual = sqlBuilder.buildCRC32SQL(new
QualifiedTable("foo_schema", "foo_tbl"), "foo_col");
- assertThat(actual, is(Optional.of("SELECT CRC32(foo_col) FROM
foo_tbl")));
- }
}