This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new fe801131c49 Refactor PipelineSQLSegmentBuilder (#33958)
fe801131c49 is described below

commit fe801131c49ef2edd20aff25bc5658932dbc931d
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Dec 7 21:47:20 2024 +0800

    Refactor PipelineSQLSegmentBuilder (#33958)
---
 .../sqlbuilder/segment/PipelineSQLSegmentBuilder.java    | 16 ++++++++++++++++
 .../sql/PipelineDataConsistencyCalculateSQLBuilder.java  | 10 +++++-----
 .../segment/PipelineSQLSegmentBuilderTest.java           |  4 ++++
 .../general/PostgreSQLMigrationGeneralE2EIT.java         |  8 ++++----
 4 files changed, 29 insertions(+), 9 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/segment/PipelineSQLSegmentBuilder.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/segment/PipelineSQLSegmentBuilder.java
index 8668fc48b13..a951c1ada4b 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/segment/PipelineSQLSegmentBuilder.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/segment/PipelineSQLSegmentBuilder.java
@@ -21,6 +21,7 @@ import com.google.common.base.Strings;
 import org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
+import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedTable;
 
 /**
  * Pipeline SQL segment builder.
@@ -58,4 +59,19 @@ public final class PipelineSQLSegmentBuilder {
         result.append(getEscapedIdentifier(tableName));
         return result.toString();
     }
+    
+    /**
+     * Get qualified table name.
+     *
+     * @param qualifiedTable qualified table
+     * @return qualified table name
+     */
+    public String getQualifiedTableName(final QualifiedTable qualifiedTable) {
+        StringBuilder result = new StringBuilder();
+        if (dialectDatabaseMetaData.isSchemaAvailable() && !Strings.isNullOrEmpty(qualifiedTable.getSchemaName())) {
+            result.append(getEscapedIdentifier(qualifiedTable.getSchemaName())).append('.');
+        }
+        result.append(getEscapedIdentifier(qualifiedTable.getTableName()));
+        return result.toString();
+    }
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineDataConsistencyCalculateSQLBuilder.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineDataConsistencyCalculateSQLBuilder.java
index c5211dcd225..6fb62ac968d 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineDataConsistencyCalculateSQLBuilder.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineDataConsistencyCalculateSQLBuilder.java
@@ -62,7 +62,7 @@ public final class PipelineDataConsistencyCalculateSQLBuilder {
     
     private String buildQueryRangeOrderingSQL0(final QualifiedTable qualifiedTable, final Collection<String> columnNames, final List<String> uniqueKeys, final QueryRange queryRange,
                                                @Nullable final List<String> shardingColumnsNames) {
-        String qualifiedTableName = sqlSegmentBuilder.getQualifiedTableName(qualifiedTable.getSchemaName(), qualifiedTable.getTableName());
+        String qualifiedTableName = sqlSegmentBuilder.getQualifiedTableName(qualifiedTable);
         String queryColumns = columnNames.stream().map(sqlSegmentBuilder::getEscapedIdentifier).collect(Collectors.joining(","));
         String firstUniqueKey = uniqueKeys.get(0);
         String orderByColumns = joinColumns(uniqueKeys, shardingColumnsNames).stream().map(each -> sqlSegmentBuilder.getEscapedIdentifier(each) + " ASC").collect(Collectors.joining(", "));
@@ -93,15 +93,15 @@ public final class PipelineDataConsistencyCalculateSQLBuilder {
     /**
      * Build point query SQL.
      *
-     * @param table qualified table
+     * @param qualifiedTable qualified table
      * @param columnNames column names
      * @param uniqueKeys unique keys, it may be primary key, not null
      * @param shardingColumnsNames sharding columns names, nullable
      * @return built SQL
      */
-    public String buildPointQuerySQL(final QualifiedTable table, final Collection<String> columnNames, final List<String> uniqueKeys,
+    public String buildPointQuerySQL(final QualifiedTable qualifiedTable, final Collection<String> columnNames, final List<String> uniqueKeys,
                                      @Nullable final List<String> shardingColumnsNames) {
-        String qualifiedTableName = sqlSegmentBuilder.getQualifiedTableName(table.getSchemaName(), table.getTableName());
+        String qualifiedTableName = sqlSegmentBuilder.getQualifiedTableName(qualifiedTable);
         String queryColumns = columnNames.stream().map(sqlSegmentBuilder::getEscapedIdentifier).collect(Collectors.joining(","));
         String equalsConditions = joinColumns(uniqueKeys, shardingColumnsNames).stream().map(each -> sqlSegmentBuilder.getEscapedIdentifier(each) + "=?").collect(Collectors.joining(" AND "));
         return String.format("SELECT %s FROM %s WHERE %s", queryColumns, qualifiedTableName, equalsConditions);
@@ -126,6 +126,6 @@ public final class PipelineDataConsistencyCalculateSQLBuilder {
      */
     public Optional<String> buildCRC32SQL(final QualifiedTable qualifiedTable, final String columnName) {
         return dialectSQLBuilder.buildCRC32SQL(
-                sqlSegmentBuilder.getQualifiedTableName(qualifiedTable.getSchemaName(), qualifiedTable.getTableName()), sqlSegmentBuilder.getEscapedIdentifier(columnName));
+                sqlSegmentBuilder.getQualifiedTableName(qualifiedTable), sqlSegmentBuilder.getEscapedIdentifier(columnName));
     }
 }
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/segment/PipelineSQLSegmentBuilderTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/segment/PipelineSQLSegmentBuilderTest.java
index 417ecff7e73..4c6b1d74fd7 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/segment/PipelineSQLSegmentBuilderTest.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/segment/PipelineSQLSegmentBuilderTest.java
@@ -18,6 +18,7 @@
 package org.apache.shardingsphere.data.pipeline.core.sqlbuilder.segment;
 
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
+import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedTable;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 import org.junit.jupiter.api.Test;
 
@@ -43,15 +44,18 @@ class PipelineSQLSegmentBuilderTest {
     @Test
     void assertGetQualifiedTableNameWithUnsupportedSchema() {
         assertThat(mySQLBuilder.getQualifiedTableName("foo_schema", "foo_tbl"), is("foo_tbl"));
+        assertThat(mySQLBuilder.getQualifiedTableName(new QualifiedTable("foo_schema", "foo_tbl")), is("foo_tbl"));
     }
     
     @Test
     void assertGetQualifiedTableNameWithSupportedSchema() {
         assertThat(postgreSQLBuilder.getQualifiedTableName("foo_schema", "foo_tbl"), is("foo_schema.foo_tbl"));
+        assertThat(postgreSQLBuilder.getQualifiedTableName(new QualifiedTable("foo_schema", "foo_tbl")), is("foo_schema.foo_tbl"));
     }
     
     @Test
     void assertGetQualifiedTableNameWithSupportedSchemaAndNullSchema() {
         assertThat(postgreSQLBuilder.getQualifiedTableName(null, "foo_tbl"), is("foo_tbl"));
+        assertThat(postgreSQLBuilder.getQualifiedTableName(new QualifiedTable(null, "foo_tbl")), is("foo_tbl"));
     }
 }
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
index 1dd4a33fc4c..f10f3e98f6b 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
@@ -87,13 +87,13 @@ class PostgreSQLMigrationGeneralE2EIT extends AbstractMigrationE2EIT {
             Awaitility.await().atMost(10L, TimeUnit.SECONDS).pollInterval(1L, TimeUnit.SECONDS).until(() -> !listJobId(containerComposer).isEmpty());
             String jobId = getJobIdByTableName(containerComposer, "ds_0.test." + SOURCE_TABLE_NAME);
             containerComposer.waitJobPrepareSuccess(String.format("SHOW MIGRATION STATUS '%s'", jobId));
-            String schemaTableName = String.join(".", PipelineContainerComposer.SCHEMA_NAME, SOURCE_TABLE_NAME);
-            containerComposer.startIncrementTask(new E2EIncrementalTask(containerComposer.getSourceDataSource(), schemaTableName, new SnowflakeKeyGenerateAlgorithm(),
+            String qualifiedTableName = String.join(".", PipelineContainerComposer.SCHEMA_NAME, SOURCE_TABLE_NAME);
+            containerComposer.startIncrementTask(new E2EIncrementalTask(containerComposer.getSourceDataSource(), qualifiedTableName, new SnowflakeKeyGenerateAlgorithm(),
                     containerComposer.getDatabaseType(), 20));
             TimeUnit.SECONDS.timedJoin(containerComposer.getIncreaseTaskThread(), 30L);
-            containerComposer.sourceExecuteWithLog(String.format("INSERT INTO %s (order_id, user_id, status) VALUES (10000, 1, 'OK')", schemaTableName));
+            containerComposer.sourceExecuteWithLog(String.format("INSERT INTO %s (order_id, user_id, status) VALUES (10000, 1, 'OK')", qualifiedTableName));
             DataSource jdbcDataSource = containerComposer.generateShardingSphereDataSourceFromProxy();
-            containerComposer.assertOrderRecordExist(jdbcDataSource, schemaTableName, 10000);
+            containerComposer.assertOrderRecordExist(jdbcDataSource, qualifiedTableName, 10000);
             checkOrderMigration(containerComposer, jobId);
             startMigrationWithSchema(containerComposer, "t_order_item", "t_order_item");
             checkOrderItemMigration(containerComposer);

Reply via email to