This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 0283ba2f3ea Refactor RecordSingleTableInventoryCalculator (#32711)
0283ba2f3ea is described below
commit 0283ba2f3ea4dd0fc99fe82006f6faeb6fdd908b
Author: Liang Zhang <[email protected]>
AuthorDate: Wed Aug 28 18:49:18 2024 +0800
Refactor RecordSingleTableInventoryCalculator (#32711)
---
.../RecordSingleTableInventoryCalculator.java | 26 ++++++++++------------
1 file changed, 12 insertions(+), 14 deletions(-)
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculator.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculator.java
index 3051ebfb6e4..4a37f45b45d 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculator.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculator.java
@@ -58,7 +58,7 @@ public final class RecordSingleTableInventoryCalculator extends AbstractStreamin
@Override
public Optional<SingleTableInventoryCalculatedResult> calculateChunk(final
SingleTableInventoryCalculateParameter param) {
- List<Map<String, Object>> records = calculateChunk0(param,
QueryType.RANGE_QUERY == param.getQueryType());
+ List<Map<String, Object>> records = calculateChunk(param,
QueryType.RANGE_QUERY == param.getQueryType());
if (records.isEmpty()) {
return Optional.empty();
}
@@ -74,19 +74,18 @@ public final class RecordSingleTableInventoryCalculator extends AbstractStreamin
return convertRecordsToResult(records, firstUniqueKey);
}
SingleTableInventoryCalculateParameter newParam =
buildNewCalculateParameter(param, minUniqueKeyValue);
- records = calculateChunk0(newParam, false);
+ records = calculateChunk(newParam, false);
if (!records.isEmpty()) {
updateQueryRangeLower(param, records, firstUniqueKey);
return convertRecordsToResult(records, firstUniqueKey);
}
return Optional.empty();
- } else {
- updateQueryRangeLower(param, records, firstUniqueKey);
- return convertRecordsToResult(records, firstUniqueKey);
}
+ updateQueryRangeLower(param, records, firstUniqueKey);
+ return convertRecordsToResult(records, firstUniqueKey);
}
- private List<Map<String, Object>> calculateChunk0(final
SingleTableInventoryCalculateParameter param, final boolean isRangeQuery) {
+ private List<Map<String, Object>> calculateChunk(final
SingleTableInventoryCalculateParameter param, final boolean isRangeQuery) {
try (CalculationContext calculationContext =
getOrCreateCalculationContext(param)) {
List<Map<String, Object>> result = new LinkedList<>();
InventoryColumnValueReaderEngine columnValueReaderEngine = new
InventoryColumnValueReaderEngine(param.getDatabaseType());
@@ -150,9 +149,8 @@ public final class RecordSingleTableInventoryCalculator extends AbstractStreamin
}
private String getQuerySQL(final SingleTableInventoryCalculateParameter
param) {
- if (null == param.getFirstUniqueKey()) {
- throw new UnsupportedOperationException("Record inventory calculator does not support table without unique key and primary key now");
- }
+ ShardingSpherePreconditions.checkNotNull(param.getFirstUniqueKey(),
+ () -> new UnsupportedOperationException("Record inventory calculator does not support table without unique key and primary key now."));
PipelineDataConsistencyCalculateSQLBuilder pipelineSQLBuilder = new
PipelineDataConsistencyCalculateSQLBuilder(param.getDatabaseType());
Collection<String> columnNames = param.getColumnNames().isEmpty() ?
Collections.singleton("*") : param.getColumnNames();
switch (param.getQueryType()) {
@@ -170,8 +168,8 @@ public final class RecordSingleTableInventoryCalculator extends AbstractStreamin
QueryType queryType = param.getQueryType();
if (queryType == QueryType.RANGE_QUERY) {
QueryRange queryRange = param.getQueryRange();
- ShardingSpherePreconditions.checkNotNull(queryRange, () -> new
PipelineTableDataConsistencyCheckLoadingFailedException(
- param.getSchemaName(), param.getLogicTableName(), new
RuntimeException("Unique keys values range is null")));
+ ShardingSpherePreconditions.checkNotNull(queryRange,
+ () -> new PipelineTableDataConsistencyCheckLoadingFailedException(param.getSchemaName(), param.getLogicTableName(), new RuntimeException("Unique keys values range is null.")));
int parameterIndex = 1;
if (null != queryRange.getLower()) {
preparedStatement.setObject(parameterIndex++,
queryRange.getLower());
@@ -182,8 +180,8 @@ public final class RecordSingleTableInventoryCalculator extends AbstractStreamin
preparedStatement.setObject(parameterIndex, chunkSize);
} else if (queryType == QueryType.POINT_QUERY) {
Collection<Object> uniqueKeysValues = param.getUniqueKeysValues();
- ShardingSpherePreconditions.checkNotNull(uniqueKeysValues, () ->
new PipelineTableDataConsistencyCheckLoadingFailedException(
- param.getSchemaName(), param.getLogicTableName(), new
RuntimeException("Unique keys values is null")));
+ ShardingSpherePreconditions.checkNotNull(uniqueKeysValues,
+ () -> new PipelineTableDataConsistencyCheckLoadingFailedException(param.getSchemaName(), param.getLogicTableName(), new RuntimeException("Unique keys values is null.")));
int parameterIndex = 1;
for (Object each : uniqueKeysValues) {
preparedStatement.setObject(parameterIndex++, each);
@@ -191,7 +189,7 @@ public final class RecordSingleTableInventoryCalculator extends AbstractStreamin
if (null != param.getShardingColumnsNames() &&
!param.getShardingColumnsNames().isEmpty()) {
List<Object> shardingColumnsValues =
param.getShardingColumnsValues();
ShardingSpherePreconditions.checkNotNull(shardingColumnsValues, () -> new
PipelineTableDataConsistencyCheckLoadingFailedException(
- param.getSchemaName(), param.getLogicTableName(), new
RuntimeException("Sharding columns values is null when names not empty")));
+ param.getSchemaName(), param.getLogicTableName(), new
RuntimeException("Sharding columns values is null when names not empty.")));
for (Object each : shardingColumnsValues) {
preparedStatement.setObject(parameterIndex++, each);
}