This is an automated email from the ASF dual-hosted git repository.
duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new bc2c1e66e05 Refactor SingleTableInventoryCalculateParameter (#33955)
bc2c1e66e05 is described below
commit bc2c1e66e05dcaa9f38180499b427ca726b117b2
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Dec 7 19:56:31 2024 +0800
Refactor SingleTableInventoryCalculateParameter (#33955)
* Refactor SingleTableInventoryCalculateParameter
* Refactor SingleTableInventoryCalculateParameter
* Refactor SingleTableInventoryCalculateParameter
* Refactor SingleTableInventoryCalculateParameter
* Refactor SingleTableInventoryCalculateParameter
* Refactor SingleTableInventoryCalculateParameter
* Refactor SingleTableInventoryCalculateParameter
* Refactor SingleTableInventoryCalculateParameter
* Refactor SingleTableInventoryCalculateParameter
---
.../metadata/database/schema/QualifiedTable.java | 7 ++++++
.../checker/PipelineDataSourceCheckEngine.java | 10 ++++----
.../table/MatchingTableInventoryChecker.java | 8 +++----
.../table/TableInventoryCheckParameter.java | 6 ++---
.../CRC32SingleTableInventoryCalculator.java | 4 ++--
.../RecordSingleTableInventoryCalculator.java | 21 ++++++++--------
.../SingleTableInventoryCalculateParameter.java | 28 ++++------------------
...DataConsistencyCheckLoadingFailedException.java | 11 ++++-----
.../core/importer/ImporterConfiguration.java | 6 ++---
...PipelineDataConsistencyCalculateSQLBuilder.java | 20 ++++++++--------
.../checker/PipelineDataSourceCheckEngineTest.java | 6 ++---
.../CRC32SingleTableInventoryCalculatorTest.java | 9 ++++---
.../core/importer/ImporterConfigurationTest.java | 9 ++++---
...lineDataConsistencyCalculateSQLBuilderTest.java | 13 +++++-----
.../MigrationDataConsistencyChecker.java | 10 ++++----
.../test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java | 22 ++++++++---------
.../RecordSingleTableInventoryCalculatorTest.java | 18 +++++++-------
17 files changed, 98 insertions(+), 110 deletions(-)
diff --git
a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/database/schema/QualifiedTable.java
b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/database/schema/QualifiedTable.java
index 3276b5e3c48..6ea51c77116 100644
---
a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/database/schema/QualifiedTable.java
+++
b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/database/schema/QualifiedTable.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.infra.metadata.database.schema;
+import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
@@ -25,9 +26,15 @@ import lombok.RequiredArgsConstructor;
*/
@RequiredArgsConstructor
@Getter
+@EqualsAndHashCode
public final class QualifiedTable {
private final String schemaName;
private final String tableName;
+
+ @Override
+ public String toString() {
+ return null == schemaName ? tableName : String.join(".", schemaName,
tableName);
+ }
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/PipelineDataSourceCheckEngine.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/PipelineDataSourceCheckEngine.java
index 03bfdfafe9c..7440198ea08 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/PipelineDataSourceCheckEngine.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/PipelineDataSourceCheckEngine.java
@@ -26,7 +26,7 @@ import
org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
import
org.apache.shardingsphere.infra.exception.core.external.sql.type.wrapper.SQLWrapperException;
-import
org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveQualifiedTable;
+import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedTable;
import javax.sql.DataSource;
import java.sql.Connection;
@@ -90,8 +90,8 @@ public final class PipelineDataSourceCheckEngine {
private void checkEmptyTable(final Collection<DataSource> dataSources,
final ImporterConfiguration importerConfig) {
try {
for (DataSource each : dataSources) {
- for (CaseInsensitiveQualifiedTable qualifiedTable :
importerConfig.getQualifiedTables()) {
-
ShardingSpherePreconditions.checkState(checkEmptyTable(each, qualifiedTable),
() -> new
PrepareJobWithTargetTableNotEmptyException(qualifiedTable.getTableName().getValue()));
+ for (QualifiedTable qualifiedTable :
importerConfig.getQualifiedTables()) {
+
ShardingSpherePreconditions.checkState(checkEmptyTable(each, qualifiedTable),
() -> new
PrepareJobWithTargetTableNotEmptyException(qualifiedTable.getTableName()));
}
}
} catch (final SQLException ex) {
@@ -107,8 +107,8 @@ public final class PipelineDataSourceCheckEngine {
* @return empty or not
* @throws SQLException if there's database operation failure
*/
- public boolean checkEmptyTable(final DataSource dataSource, final
CaseInsensitiveQualifiedTable qualifiedTable) throws SQLException {
- String sql =
sqlBuilder.buildCheckEmptyTableSQL(qualifiedTable.getSchemaName().getValue(),
qualifiedTable.getTableName().getValue());
+ public boolean checkEmptyTable(final DataSource dataSource, final
QualifiedTable qualifiedTable) throws SQLException {
+ String sql =
sqlBuilder.buildCheckEmptyTableSQL(qualifiedTable.getSchemaName(),
qualifiedTable.getTableName());
try (
Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement =
connection.prepareStatement(sql);
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java
index b26358e8b50..690ce1e89b0 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java
@@ -73,9 +73,9 @@ public abstract class MatchingTableInventoryChecker
implements TableInventoryChe
private TableDataConsistencyCheckResult
checkSingleTableInventoryData(final TableInventoryCheckParameter param, final
ThreadPoolExecutor executor) {
SingleTableInventoryCalculateParameter sourceParam = new
SingleTableInventoryCalculateParameter(param.getSourceDataSource(),
param.getSourceTable(),
- param.getColumnNames(), param.getUniqueKeys(),
param.getProgressContext().getSourceTableCheckPositions().get(param.getSourceTable().getTableName().getValue()));
+ param.getColumnNames(), param.getUniqueKeys(),
param.getProgressContext().getSourceTableCheckPositions().get(param.getSourceTable().getTableName()));
SingleTableInventoryCalculateParameter targetParam = new
SingleTableInventoryCalculateParameter(param.getTargetDataSource(),
param.getTargetTable(),
- param.getColumnNames(), param.getUniqueKeys(),
param.getProgressContext().getTargetTableCheckPositions().get(param.getTargetTable().getTableName().getValue()));
+ param.getColumnNames(), param.getUniqueKeys(),
param.getProgressContext().getTargetTableCheckPositions().get(param.getTargetTable().getTableName()));
SingleTableInventoryCalculator sourceCalculator =
buildSingleTableInventoryCalculator();
this.sourceCalculator = sourceCalculator;
SingleTableInventoryCalculator targetCalculator =
buildSingleTableInventoryCalculator();
@@ -108,10 +108,10 @@ public abstract class MatchingTableInventoryChecker
implements TableInventoryChe
break;
}
if (sourceCalculatedResult.getMaxUniqueKeyValue().isPresent()) {
-
param.getProgressContext().getSourceTableCheckPositions().put(param.getSourceTable().getTableName().getValue(),
sourceCalculatedResult.getMaxUniqueKeyValue().get());
+
param.getProgressContext().getSourceTableCheckPositions().put(param.getSourceTable().getTableName(),
sourceCalculatedResult.getMaxUniqueKeyValue().get());
}
if (targetCalculatedResult.getMaxUniqueKeyValue().isPresent()) {
-
param.getProgressContext().getTargetTableCheckPositions().put(param.getTargetTable().getTableName().getValue(),
targetCalculatedResult.getMaxUniqueKeyValue().get());
+
param.getProgressContext().getTargetTableCheckPositions().put(param.getTargetTable().getTableName(),
targetCalculatedResult.getMaxUniqueKeyValue().get());
}
param.getProgressContext().onProgressUpdated(new
PipelineJobUpdateProgress(sourceCalculatedResult.getRecordsCount()));
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableInventoryCheckParameter.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableInventoryCheckParameter.java
index 8c5d674eeb2..4419b643093 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableInventoryCheckParameter.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableInventoryCheckParameter.java
@@ -23,7 +23,7 @@ import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.Consistency
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
import
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
import
org.apache.shardingsphere.data.pipeline.core.ratelimit.JobRateLimitAlgorithm;
-import
org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveQualifiedTable;
+import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedTable;
import java.util.List;
@@ -40,9 +40,9 @@ public final class TableInventoryCheckParameter {
private final PipelineDataSource targetDataSource;
- private final CaseInsensitiveQualifiedTable sourceTable;
+ private final QualifiedTable sourceTable;
- private final CaseInsensitiveQualifiedTable targetTable;
+ private final QualifiedTable targetTable;
private final List<String> columnNames;
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CRC32SingleTableInventoryCalculator.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CRC32SingleTableInventoryCalculator.java
index e967f236345..dc198b0c04d 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CRC32SingleTableInventoryCalculator.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CRC32SingleTableInventoryCalculator.java
@@ -49,7 +49,7 @@ public final class CRC32SingleTableInventoryCalculator
extends AbstractSingleTab
}
private CalculatedItem calculateCRC32(final
PipelineDataConsistencyCalculateSQLBuilder pipelineSQLBuilder, final
SingleTableInventoryCalculateParameter param, final String columnName) {
- String sql = pipelineSQLBuilder.buildCRC32SQL(param.getSchemaName(),
param.getLogicTableName(), columnName)
+ String sql = pipelineSQLBuilder.buildCRC32SQL(param.getTable(),
columnName)
.orElseThrow(() -> new
UnsupportedAlgorithmOnDatabaseTypeException("DataConsistencyCalculate",
"CRC32", param.getDatabaseType()));
try (
Connection connection = param.getDataSource().getConnection();
@@ -61,7 +61,7 @@ public final class CRC32SingleTableInventoryCalculator
extends AbstractSingleTab
int recordsCount = resultSet.getInt(2);
return new CalculatedItem(crc32, recordsCount);
} catch (final SQLException ex) {
- throw new
PipelineTableDataConsistencyCheckLoadingFailedException(param.getSchemaName(),
param.getLogicTableName(), ex);
+ throw new
PipelineTableDataConsistencyCheckLoadingFailedException(param.getTable(), ex);
}
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculator.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculator.java
index 0ad0862d142..f38a17ba1bf 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculator.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculator.java
@@ -92,8 +92,7 @@ public final class RecordSingleTableInventoryCalculator
extends AbstractStreamin
ResultSet resultSet = calculationContext.getResultSet();
ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
while (resultSet.next()) {
- ShardingSpherePreconditions.checkState(!isCanceling(), () ->
new PipelineJobCancelingException(
- "Calculate chunk canceled, schema name: %s, table
name: %s", param.getSchemaName(), param.getLogicTableName()));
+ ShardingSpherePreconditions.checkState(!isCanceling(), () ->
new PipelineJobCancelingException("Calculate chunk canceled, qualified table:
%s", param.getTable()));
Map<String, Object> columnRecord = new LinkedHashMap<>();
for (int columnIndex = 1, columnCount =
resultSetMetaData.getColumnCount(); columnIndex <= columnCount; columnIndex++) {
columnRecord.put(resultSetMetaData.getColumnLabel(columnIndex),
columnValueReaderEngine.read(resultSet, resultSetMetaData, columnIndex));
@@ -109,7 +108,7 @@ public final class RecordSingleTableInventoryCalculator
extends AbstractStreamin
// CHECKSTYLE:OFF
} catch (final SQLException | RuntimeException ex) {
// CHECKSTYLE:ON
- throw new
PipelineTableDataConsistencyCheckLoadingFailedException(param.getSchemaName(),
param.getLogicTableName(), ex);
+ throw new
PipelineTableDataConsistencyCheckLoadingFailedException(param.getTable(), ex);
}
}
@@ -125,7 +124,7 @@ public final class RecordSingleTableInventoryCalculator
extends AbstractStreamin
} catch (final SQLException | RuntimeException ex) {
// CHECKSTYLE:ON
QuietlyCloser.close(result);
- throw new
PipelineTableDataConsistencyCheckLoadingFailedException(param.getSchemaName(),
param.getLogicTableName(), ex);
+ throw new
PipelineTableDataConsistencyCheckLoadingFailedException(param.getTable(), ex);
}
return result;
}
@@ -155,10 +154,10 @@ public final class RecordSingleTableInventoryCalculator
extends AbstractStreamin
Collection<String> columnNames = param.getColumnNames().isEmpty() ?
Collections.singleton("*") : param.getColumnNames();
switch (param.getQueryType()) {
case RANGE_QUERY:
- return
pipelineSQLBuilder.buildQueryRangeOrderingSQL(param.getSchemaName(),
param.getLogicTableName(),
- columnNames, param.getUniqueKeysNames(),
param.getQueryRange(), param.getShardingColumnsNames());
+ return
pipelineSQLBuilder.buildQueryRangeOrderingSQL(param.getTable(), columnNames,
param.getUniqueKeysNames(), param.getQueryRange(),
param.getShardingColumnsNames());
case POINT_QUERY:
- return
pipelineSQLBuilder.buildPointQuerySQL(param.getSchemaName(),
param.getLogicTableName(), columnNames, param.getUniqueKeysNames(),
param.getShardingColumnsNames());
+ return pipelineSQLBuilder.buildPointQuerySQL(
+ param.getTable().getSchemaName(),
param.getTable().getTableName(), columnNames, param.getUniqueKeysNames(),
param.getShardingColumnsNames());
default:
throw new UnsupportedOperationException("Query type: " +
param.getQueryType());
}
@@ -169,7 +168,7 @@ public final class RecordSingleTableInventoryCalculator
extends AbstractStreamin
if (queryType == QueryType.RANGE_QUERY) {
QueryRange queryRange = param.getQueryRange();
ShardingSpherePreconditions.checkNotNull(queryRange,
- () -> new
PipelineTableDataConsistencyCheckLoadingFailedException(param.getSchemaName(),
param.getLogicTableName(), new RuntimeException("Unique keys values range is
null.")));
+ () -> new
PipelineTableDataConsistencyCheckLoadingFailedException(param.getTable(), new
RuntimeException("Unique keys values range is null.")));
int parameterIndex = 1;
if (null != queryRange.getLower()) {
preparedStatement.setObject(parameterIndex++,
queryRange.getLower());
@@ -181,15 +180,15 @@ public final class RecordSingleTableInventoryCalculator
extends AbstractStreamin
} else if (queryType == QueryType.POINT_QUERY) {
Collection<Object> uniqueKeysValues = param.getUniqueKeysValues();
ShardingSpherePreconditions.checkNotNull(uniqueKeysValues,
- () -> new
PipelineTableDataConsistencyCheckLoadingFailedException(param.getSchemaName(),
param.getLogicTableName(), new RuntimeException("Unique keys values is
null.")));
+ () -> new
PipelineTableDataConsistencyCheckLoadingFailedException(param.getTable(), new
RuntimeException("Unique keys values is null.")));
int parameterIndex = 1;
for (Object each : uniqueKeysValues) {
preparedStatement.setObject(parameterIndex++, each);
}
if (null != param.getShardingColumnsNames() &&
!param.getShardingColumnsNames().isEmpty()) {
List<Object> shardingColumnsValues =
param.getShardingColumnsValues();
-
ShardingSpherePreconditions.checkNotNull(shardingColumnsValues, () -> new
PipelineTableDataConsistencyCheckLoadingFailedException(
- param.getSchemaName(), param.getLogicTableName(), new
RuntimeException("Sharding columns values is null when names not empty.")));
+ ShardingSpherePreconditions.checkNotNull(shardingColumnsValues,
+ () -> new
PipelineTableDataConsistencyCheckLoadingFailedException(param.getTable(), new
RuntimeException("Sharding columns values is null when names not empty.")));
for (Object each : shardingColumnsValues) {
preparedStatement.setObject(parameterIndex++, each);
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/SingleTableInventoryCalculateParameter.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/SingleTableInventoryCalculateParameter.java
index 62d34286768..8b75761e472 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/SingleTableInventoryCalculateParameter.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/SingleTableInventoryCalculateParameter.java
@@ -20,11 +20,11 @@ package
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calc
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.range.QueryRange;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.QueryType;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.range.QueryRange;
import
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
-import
org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveQualifiedTable;
+import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedTable;
import javax.annotation.Nullable;
import java.util.Collection;
@@ -45,7 +45,7 @@ public final class SingleTableInventoryCalculateParameter {
*/
private final PipelineDataSource dataSource;
- private final CaseInsensitiveQualifiedTable table;
+ private final QualifiedTable table;
private final List<String> columnNames;
@@ -67,8 +67,8 @@ public final class SingleTableInventoryCalculateParameter {
private final QueryType queryType;
- public SingleTableInventoryCalculateParameter(final PipelineDataSource
dataSource, final CaseInsensitiveQualifiedTable table, final List<String>
columnNames,
- final
List<PipelineColumnMetaData> uniqueKeys, final Object tableCheckPosition) {
+ public SingleTableInventoryCalculateParameter(final PipelineDataSource
dataSource,
+ final QualifiedTable table,
final List<String> columnNames, final List<PipelineColumnMetaData> uniqueKeys,
final Object tableCheckPosition) {
this.dataSource = dataSource;
this.table = table;
this.columnNames = columnNames;
@@ -86,24 +86,6 @@ public final class SingleTableInventoryCalculateParameter {
return dataSource.getDatabaseType();
}
- /**
- * Get schema name.
- *
- * @return schema name
- */
- public String getSchemaName() {
- return table.getSchemaName().getValue();
- }
-
- /**
- * Get logic table name.
- *
- * @return logic table name
- */
- public String getLogicTableName() {
- return table.getTableName().getValue();
- }
-
/**
* Get first unique key.
*
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/data/PipelineTableDataConsistencyCheckLoadingFailedException.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/data/PipelineTableDataConsistencyCheckLoadingFailedException.java
index 6641476adf8..6dee838a265 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/data/PipelineTableDataConsistencyCheckLoadingFailedException.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/data/PipelineTableDataConsistencyCheckLoadingFailedException.java
@@ -18,8 +18,7 @@
package org.apache.shardingsphere.data.pipeline.core.exception.data;
import
org.apache.shardingsphere.infra.exception.core.external.sql.sqlstate.XOpenSQLState;
-
-import javax.annotation.Nullable;
+import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedTable;
/**
* Pipeline table data consistency check loading failed exception.
@@ -28,11 +27,11 @@ public final class
PipelineTableDataConsistencyCheckLoadingFailedException exten
private static final long serialVersionUID = 8965231249677009738L;
- public PipelineTableDataConsistencyCheckLoadingFailedException(@Nullable
final String schemaName, final String tableName) {
- this(schemaName, tableName, null);
+ public PipelineTableDataConsistencyCheckLoadingFailedException(final
QualifiedTable qualifiedTable) {
+ this(qualifiedTable, null);
}
- public PipelineTableDataConsistencyCheckLoadingFailedException(@Nullable
final String schemaName, final String tableName, final Exception cause) {
- super(XOpenSQLState.CONNECTION_EXCEPTION, 1, String.format("Data check
table '%s' failed.", null != schemaName ? schemaName + "." + tableName :
tableName), cause);
+ public PipelineTableDataConsistencyCheckLoadingFailedException(final
QualifiedTable qualifiedTable, final Exception cause) {
+ super(XOpenSQLState.CONNECTION_EXCEPTION, 1, String.format("Data check
table '%s' failed.", qualifiedTable), cause);
}
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/ImporterConfiguration.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/ImporterConfiguration.java
index bb0ebca58cd..00f8efaa8ec 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/ImporterConfiguration.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/ImporterConfiguration.java
@@ -26,7 +26,7 @@ import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.mapper.TableAn
import
org.apache.shardingsphere.data.pipeline.core.ratelimit.JobRateLimitAlgorithm;
import
org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
-import
org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveQualifiedTable;
+import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedTable;
import
org.apache.shardingsphere.infra.metadata.identifier.ShardingSphereIdentifier;
import java.util.Collection;
@@ -85,8 +85,8 @@ public final class ImporterConfiguration {
*
* @return qualified tables
*/
- public Collection<CaseInsensitiveQualifiedTable> getQualifiedTables() {
+ public Collection<QualifiedTable> getQualifiedTables() {
return shardingColumnsMap.keySet().stream()
- .map(ShardingSphereIdentifier::getValue).map(each -> new
CaseInsensitiveQualifiedTable(tableAndSchemaNameMapper.getSchemaName(each),
each)).collect(Collectors.toList());
+ .map(ShardingSphereIdentifier::getValue).map(each -> new
QualifiedTable(tableAndSchemaNameMapper.getSchemaName(each),
each)).collect(Collectors.toList());
}
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineDataConsistencyCalculateSQLBuilder.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineDataConsistencyCalculateSQLBuilder.java
index c5a51e3f570..c3edc165fe1 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineDataConsistencyCalculateSQLBuilder.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineDataConsistencyCalculateSQLBuilder.java
@@ -22,6 +22,7 @@ import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.dialect.DialectPi
import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.segment.PipelineSQLSegmentBuilder;
import
org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
+import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedTable;
import javax.annotation.Nullable;
import java.util.ArrayList;
@@ -47,22 +48,21 @@ public final class
PipelineDataConsistencyCalculateSQLBuilder {
/**
* Build query range ordering SQL.
*
- * @param schemaName schema name
- * @param tableName table name
+ * @param qualifiedTable qualified table
* @param columnNames column names
* @param uniqueKeys unique keys, it may be primary key, not null
* @param queryRange query range
* @param shardingColumnsNames sharding columns names
* @return built SQL
*/
- public String buildQueryRangeOrderingSQL(final String schemaName, final
String tableName, final Collection<String> columnNames, final List<String>
uniqueKeys, final QueryRange queryRange,
+ public String buildQueryRangeOrderingSQL(final QualifiedTable
qualifiedTable, final Collection<String> columnNames, final List<String>
uniqueKeys, final QueryRange queryRange,
@Nullable final List<String>
shardingColumnsNames) {
- return
dialectSQLBuilder.wrapWithPageQuery(buildQueryRangeOrderingSQL0(schemaName,
tableName, columnNames, uniqueKeys, queryRange, shardingColumnsNames));
+ return
dialectSQLBuilder.wrapWithPageQuery(buildQueryRangeOrderingSQL0(qualifiedTable,
columnNames, uniqueKeys, queryRange, shardingColumnsNames));
}
- private String buildQueryRangeOrderingSQL0(final String schemaName, final
String tableName, final Collection<String> columnNames, final List<String>
uniqueKeys, final QueryRange queryRange,
+ private String buildQueryRangeOrderingSQL0(final QualifiedTable
qualifiedTable, final Collection<String> columnNames, final List<String>
uniqueKeys, final QueryRange queryRange,
@Nullable final List<String>
shardingColumnsNames) {
- String qualifiedTableName =
sqlSegmentBuilder.getQualifiedTableName(schemaName, tableName);
+ String qualifiedTableName =
sqlSegmentBuilder.getQualifiedTableName(qualifiedTable.getSchemaName(),
qualifiedTable.getTableName());
String queryColumns =
columnNames.stream().map(sqlSegmentBuilder::getEscapedIdentifier).collect(Collectors.joining(","));
String firstUniqueKey = uniqueKeys.get(0);
String orderByColumns = joinColumns(uniqueKeys,
shardingColumnsNames).stream().map(each ->
sqlSegmentBuilder.getEscapedIdentifier(each) + "
ASC").collect(Collectors.joining(", "));
@@ -121,12 +121,12 @@ public final class
PipelineDataConsistencyCalculateSQLBuilder {
/**
* Build CRC32 SQL.
*
- * @param schemaName schema name
- * @param tableName table name
+ * @param qualifiedTable qualified table
* @param columnName column name
* @return built SQL
*/
- public Optional<String> buildCRC32SQL(final String schemaName, final
String tableName, final String columnName) {
- return
dialectSQLBuilder.buildCRC32SQL(sqlSegmentBuilder.getQualifiedTableName(schemaName,
tableName), sqlSegmentBuilder.getEscapedIdentifier(columnName));
+ public Optional<String> buildCRC32SQL(final QualifiedTable qualifiedTable,
final String columnName) {
+ return dialectSQLBuilder.buildCRC32SQL(
+
sqlSegmentBuilder.getQualifiedTableName(qualifiedTable.getSchemaName(),
qualifiedTable.getTableName()),
sqlSegmentBuilder.getEscapedIdentifier(columnName));
}
}
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/checker/PipelineDataSourceCheckEngineTest.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/checker/PipelineDataSourceCheckEngineTest.java
index 1c513940779..7637ad634da 100644
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/checker/PipelineDataSourceCheckEngineTest.java
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/checker/PipelineDataSourceCheckEngineTest.java
@@ -21,7 +21,7 @@ import
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWith
import
org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import
org.apache.shardingsphere.infra.exception.core.external.sql.type.wrapper.SQLWrapperException;
-import
org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveQualifiedTable;
+import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedTable;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -96,7 +96,7 @@ class PipelineDataSourceCheckEngineTest {
when(connection.prepareStatement("SELECT * FROM t_order LIMIT
1")).thenReturn(preparedStatement);
when(preparedStatement.executeQuery()).thenReturn(resultSet);
ImporterConfiguration importerConfig =
mock(ImporterConfiguration.class);
-
when(importerConfig.getQualifiedTables()).thenReturn(Collections.singleton(new
CaseInsensitiveQualifiedTable(null, "t_order")));
+
when(importerConfig.getQualifiedTables()).thenReturn(Collections.singleton(new
QualifiedTable(null, "t_order")));
assertDoesNotThrow(() ->
pipelineDataSourceCheckEngine.checkTargetDataSources(dataSources,
importerConfig));
}
@@ -107,7 +107,7 @@ class PipelineDataSourceCheckEngineTest {
when(preparedStatement.executeQuery()).thenReturn(resultSet);
when(resultSet.next()).thenReturn(true);
ImporterConfiguration importerConfig =
mock(ImporterConfiguration.class);
-
when(importerConfig.getQualifiedTables()).thenReturn(Collections.singleton(new
CaseInsensitiveQualifiedTable(null, "t_order")));
+
when(importerConfig.getQualifiedTables()).thenReturn(Collections.singleton(new
QualifiedTable(null, "t_order")));
assertThrows(PrepareJobWithTargetTableNotEmptyException.class, () ->
pipelineDataSourceCheckEngine.checkTargetDataSources(dataSources,
importerConfig));
}
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CRC32SingleTableInventoryCalculatorTest.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CRC32SingleTableInventoryCalculatorTest.java
index 61f6c800bd0..c23537e161d 100644
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CRC32SingleTableInventoryCalculatorTest.java
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CRC32SingleTableInventoryCalculatorTest.java
@@ -17,12 +17,12 @@
package
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator;
-import
org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveQualifiedTable;
-import
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
-import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.SingleTableInventoryCalculatedResult;
+import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
import
org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
+import
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
+import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedTable;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -65,8 +65,7 @@ class CRC32SingleTableInventoryCalculatorTest {
void setUp() throws SQLException {
DatabaseType databaseType =
TypedSPILoader.getService(DatabaseType.class, "FIXTURE");
List<PipelineColumnMetaData> uniqueKeys =
Collections.singletonList(new PipelineColumnMetaData(1, "id", Types.INTEGER,
"integer", false, true, true));
- parameter = new
SingleTableInventoryCalculateParameter(pipelineDataSource,
- new CaseInsensitiveQualifiedTable(null, "foo_tbl"),
Arrays.asList("foo_col", "bar_col"), uniqueKeys, Collections.emptyMap());
+ parameter = new
SingleTableInventoryCalculateParameter(pipelineDataSource, new
QualifiedTable(null, "foo_tbl"), Arrays.asList("foo_col", "bar_col"),
uniqueKeys, Collections.emptyMap());
when(pipelineDataSource.getDatabaseType()).thenReturn(databaseType);
when(pipelineDataSource.getConnection()).thenReturn(connection);
}
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/ImporterConfigurationTest.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/ImporterConfigurationTest.java
index d0bbb7024aa..321884ab03d 100644
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/ImporterConfigurationTest.java
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/ImporterConfigurationTest.java
@@ -21,7 +21,7 @@ import
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfigurati
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.mapper.TableAndSchemaNameMapper;
import
org.apache.shardingsphere.data.pipeline.core.ratelimit.JobRateLimitAlgorithm;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
-import
org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveQualifiedTable;
+import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedTable;
import
org.apache.shardingsphere.infra.metadata.identifier.ShardingSphereIdentifier;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.junit.jupiter.api.Test;
@@ -56,9 +56,8 @@ class ImporterConfigurationTest {
void assertGetQualifiedTables() {
TableAndSchemaNameMapper tableAndSchemaNameMapper =
mock(TableAndSchemaNameMapper.class);
when(tableAndSchemaNameMapper.getSchemaName("foo_tbl")).thenReturn("foo_schema");
- ImporterConfiguration importerConfig = new ImporterConfiguration(
- mock(PipelineDataSourceConfiguration.class),
Collections.singletonMap(new ShardingSphereIdentifier("foo_tbl"),
Collections.singleton("foo_col")),
- tableAndSchemaNameMapper, 1,
mock(JobRateLimitAlgorithm.class), 1, 1);
- assertThat(importerConfig.getQualifiedTables(),
is(Collections.singletonList(new CaseInsensitiveQualifiedTable("foo_schema",
"foo_tbl"))));
+ ImporterConfiguration importerConfig = new
ImporterConfiguration(mock(PipelineDataSourceConfiguration.class),
+ Collections.singletonMap(new
ShardingSphereIdentifier("foo_tbl"), Collections.singleton("foo_col")),
tableAndSchemaNameMapper, 1, mock(JobRateLimitAlgorithm.class), 1, 1);
+ assertThat(importerConfig.getQualifiedTables(),
is(Collections.singletonList(new QualifiedTable("foo_schema", "foo_tbl"))));
}
}
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineDataConsistencyCalculateSQLBuilderTest.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineDataConsistencyCalculateSQLBuilderTest.java
index 352b1f1cd96..06ab86fae9f 100644
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineDataConsistencyCalculateSQLBuilderTest.java
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineDataConsistencyCalculateSQLBuilderTest.java
@@ -19,6 +19,7 @@ package
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.range.QueryRange;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
+import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedTable;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.junit.jupiter.api.Test;
@@ -43,19 +44,19 @@ class PipelineDataConsistencyCalculateSQLBuilderTest {
@Test
void assertBuildQueryRangeOrderingSQLWithoutQueryCondition() {
- String actual = sqlBuilder.buildQueryRangeOrderingSQL(null, "t_order",
COLUMN_NAMES, UNIQUE_KEYS,
+ String actual = sqlBuilder.buildQueryRangeOrderingSQL(new
QualifiedTable(null, "t_order"), COLUMN_NAMES, UNIQUE_KEYS,
new QueryRange(1, true, 5), SHARDING_COLUMNS_NAMES);
assertThat(actual, is("SELECT order_id,user_id,status FROM t_order
WHERE order_id>=? AND order_id<=? ORDER BY order_id ASC, status ASC, user_id
ASC"));
- actual = sqlBuilder.buildQueryRangeOrderingSQL(null, "t_order",
COLUMN_NAMES, UNIQUE_KEYS,
+ actual = sqlBuilder.buildQueryRangeOrderingSQL(new
QualifiedTable(null, "t_order"), COLUMN_NAMES, UNIQUE_KEYS,
new QueryRange(1, false, 5), SHARDING_COLUMNS_NAMES);
assertThat(actual, is("SELECT order_id,user_id,status FROM t_order
WHERE order_id>? AND order_id<=? ORDER BY order_id ASC, status ASC, user_id
ASC"));
- actual = sqlBuilder.buildQueryRangeOrderingSQL(null, "t_order",
COLUMN_NAMES, UNIQUE_KEYS,
+ actual = sqlBuilder.buildQueryRangeOrderingSQL(new
QualifiedTable(null, "t_order"), COLUMN_NAMES, UNIQUE_KEYS,
new QueryRange(1, false, null), SHARDING_COLUMNS_NAMES);
assertThat(actual, is("SELECT order_id,user_id,status FROM t_order
WHERE order_id>? ORDER BY order_id ASC, status ASC, user_id ASC"));
- actual = sqlBuilder.buildQueryRangeOrderingSQL(null, "t_order",
COLUMN_NAMES, UNIQUE_KEYS,
+ actual = sqlBuilder.buildQueryRangeOrderingSQL(new
QualifiedTable(null, "t_order"), COLUMN_NAMES, UNIQUE_KEYS,
new QueryRange(null, false, 5), SHARDING_COLUMNS_NAMES);
assertThat(actual, is("SELECT order_id,user_id,status FROM t_order
WHERE order_id<=? ORDER BY order_id ASC, status ASC, user_id ASC"));
- actual = sqlBuilder.buildQueryRangeOrderingSQL(null, "t_order",
COLUMN_NAMES, UNIQUE_KEYS,
+ actual = sqlBuilder.buildQueryRangeOrderingSQL(new
QualifiedTable(null, "t_order"), COLUMN_NAMES, UNIQUE_KEYS,
new QueryRange(null, false, null), SHARDING_COLUMNS_NAMES);
assertThat(actual, is("SELECT order_id,user_id,status FROM t_order
ORDER BY order_id ASC, status ASC, user_id ASC"));
}
@@ -72,7 +73,7 @@ class PipelineDataConsistencyCalculateSQLBuilderTest {
@Test
void assertBuildCRC32SQL() {
- Optional<String> actual = sqlBuilder.buildCRC32SQL("foo_schema",
"foo_tbl", "foo_col");
+ Optional<String> actual = sqlBuilder.buildCRC32SQL(new
QualifiedTable("foo_schema", "foo_tbl"), "foo_col");
assertThat(actual, is(Optional.of("SELECT CRC32(foo_col) FROM
foo_tbl")));
}
}
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
index b8722ff9ca3..dc1ab7df67e 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
@@ -45,6 +45,7 @@ import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.Migrati
import org.apache.shardingsphere.infra.datanode.DataNode;
import
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
import
org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveQualifiedTable;
+import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedTable;
import java.util.LinkedHashMap;
import java.util.LinkedList;
@@ -124,15 +125,16 @@ public final class MigrationDataConsistencyChecker
implements PipelineDataConsis
private TableDataConsistencyCheckResult
checkSingleTableInventoryData(final String targetTableName, final DataNode
dataNode,
final TableDataConsistencyChecker tableChecker, final PipelineDataSourceManager
dataSourceManager) {
- CaseInsensitiveQualifiedTable sourceTable = new
CaseInsensitiveQualifiedTable(dataNode.getSchemaName(),
dataNode.getTableName());
- CaseInsensitiveQualifiedTable targetTable = new
CaseInsensitiveQualifiedTable(dataNode.getSchemaName(), targetTableName);
+ QualifiedTable sourceTable = new
QualifiedTable(dataNode.getSchemaName(), dataNode.getTableName());
+ QualifiedTable targetTable = new
QualifiedTable(dataNode.getSchemaName(), targetTableName);
PipelineDataSource sourceDataSource =
dataSourceManager.getDataSource(jobConfig.getSources().get(dataNode.getDataSourceName()));
PipelineDataSource targetDataSource =
dataSourceManager.getDataSource(jobConfig.getTarget());
PipelineTableMetaDataLoader metaDataLoader = new
StandardPipelineTableMetaDataLoader(sourceDataSource);
PipelineTableMetaData tableMetaData =
metaDataLoader.getTableMetaData(dataNode.getSchemaName(),
dataNode.getTableName());
- ShardingSpherePreconditions.checkNotNull(tableMetaData, () -> new
PipelineTableDataConsistencyCheckLoadingFailedException(dataNode.getSchemaName(),
dataNode.getTableName()));
+ ShardingSpherePreconditions.checkNotNull(tableMetaData,
+ () -> new
PipelineTableDataConsistencyCheckLoadingFailedException(new
QualifiedTable(dataNode.getSchemaName(), dataNode.getTableName())));
List<String> columnNames = tableMetaData.getColumnNames();
- List<PipelineColumnMetaData> uniqueKeys =
PipelineTableMetaDataUtils.getUniqueKeyColumns(sourceTable.getSchemaName().getValue(),
sourceTable.getTableName().getValue(), metaDataLoader);
+ List<PipelineColumnMetaData> uniqueKeys =
PipelineTableMetaDataUtils.getUniqueKeyColumns(sourceTable.getSchemaName(),
sourceTable.getTableName(), metaDataLoader);
TableInventoryCheckParameter param = new TableInventoryCheckParameter(
jobConfig.getJobId(), sourceDataSource, targetDataSource,
sourceTable, targetTable, columnNames, uniqueKeys, readRateLimitAlgorithm,
progressContext);
TableInventoryChecker tableInventoryChecker =
tableChecker.buildTableInventoryChecker(param);
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
index 0cd02fe9260..8297becec5e 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
@@ -32,15 +32,15 @@ import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.Tabl
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyChecker;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableInventoryCheckParameter;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
-import
org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveQualifiedTable;
import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
import
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
import
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
+import
org.apache.shardingsphere.infra.algorithm.keygen.snowflake.SnowflakeKeyGenerateAlgorithm;
import
org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
+import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedTable;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
-import
org.apache.shardingsphere.infra.algorithm.keygen.snowflake.SnowflakeKeyGenerateAlgorithm;
import
org.apache.shardingsphere.test.e2e.data.pipeline.cases.PipelineContainerComposer;
import
org.apache.shardingsphere.test.e2e.data.pipeline.cases.task.E2EIncrementalTask;
import
org.apache.shardingsphere.test.e2e.data.pipeline.framework.helper.PipelineCaseHelper;
@@ -133,12 +133,12 @@ class CDCE2EIT {
containerComposer.proxyExecuteWithLog(String.format("INSERT
INTO %s (order_id, user_id, status) VALUES (%d, %d, 'OK')", tableName, orderId,
i), 0);
containerComposer.assertOrderRecordExist(targetDataSource,
tableName, orderId);
}
- CaseInsensitiveQualifiedTable orderSchemaTableName =
dialectDatabaseMetaData.isSchemaAvailable()
- ? new
CaseInsensitiveQualifiedTable(PipelineContainerComposer.SCHEMA_NAME,
SOURCE_TABLE_NAME)
- : new CaseInsensitiveQualifiedTable(null,
SOURCE_TABLE_NAME);
- assertDataMatched(sourceDataSource, targetDataSource,
orderSchemaTableName);
- assertDataMatched(sourceDataSource, targetDataSource, new
CaseInsensitiveQualifiedTable(null, "t_address"));
- assertDataMatched(sourceDataSource, targetDataSource, new
CaseInsensitiveQualifiedTable(null, "t_single"));
+ QualifiedTable orderQualifiedTable =
dialectDatabaseMetaData.isSchemaAvailable()
+ ? new
QualifiedTable(PipelineContainerComposer.SCHEMA_NAME, SOURCE_TABLE_NAME)
+ : new QualifiedTable(null, SOURCE_TABLE_NAME);
+ assertDataMatched(sourceDataSource, targetDataSource,
orderQualifiedTable);
+ assertDataMatched(sourceDataSource, targetDataSource, new
QualifiedTable(null, "t_address"));
+ assertDataMatched(sourceDataSource, targetDataSource, new
QualifiedTable(null, "t_single"));
cdcClient.close();
Awaitility.await().atMost(10L,
TimeUnit.SECONDS).pollInterval(500L, TimeUnit.MILLISECONDS).until(() ->
containerComposer.queryForListWithLog("SHOW STREAMING LIST")
.stream().noneMatch(each ->
Boolean.parseBoolean(each.get("active").toString())));
@@ -186,12 +186,12 @@ class CDCE2EIT {
return result;
}
- private void assertDataMatched(final PipelineDataSource sourceDataSource,
final PipelineDataSource targetDataSource, final CaseInsensitiveQualifiedTable
schemaTableName) {
+ private void assertDataMatched(final PipelineDataSource sourceDataSource,
final PipelineDataSource targetDataSource, final QualifiedTable qualifiedTable)
{
StandardPipelineTableMetaDataLoader metaDataLoader = new
StandardPipelineTableMetaDataLoader(targetDataSource);
- PipelineTableMetaData tableMetaData =
metaDataLoader.getTableMetaData(schemaTableName.getSchemaName().getValue(),
schemaTableName.getTableName().getValue());
+ PipelineTableMetaData tableMetaData =
metaDataLoader.getTableMetaData(qualifiedTable.getSchemaName(),
qualifiedTable.getTableName());
List<PipelineColumnMetaData> uniqueKeys =
Collections.singletonList(tableMetaData.getColumnMetaData(tableMetaData.getPrimaryKeyColumns().get(0)));
ConsistencyCheckJobItemProgressContext progressContext = new
ConsistencyCheckJobItemProgressContext("", 0,
sourceDataSource.getDatabaseType().getType());
- TableInventoryCheckParameter param = new
TableInventoryCheckParameter("", sourceDataSource, targetDataSource,
schemaTableName, schemaTableName,
+ TableInventoryCheckParameter param = new
TableInventoryCheckParameter("", sourceDataSource, targetDataSource,
qualifiedTable, qualifiedTable,
tableMetaData.getColumnNames(), uniqueKeys, null,
progressContext);
TableDataConsistencyChecker tableChecker =
TypedSPILoader.getService(TableDataConsistencyChecker.class, "DATA_MATCH", new
Properties());
TableDataConsistencyCheckResult checkResult =
tableChecker.buildTableInventoryChecker(param).checkSingleTableInventoryData();
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculatorTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculatorTest.java
index ac74e3ac676..f0956e547a2 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculatorTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculatorTest.java
@@ -22,11 +22,11 @@ import org.apache.commons.lang3.RandomStringUtils;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.RecordSingleTableInventoryCalculatedResult;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.SingleTableInventoryCalculatedResult;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.range.QueryRange;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.QueryType;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.range.QueryRange;
import
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
-import
org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveQualifiedTable;
+import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedTable;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
@@ -123,14 +123,14 @@ class RecordSingleTableInventoryCalculatorTest {
private SingleTableInventoryCalculateParameter generateParameter(final
PipelineDataSource dataSource, final Object dataCheckPosition) {
List<PipelineColumnMetaData> uniqueKeys =
Collections.singletonList(new PipelineColumnMetaData(1, "order_id",
Types.INTEGER, "integer", false, true, true));
- return new SingleTableInventoryCalculateParameter(dataSource, new
CaseInsensitiveQualifiedTable(null, "t_order"), Collections.emptyList(),
uniqueKeys, dataCheckPosition);
+ return new SingleTableInventoryCalculateParameter(dataSource, new
QualifiedTable(null, "t_order"), Collections.emptyList(), uniqueKeys,
dataCheckPosition);
}
@Test
void assertCalculateOfRangeQuery() {
RecordSingleTableInventoryCalculator calculator = new
RecordSingleTableInventoryCalculator(1000);
- SingleTableInventoryCalculateParameter param = new
SingleTableInventoryCalculateParameter(dataSource, new
CaseInsensitiveQualifiedTable(null, "t_order"),
- Collections.emptyList(), buildUniqueKeys(),
QueryType.RANGE_QUERY);
+ SingleTableInventoryCalculateParameter param = new
SingleTableInventoryCalculateParameter(
+ dataSource, new QualifiedTable(null, "t_order"),
Collections.emptyList(), buildUniqueKeys(), QueryType.RANGE_QUERY);
param.setQueryRange(new QueryRange(3, true, 7));
Optional<SingleTableInventoryCalculatedResult> calculatedResult =
calculator.calculateChunk(param);
assertTrue(calculatedResult.isPresent());
@@ -143,8 +143,8 @@ class RecordSingleTableInventoryCalculatorTest {
@Test
void assertCalculateOfRangeQueryAll() {
RecordSingleTableInventoryCalculator calculator = new
RecordSingleTableInventoryCalculator(3);
- SingleTableInventoryCalculateParameter param = new
SingleTableInventoryCalculateParameter(dataSource, new
CaseInsensitiveQualifiedTable(null, "t_order"),
- Collections.emptyList(), buildUniqueKeys(),
QueryType.RANGE_QUERY);
+ SingleTableInventoryCalculateParameter param = new
SingleTableInventoryCalculateParameter(dataSource,
+ new QualifiedTable(null, "t_order"), Collections.emptyList(),
buildUniqueKeys(), QueryType.RANGE_QUERY);
param.setQueryRange(new QueryRange(null, false, null));
Iterator<SingleTableInventoryCalculatedResult> resultIterator =
calculator.calculate(param).iterator();
RecordSingleTableInventoryCalculatedResult actual =
(RecordSingleTableInventoryCalculatedResult) resultIterator.next();
@@ -183,8 +183,8 @@ class RecordSingleTableInventoryCalculatorTest {
@Test
void assertCalculateOfPointQuery() {
RecordSingleTableInventoryCalculator calculator = new
RecordSingleTableInventoryCalculator(3);
- SingleTableInventoryCalculateParameter param = new
SingleTableInventoryCalculateParameter(dataSource, new
CaseInsensitiveQualifiedTable(null, "t_order"),
- Collections.emptyList(), buildUniqueKeys(),
QueryType.POINT_QUERY);
+ SingleTableInventoryCalculateParameter param = new
SingleTableInventoryCalculateParameter(dataSource,
+ new QualifiedTable(null, "t_order"), Collections.emptyList(),
buildUniqueKeys(), QueryType.POINT_QUERY);
param.setUniqueKeysValues(Arrays.asList(3, 3));
Optional<SingleTableInventoryCalculatedResult> calculatedResult =
calculator.calculateChunk(param);
assertTrue(calculatedResult.isPresent());