This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 8d3a8823ddd Rename DataSourceCheckEngine.checkSourceDataSources()
(#29475)
8d3a8823ddd is described below
commit 8d3a8823ddd02ba5ac9545cd5e4a2e75a7982a9b
Author: Liang Zhang <[email protected]>
AuthorDate: Wed Dec 20 20:23:01 2023 +0800
Rename DataSourceCheckEngine.checkSourceDataSources() (#29475)
* Rename DataSourceCheckEngine.checkSourceDataSources()
* Refactor DataSourceCheckEngine
---
.../core/checker/DataSourceCheckEngine.java | 26 +++++++++++++---------
.../core/importer/ImporterConfiguration.java | 12 +++++++++-
.../migration/preparer/MigrationJobPreparer.java | 2 +-
3 files changed, 28 insertions(+), 12 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/DataSourceCheckEngine.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/DataSourceCheckEngine.java
index e424525be4d..001a278fef2 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/DataSourceCheckEngine.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/DataSourceCheckEngine.java
@@ -20,7 +20,7 @@ package org.apache.shardingsphere.data.pipeline.core.checker;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithInvalidConnectionException;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithTargetTableNotEmptyException;
import
org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
+import
org.apache.shardingsphere.data.pipeline.core.metadata.CaseInsensitiveQualifiedTable;
import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineCommonSQLBuilder;
import
org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
@@ -68,7 +68,7 @@ public final class DataSourceCheckEngine {
*
* @param dataSources to be checked source data source
*/
- public void checkSourceDataSource(final Collection<DataSource>
dataSources) {
+ public void checkSourceDataSources(final Collection<DataSource>
dataSources) {
checkConnection(dataSources);
if (null == checker) {
return;
@@ -85,16 +85,14 @@ public final class DataSourceCheckEngine {
*/
public void checkTargetDataSources(final Collection<DataSource>
dataSources, final ImporterConfiguration importerConfig) {
checkConnection(dataSources);
- checkEmptyTable(dataSources,
importerConfig.getTableAndSchemaNameMapper(),
importerConfig.getLogicTableNames());
+ checkEmptyTable(dataSources, importerConfig);
}
- // TODO Merge schemaName and tableNames
- private void checkEmptyTable(final Collection<DataSource> dataSources,
final TableAndSchemaNameMapper tableAndSchemaNameMapper, final
Collection<String> logicTableNames) {
+ private void checkEmptyTable(final Collection<DataSource> dataSources,
final ImporterConfiguration importerConfig) {
try {
for (DataSource each : dataSources) {
- for (String tableName : logicTableNames) {
-
ShardingSpherePreconditions.checkState(checkEmptyTable(each,
tableAndSchemaNameMapper.getSchemaName(tableName), tableName),
- () -> new
PrepareJobWithTargetTableNotEmptyException(tableName));
+ for (CaseInsensitiveQualifiedTable qualifiedTable :
importerConfig.getQualifiedTables()) {
+
ShardingSpherePreconditions.checkState(checkEmptyTable(each, qualifiedTable),
() -> new
PrepareJobWithTargetTableNotEmptyException(qualifiedTable.getTableName().toString()));
}
}
} catch (final SQLException ex) {
@@ -102,8 +100,16 @@ public final class DataSourceCheckEngine {
}
}
- private boolean checkEmptyTable(final DataSource dataSource, final String
schemaName, final String tableName) throws SQLException {
- String sql = sqlBuilder.buildCheckEmptySQL(schemaName, tableName);
+ /**
+ * Check whether empty table.
+ *
+ * @param dataSource data source
+ * @param qualifiedTable qualified table
+ * @return empty or not
+ * @throws SQLException if there's database operation failure
+ */
+ public boolean checkEmptyTable(final DataSource dataSource, final
CaseInsensitiveQualifiedTable qualifiedTable) throws SQLException {
+ String sql =
sqlBuilder.buildCheckEmptySQL(qualifiedTable.getSchemaName().toString(),
qualifiedTable.getTableName().toString());
try (
Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement =
connection.prepareStatement(sql);
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/ImporterConfiguration.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/ImporterConfiguration.java
index 0ce5730f582..ff1b0357286 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/ImporterConfiguration.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/ImporterConfiguration.java
@@ -21,9 +21,10 @@ import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
import
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
import
org.apache.shardingsphere.data.pipeline.core.metadata.CaseInsensitiveIdentifier;
+import
org.apache.shardingsphere.data.pipeline.core.metadata.CaseInsensitiveQualifiedTable;
import
org.apache.shardingsphere.data.pipeline.core.spi.algorithm.JobRateLimitAlgorithm;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
import
org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
@@ -86,4 +87,13 @@ public final class ImporterConfiguration {
DialectDatabaseMetaData dialectDatabaseMetaData = new
DatabaseTypeRegistry(dataSourceConfig.getDatabaseType()).getDialectDatabaseMetaData();
return dialectDatabaseMetaData.isSchemaAvailable() ?
Optional.of(tableAndSchemaNameMapper.getSchemaName(logicTableName)) :
Optional.empty();
}
+
+ /**
+ * Get qualified tables.
+ *
+ * @return qualified tables
+ */
+ public Collection<CaseInsensitiveQualifiedTable> getQualifiedTables() {
+ return getLogicTableNames().stream().map(each -> new
CaseInsensitiveQualifiedTable(tableAndSchemaNameMapper.getSchemaName(each),
each)).collect(Collectors.toList());
+ }
}
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
index 9087424ba82..c30eeb3aab5 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
@@ -102,7 +102,7 @@ public final class MigrationJobPreparer {
jobItemContext.getTaskConfig().getDumperContext().getCommonContext().getDataSourceConfig().getClass()),
() -> new UnsupportedSQLOperationException("Migration
inventory dumper only support StandardPipelineDataSourceConfiguration"));
DatabaseType sourceDatabaseType =
jobItemContext.getJobConfig().getSourceDatabaseType();
- new
DataSourceCheckEngine(sourceDatabaseType).checkSourceDataSource(Collections.singleton(jobItemContext.getSourceDataSource()));
+ new
DataSourceCheckEngine(sourceDatabaseType).checkSourceDataSources(Collections.singleton(jobItemContext.getSourceDataSource()));
if (jobItemContext.isStopping()) {
PipelineJobRegistry.stop(jobItemContext.getJobId());
return;