This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new fe801131c49 Refactor PipelineSQLSegmentBuilder (#33958)
fe801131c49 is described below
commit fe801131c49ef2edd20aff25bc5658932dbc931d
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Dec 7 21:47:20 2024 +0800
Refactor PipelineSQLSegmentBuilder (#33958)
---
.../sqlbuilder/segment/PipelineSQLSegmentBuilder.java | 16 ++++++++++++++++
.../sql/PipelineDataConsistencyCalculateSQLBuilder.java | 10 +++++-----
.../segment/PipelineSQLSegmentBuilderTest.java | 4 ++++
.../general/PostgreSQLMigrationGeneralE2EIT.java | 8 ++++----
4 files changed, 29 insertions(+), 9 deletions(-)
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/segment/PipelineSQLSegmentBuilder.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/segment/PipelineSQLSegmentBuilder.java
index 8668fc48b13..a951c1ada4b 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/segment/PipelineSQLSegmentBuilder.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/segment/PipelineSQLSegmentBuilder.java
@@ -21,6 +21,7 @@ import com.google.common.base.Strings;
import org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
+import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedTable;
/**
* Pipeline SQL segment builder.
@@ -58,4 +59,19 @@ public final class PipelineSQLSegmentBuilder {
result.append(getEscapedIdentifier(tableName));
return result.toString();
}
+
+ /**
+ * Get qualified table name.
+ *
+ * @param qualifiedTable qualified table
+ * @return qualified table name
+ */
+ public String getQualifiedTableName(final QualifiedTable qualifiedTable) {
+ StringBuilder result = new StringBuilder();
+ if (dialectDatabaseMetaData.isSchemaAvailable() && !Strings.isNullOrEmpty(qualifiedTable.getSchemaName())) {
+ result.append(getEscapedIdentifier(qualifiedTable.getSchemaName())).append('.');
+ }
+ result.append(getEscapedIdentifier(qualifiedTable.getTableName()));
+ return result.toString();
+ }
}
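A minimal usage sketch of the new getQualifiedTableName(QualifiedTable) overload, mirroring the expectations in PipelineSQLSegmentBuilderTest further down; constructing the builder from a DatabaseType is assumed to follow the existing test setup:

    // Assumption: the builder is created from a DatabaseType, as the test class below does.
    PipelineSQLSegmentBuilder postgreSQLBuilder = new PipelineSQLSegmentBuilder(TypedSPILoader.getService(DatabaseType.class, "PostgreSQL"));
    // Schema-aware dialect: schema and table are escaped and joined with '.' -> foo_schema.foo_tbl
    postgreSQLBuilder.getQualifiedTableName(new QualifiedTable("foo_schema", "foo_tbl"));
    // Null or empty schema (or a dialect without schema support): only the table name is returned -> foo_tbl
    postgreSQLBuilder.getQualifiedTableName(new QualifiedTable(null, "foo_tbl"));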
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineDataConsistencyCalculateSQLBuilder.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineDataConsistencyCalculateSQLBuilder.java
index c5211dcd225..6fb62ac968d 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineDataConsistencyCalculateSQLBuilder.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineDataConsistencyCalculateSQLBuilder.java
@@ -62,7 +62,7 @@ public final class PipelineDataConsistencyCalculateSQLBuilder {
private String buildQueryRangeOrderingSQL0(final QualifiedTable qualifiedTable, final Collection<String> columnNames, final List<String> uniqueKeys, final QueryRange queryRange,
@Nullable final List<String> shardingColumnsNames) {
- String qualifiedTableName = sqlSegmentBuilder.getQualifiedTableName(qualifiedTable.getSchemaName(), qualifiedTable.getTableName());
+ String qualifiedTableName = sqlSegmentBuilder.getQualifiedTableName(qualifiedTable);
String queryColumns = columnNames.stream().map(sqlSegmentBuilder::getEscapedIdentifier).collect(Collectors.joining(","));
String firstUniqueKey = uniqueKeys.get(0);
String orderByColumns = joinColumns(uniqueKeys, shardingColumnsNames).stream().map(each -> sqlSegmentBuilder.getEscapedIdentifier(each) + " ASC").collect(Collectors.joining(", "));
@@ -93,15 +93,15 @@ public final class PipelineDataConsistencyCalculateSQLBuilder {
/**
* Build point query SQL.
*
- * @param table qualified table
+ * @param qualifiedTable qualified table
* @param columnNames column names
* @param uniqueKeys unique keys, it may be primary key, not null
* @param shardingColumnsNames sharding columns names, nullable
* @return built SQL
*/
- public String buildPointQuerySQL(final QualifiedTable table, final Collection<String> columnNames, final List<String> uniqueKeys,
+ public String buildPointQuerySQL(final QualifiedTable qualifiedTable, final Collection<String> columnNames, final List<String> uniqueKeys,
@Nullable final List<String> shardingColumnsNames) {
- String qualifiedTableName = sqlSegmentBuilder.getQualifiedTableName(table.getSchemaName(), table.getTableName());
+ String qualifiedTableName = sqlSegmentBuilder.getQualifiedTableName(qualifiedTable);
String queryColumns = columnNames.stream().map(sqlSegmentBuilder::getEscapedIdentifier).collect(Collectors.joining(","));
String equalsConditions = joinColumns(uniqueKeys, shardingColumnsNames).stream().map(each -> sqlSegmentBuilder.getEscapedIdentifier(each) + "=?").collect(Collectors.joining(" AND "));
return String.format("SELECT %s FROM %s WHERE %s", queryColumns, qualifiedTableName, equalsConditions);
@@ -126,6 +126,6 @@ public final class PipelineDataConsistencyCalculateSQLBuilder {
*/
public Optional<String> buildCRC32SQL(final QualifiedTable qualifiedTable, final String columnName) {
return dialectSQLBuilder.buildCRC32SQL(
- sqlSegmentBuilder.getQualifiedTableName(qualifiedTable.getSchemaName(), qualifiedTable.getTableName()), sqlSegmentBuilder.getEscapedIdentifier(columnName));
+ sqlSegmentBuilder.getQualifiedTableName(qualifiedTable), sqlSegmentBuilder.getEscapedIdentifier(columnName));
}
}
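For orientation, a hypothetical call to the refactored buildPointQuerySQL; the builder instance, table, and column names here are illustrative and not part of this commit:

    // Hypothetical usage; sqlBuilder stands for a PipelineDataConsistencyCalculateSQLBuilder whose construction is not shown in this diff.
    QualifiedTable orderTable = new QualifiedTable("foo_schema", "t_order");
    String sql = sqlBuilder.buildPointQuerySQL(orderTable, Arrays.asList("order_id", "user_id"), Collections.singletonList("order_id"), null);
    // Expected shape: SELECT order_id,user_id FROM foo_schema.t_order WHERE order_id=?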
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/segment/PipelineSQLSegmentBuilderTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/segment/PipelineSQLSegmentBuilderTest.java
index 417ecff7e73..4c6b1d74fd7 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/segment/PipelineSQLSegmentBuilderTest.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/segment/PipelineSQLSegmentBuilderTest.java
@@ -18,6 +18,7 @@
package org.apache.shardingsphere.data.pipeline.core.sqlbuilder.segment;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
+import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedTable;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.junit.jupiter.api.Test;
@@ -43,15 +44,18 @@ class PipelineSQLSegmentBuilderTest {
@Test
void assertGetQualifiedTableNameWithUnsupportedSchema() {
assertThat(mySQLBuilder.getQualifiedTableName("foo_schema", "foo_tbl"), is("foo_tbl"));
+ assertThat(mySQLBuilder.getQualifiedTableName(new QualifiedTable("foo_schema", "foo_tbl")), is("foo_tbl"));
}
@Test
void assertGetQualifiedTableNameWithSupportedSchema() {
assertThat(postgreSQLBuilder.getQualifiedTableName("foo_schema", "foo_tbl"), is("foo_schema.foo_tbl"));
+ assertThat(postgreSQLBuilder.getQualifiedTableName(new QualifiedTable("foo_schema", "foo_tbl")), is("foo_schema.foo_tbl"));
}
@Test
void assertGetQualifiedTableNameWithSupportedSchemaAndNullSchema() {
assertThat(postgreSQLBuilder.getQualifiedTableName(null, "foo_tbl"), is("foo_tbl"));
+ assertThat(postgreSQLBuilder.getQualifiedTableName(new QualifiedTable(null, "foo_tbl")), is("foo_tbl"));
}
}
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
index 1dd4a33fc4c..f10f3e98f6b 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
@@ -87,13 +87,13 @@ class PostgreSQLMigrationGeneralE2EIT extends AbstractMigrationE2EIT {
Awaitility.await().atMost(10L, TimeUnit.SECONDS).pollInterval(1L, TimeUnit.SECONDS).until(() -> !listJobId(containerComposer).isEmpty());
String jobId = getJobIdByTableName(containerComposer, "ds_0.test." + SOURCE_TABLE_NAME);
containerComposer.waitJobPrepareSuccess(String.format("SHOW MIGRATION STATUS '%s'", jobId));
- String schemaTableName = String.join(".", PipelineContainerComposer.SCHEMA_NAME, SOURCE_TABLE_NAME);
- containerComposer.startIncrementTask(new E2EIncrementalTask(containerComposer.getSourceDataSource(), schemaTableName, new SnowflakeKeyGenerateAlgorithm(),
+ String qualifiedTableName = String.join(".", PipelineContainerComposer.SCHEMA_NAME, SOURCE_TABLE_NAME);
+ containerComposer.startIncrementTask(new E2EIncrementalTask(containerComposer.getSourceDataSource(), qualifiedTableName, new SnowflakeKeyGenerateAlgorithm(),
containerComposer.getDatabaseType(), 20));
TimeUnit.SECONDS.timedJoin(containerComposer.getIncreaseTaskThread(), 30L);
- containerComposer.sourceExecuteWithLog(String.format("INSERT INTO %s (order_id, user_id, status) VALUES (10000, 1, 'OK')", schemaTableName));
+ containerComposer.sourceExecuteWithLog(String.format("INSERT INTO %s (order_id, user_id, status) VALUES (10000, 1, 'OK')", qualifiedTableName));
DataSource jdbcDataSource = containerComposer.generateShardingSphereDataSourceFromProxy();
- containerComposer.assertOrderRecordExist(jdbcDataSource, schemaTableName, 10000);
+ containerComposer.assertOrderRecordExist(jdbcDataSource, qualifiedTableName, 10000);
checkOrderMigration(containerComposer, jobId);
startMigrationWithSchema(containerComposer, "t_order_item", "t_order_item");
checkOrderItemMigration(containerComposer);