This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 8d3a8823ddd Rename DataSourceCheckEngine.checkSourceDataSources() (#29475)
8d3a8823ddd is described below

commit 8d3a8823ddd02ba5ac9545cd5e4a2e75a7982a9b
Author: Liang Zhang <[email protected]>
AuthorDate: Wed Dec 20 20:23:01 2023 +0800

    Rename DataSourceCheckEngine.checkSourceDataSources() (#29475)
    
    * Rename DataSourceCheckEngine.checkSourceDataSources()
    
    * Refactor DataSourceCheckEngine
---
 .../core/checker/DataSourceCheckEngine.java        | 26 +++++++++++++---------
 .../core/importer/ImporterConfiguration.java       | 12 +++++++++-
 .../migration/preparer/MigrationJobPreparer.java   |  2 +-
 3 files changed, 28 insertions(+), 12 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/DataSourceCheckEngine.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/DataSourceCheckEngine.java
index e424525be4d..001a278fef2 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/DataSourceCheckEngine.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/DataSourceCheckEngine.java
@@ -20,7 +20,7 @@ package org.apache.shardingsphere.data.pipeline.core.checker;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithInvalidConnectionException;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithTargetTableNotEmptyException;
 import 
org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
+import 
org.apache.shardingsphere.data.pipeline.core.metadata.CaseInsensitiveQualifiedTable;
 import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineCommonSQLBuilder;
 import 
org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
@@ -68,7 +68,7 @@ public final class DataSourceCheckEngine {
      * 
      * @param dataSources to be checked source data source
      */
-    public void checkSourceDataSource(final Collection<DataSource> 
dataSources) {
+    public void checkSourceDataSources(final Collection<DataSource> 
dataSources) {
         checkConnection(dataSources);
         if (null == checker) {
             return;
@@ -85,16 +85,14 @@ public final class DataSourceCheckEngine {
      */
     public void checkTargetDataSources(final Collection<DataSource> 
dataSources, final ImporterConfiguration importerConfig) {
         checkConnection(dataSources);
-        checkEmptyTable(dataSources, 
importerConfig.getTableAndSchemaNameMapper(), 
importerConfig.getLogicTableNames());
+        checkEmptyTable(dataSources, importerConfig);
     }
     
-    // TODO Merge schemaName and tableNames
-    private void checkEmptyTable(final Collection<DataSource> dataSources, 
final TableAndSchemaNameMapper tableAndSchemaNameMapper, final 
Collection<String> logicTableNames) {
+    private void checkEmptyTable(final Collection<DataSource> dataSources, 
final ImporterConfiguration importerConfig) {
         try {
             for (DataSource each : dataSources) {
-                for (String tableName : logicTableNames) {
-                    
ShardingSpherePreconditions.checkState(checkEmptyTable(each, 
tableAndSchemaNameMapper.getSchemaName(tableName), tableName),
-                            () -> new 
PrepareJobWithTargetTableNotEmptyException(tableName));
+                for (CaseInsensitiveQualifiedTable qualifiedTable : 
importerConfig.getQualifiedTables()) {
+                    
ShardingSpherePreconditions.checkState(checkEmptyTable(each, qualifiedTable), 
() -> new 
PrepareJobWithTargetTableNotEmptyException(qualifiedTable.getTableName().toString()));
                 }
             }
         } catch (final SQLException ex) {
@@ -102,8 +100,16 @@ public final class DataSourceCheckEngine {
         }
     }
     
-    private boolean checkEmptyTable(final DataSource dataSource, final String 
schemaName, final String tableName) throws SQLException {
-        String sql = sqlBuilder.buildCheckEmptySQL(schemaName, tableName);
+    /**
+     * Check whether empty table.
+     *
+     * @param dataSource data source
+     * @param qualifiedTable qualified table
+     * @return empty or not
+     * @throws SQLException if there's database operation failure
+     */
+    public boolean checkEmptyTable(final DataSource dataSource, final 
CaseInsensitiveQualifiedTable qualifiedTable) throws SQLException {
+        String sql = 
sqlBuilder.buildCheckEmptySQL(qualifiedTable.getSchemaName().toString(), 
qualifiedTable.getTableName().toString());
         try (
                 Connection connection = dataSource.getConnection();
                 PreparedStatement preparedStatement = 
connection.prepareStatement(sql);
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/ImporterConfiguration.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/ImporterConfiguration.java
index 0ce5730f582..ff1b0357286 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/ImporterConfiguration.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/ImporterConfiguration.java
@@ -21,9 +21,10 @@ import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.ToString;
 import 
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.CaseInsensitiveIdentifier;
+import 
org.apache.shardingsphere.data.pipeline.core.metadata.CaseInsensitiveQualifiedTable;
 import 
org.apache.shardingsphere.data.pipeline.core.spi.algorithm.JobRateLimitAlgorithm;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
 import 
org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
 
@@ -86,4 +87,13 @@ public final class ImporterConfiguration {
         DialectDatabaseMetaData dialectDatabaseMetaData = new 
DatabaseTypeRegistry(dataSourceConfig.getDatabaseType()).getDialectDatabaseMetaData();
         return dialectDatabaseMetaData.isSchemaAvailable() ? 
Optional.of(tableAndSchemaNameMapper.getSchemaName(logicTableName)) : 
Optional.empty();
     }
+    
+    /**
+     * Get qualified tables.
+     * 
+     * @return qualified tables
+     */
+    public Collection<CaseInsensitiveQualifiedTable> getQualifiedTables() {
+        return getLogicTableNames().stream().map(each -> new 
CaseInsensitiveQualifiedTable(tableAndSchemaNameMapper.getSchemaName(each), 
each)).collect(Collectors.toList());
+    }
 }
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
index 9087424ba82..c30eeb3aab5 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
@@ -102,7 +102,7 @@ public final class MigrationJobPreparer {
                 
jobItemContext.getTaskConfig().getDumperContext().getCommonContext().getDataSourceConfig().getClass()),
                 () -> new UnsupportedSQLOperationException("Migration 
inventory dumper only support StandardPipelineDataSourceConfiguration"));
         DatabaseType sourceDatabaseType = 
jobItemContext.getJobConfig().getSourceDatabaseType();
-        new 
DataSourceCheckEngine(sourceDatabaseType).checkSourceDataSource(Collections.singleton(jobItemContext.getSourceDataSource()));
+        new 
DataSourceCheckEngine(sourceDatabaseType).checkSourceDataSources(Collections.singleton(jobItemContext.getSourceDataSource()));
         if (jobItemContext.isStopping()) {
             PipelineJobRegistry.stop(jobItemContext.getJobId());
             return;

Reply via email to