This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new ac1437b5470 Remove 'Single' in pipeline inventory calculator classes
(#36835)
ac1437b5470 is described below
commit ac1437b547021b3faf98ecd790673a0afe186087
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Fri Oct 10 11:36:35 2025 +0800
Remove 'Single' in pipeline inventory calculator classes (#36835)
Rename SingleTableInventoryCalculator to TableInventoryCalculator;
Rename SingleTableInventoryCalculatedResult to
TableInventoryCheckCalculatedResult
---
...RecordTableInventoryCheckCalculatedResult.java} | 10 +-
...va => TableInventoryCheckCalculatedResult.java} | 4 +-
.../CRC32MatchTableDataConsistencyChecker.java | 10 +-
.../DataMatchTableDataConsistencyChecker.java | 10 +-
.../table/MatchingTableInventoryChecker.java | 36 +++----
...ava => CRC32TableInventoryCheckCalculator.java} | 16 +--
...va => RecordTableInventoryCheckCalculator.java} | 18 ++--
.../ingest/dumper/inventory/InventoryDumper.java | 12 +--
...=> AbstractRecordTableInventoryCalculator.java} | 36 +++----
...AbstractStreamingTableInventoryCalculator.java} | 12 +--
....java => AbstractTableInventoryCalculator.java} | 4 +-
....java => TableInventoryCalculateParameter.java} | 4 +-
...lculator.java => TableInventoryCalculator.java} | 6 +-
...rdTableInventoryCheckCalculatedResultTest.java} | 36 +++----
...=> CRC32TableInventoryCheckCalculatorTest.java} | 14 +--
...> RecordTableInventoryCheckCalculatorTest.java} | 116 ++++++++++-----------
16 files changed, 172 insertions(+), 172 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/RecordSingleTableInventoryCalculatedResult.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/RecordTableInventoryCheckCalculatedResult.java
similarity index 88%
rename from
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/RecordSingleTableInventoryCalculatedResult.java
rename to
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/RecordTableInventoryCheckCalculatedResult.java
index 9a92206ef9e..3341ad97de2 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/RecordSingleTableInventoryCalculatedResult.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/RecordTableInventoryCheckCalculatedResult.java
@@ -29,11 +29,11 @@ import java.util.Map;
import java.util.Optional;
/**
- * Record single table inventory calculated result.
+ * Record table inventory check calculated result.
*/
@Getter
@Slf4j
-public final class RecordSingleTableInventoryCalculatedResult implements
SingleTableInventoryCalculatedResult {
+public final class RecordTableInventoryCheckCalculatedResult implements
TableInventoryCheckCalculatedResult {
private final Object maxUniqueKeyValue;
@@ -41,7 +41,7 @@ public final class RecordSingleTableInventoryCalculatedResult
implements SingleT
private final List<Map<String, Object>> records;
- public RecordSingleTableInventoryCalculatedResult(final Object
maxUniqueKeyValue, final List<Map<String, Object>> records) {
+ public RecordTableInventoryCheckCalculatedResult(final Object
maxUniqueKeyValue, final List<Map<String, Object>> records) {
this.maxUniqueKeyValue = maxUniqueKeyValue;
recordsCount = records.size();
this.records = records;
@@ -60,11 +60,11 @@ public final class
RecordSingleTableInventoryCalculatedResult implements SingleT
if (this == o) {
return true;
}
- if (!(o instanceof RecordSingleTableInventoryCalculatedResult)) {
+ if (!(o instanceof RecordTableInventoryCheckCalculatedResult)) {
log.warn("RecordSingleTableInventoryCalculatedResult type not
match, o.className={}.", o.getClass().getName());
return false;
}
- final RecordSingleTableInventoryCalculatedResult that =
(RecordSingleTableInventoryCalculatedResult) o;
+ final RecordTableInventoryCheckCalculatedResult that =
(RecordTableInventoryCheckCalculatedResult) o;
EqualsBuilder equalsBuilder = new EqualsBuilder();
if (recordsCount != that.recordsCount ||
!DataConsistencyCheckUtils.isMatched(equalsBuilder, maxUniqueKeyValue,
that.maxUniqueKeyValue)) {
log.warn("Record count or max unique key value not match,
recordCount1={}, recordCount2={}, maxUniqueKeyValue1={}, maxUniqueKeyValue2={},
value1.class={}, value2.class={}.",
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/SingleTableInventoryCalculatedResult.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/TableInventoryCheckCalculatedResult.java
similarity index 92%
rename from
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/SingleTableInventoryCalculatedResult.java
rename to
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/TableInventoryCheckCalculatedResult.java
index 69c9ca84085..8989ef14ea0 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/SingleTableInventoryCalculatedResult.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/TableInventoryCheckCalculatedResult.java
@@ -20,9 +20,9 @@ package
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result;
import java.util.Optional;
/**
- * Single table inventory calculated result.
+ * Table inventory check calculated result.
*/
-public interface SingleTableInventoryCalculatedResult {
+public interface TableInventoryCheckCalculatedResult {
/**
* Get max unique key value.
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/CRC32MatchTableDataConsistencyChecker.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/CRC32MatchTableDataConsistencyChecker.java
index 8303a34a2b1..7e2c209fdf7 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/CRC32MatchTableDataConsistencyChecker.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/CRC32MatchTableDataConsistencyChecker.java
@@ -17,9 +17,9 @@
package org.apache.shardingsphere.data.pipeline.core.consistencycheck.table;
-import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.SingleTableInventoryCalculatedResult;
-import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator.CRC32SingleTableInventoryCheckCalculator;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.calculator.SingleTableInventoryCalculator;
+import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableInventoryCheckCalculatedResult;
+import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator.CRC32TableInventoryCheckCalculator;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.calculator.TableInventoryCalculator;
import org.apache.shardingsphere.database.connector.core.type.DatabaseType;
import
org.apache.shardingsphere.database.connector.core.type.DatabaseTypeRegistry;
import org.apache.shardingsphere.infra.spi.annotation.SPIDescription;
@@ -64,8 +64,8 @@ public final class CRC32MatchTableDataConsistencyChecker
implements TableDataCon
}
@Override
- protected
SingleTableInventoryCalculator<SingleTableInventoryCalculatedResult>
buildSingleTableInventoryCalculator() {
- return new CRC32SingleTableInventoryCheckCalculator();
+ protected
TableInventoryCalculator<TableInventoryCheckCalculatedResult>
buildSingleTableInventoryCalculator() {
+ return new CRC32TableInventoryCheckCalculator();
}
}
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/DataMatchTableDataConsistencyChecker.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/DataMatchTableDataConsistencyChecker.java
index 334da478d31..c0ba06806b1 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/DataMatchTableDataConsistencyChecker.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/DataMatchTableDataConsistencyChecker.java
@@ -18,13 +18,13 @@
package org.apache.shardingsphere.data.pipeline.core.consistencycheck.table;
import com.google.common.base.Strings;
-import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.SingleTableInventoryCalculatedResult;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckIgnoredType;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
-import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator.RecordSingleTableInventoryCheckCalculator;
+import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableInventoryCheckCalculatedResult;
+import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator.RecordTableInventoryCheckCalculator;
import
org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.StreamingRangeType;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.calculator.SingleTableInventoryCalculator;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.calculator.TableInventoryCalculator;
import org.apache.shardingsphere.database.connector.core.type.DatabaseType;
import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader;
import org.apache.shardingsphere.infra.spi.annotation.SPIDescription;
@@ -128,8 +128,8 @@ public final class DataMatchTableDataConsistencyChecker
implements TableDataCons
}
@Override
- protected
SingleTableInventoryCalculator<SingleTableInventoryCalculatedResult>
buildSingleTableInventoryCalculator() {
- return new RecordSingleTableInventoryCheckCalculator(chunkSize,
streamingRangeType);
+ protected
TableInventoryCalculator<TableInventoryCheckCalculatedResult>
buildSingleTableInventoryCalculator() {
+ return new RecordTableInventoryCheckCalculator(chunkSize,
streamingRangeType);
}
}
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java
index b28699b2ca7..71857630204 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java
@@ -22,15 +22,15 @@ import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.position.TableCheckRangePosition;
-import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.SingleTableInventoryCalculatedResult;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
+import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableInventoryCheckCalculatedResult;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.yaml.YamlTableDataConsistencyCheckResult;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.yaml.YamlTableDataConsistencyCheckResultSwapper;
import
org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.QueryRange;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.QueryType;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.calculator.SingleTableInventoryCalculateParameter;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.calculator.SingleTableInventoryCalculator;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.calculator.TableInventoryCalculateParameter;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.calculator.TableInventoryCalculator;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobUpdateProgress;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTaskUtils;
import
org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
@@ -57,9 +57,9 @@ public abstract class MatchingTableInventoryChecker
implements TableInventoryChe
private final AtomicBoolean canceling = new AtomicBoolean(false);
- private volatile
SingleTableInventoryCalculator<SingleTableInventoryCalculatedResult>
sourceCalculator;
+ private volatile
TableInventoryCalculator<TableInventoryCheckCalculatedResult> sourceCalculator;
- private volatile
SingleTableInventoryCalculator<SingleTableInventoryCalculatedResult>
targetCalculator;
+ private volatile
TableInventoryCalculator<TableInventoryCheckCalculatedResult> targetCalculator;
@Override
public TableDataConsistencyCheckResult checkSingleTableInventoryData() {
@@ -74,22 +74,22 @@ public abstract class MatchingTableInventoryChecker
implements TableInventoryChe
}
private TableDataConsistencyCheckResult
checkSingleTableInventoryData(final TableInventoryCheckParameter param, final
ThreadPoolExecutor executor) {
- SingleTableInventoryCalculateParameter sourceParam = new
SingleTableInventoryCalculateParameter(param.getSourceDataSource(),
param.getSourceTable(),
+ TableInventoryCalculateParameter sourceParam = new
TableInventoryCalculateParameter(param.getSourceDataSource(),
param.getSourceTable(),
param.getColumnNames(), param.getUniqueKeys(),
QueryType.RANGE_QUERY, param.getQueryCondition());
TableCheckRangePosition checkRangePosition =
param.getProgressContext().getTableCheckRangePositions().get(param.getSplittingItem());
sourceParam.setQueryRange(new QueryRange(null !=
checkRangePosition.getSourcePosition() ? checkRangePosition.getSourcePosition()
: checkRangePosition.getSourceRange().getBeginValue(),
true, checkRangePosition.getSourceRange().getEndValue()));
- SingleTableInventoryCalculateParameter targetParam = new
SingleTableInventoryCalculateParameter(param.getTargetDataSource(),
param.getTargetTable(),
+ TableInventoryCalculateParameter targetParam = new
TableInventoryCalculateParameter(param.getTargetDataSource(),
param.getTargetTable(),
param.getColumnNames(), param.getUniqueKeys(),
QueryType.RANGE_QUERY, param.getQueryCondition());
targetParam.setQueryRange(new QueryRange(null !=
checkRangePosition.getTargetPosition() ? checkRangePosition.getTargetPosition()
: checkRangePosition.getTargetRange().getBeginValue(),
true, checkRangePosition.getTargetRange().getEndValue()));
- SingleTableInventoryCalculator<SingleTableInventoryCalculatedResult>
sourceCalculator = buildSingleTableInventoryCalculator();
+ TableInventoryCalculator<TableInventoryCheckCalculatedResult>
sourceCalculator = buildSingleTableInventoryCalculator();
this.sourceCalculator = sourceCalculator;
- SingleTableInventoryCalculator<SingleTableInventoryCalculatedResult>
targetCalculator = buildSingleTableInventoryCalculator();
+ TableInventoryCalculator<TableInventoryCheckCalculatedResult>
targetCalculator = buildSingleTableInventoryCalculator();
this.targetCalculator = targetCalculator;
try {
- Iterator<SingleTableInventoryCalculatedResult>
sourceCalculatedResults = PipelineTaskUtils.waitFuture(executor.submit(() ->
sourceCalculator.calculate(sourceParam))).iterator();
- Iterator<SingleTableInventoryCalculatedResult>
targetCalculatedResults = PipelineTaskUtils.waitFuture(executor.submit(() ->
targetCalculator.calculate(targetParam))).iterator();
+ Iterator<TableInventoryCheckCalculatedResult>
sourceCalculatedResults = PipelineTaskUtils.waitFuture(executor.submit(() ->
sourceCalculator.calculate(sourceParam))).iterator();
+ Iterator<TableInventoryCheckCalculatedResult>
targetCalculatedResults = PipelineTaskUtils.waitFuture(executor.submit(() ->
targetCalculator.calculate(targetParam))).iterator();
return checkSingleTableInventoryData(sourceCalculatedResults,
targetCalculatedResults, param, executor);
} finally {
QuietlyCloser.close(sourceParam.getCalculationContext());
@@ -99,16 +99,16 @@ public abstract class MatchingTableInventoryChecker
implements TableInventoryChe
}
}
- private TableDataConsistencyCheckResult
checkSingleTableInventoryData(final
Iterator<SingleTableInventoryCalculatedResult> sourceCalculatedResults,
-
final Iterator<SingleTableInventoryCalculatedResult> targetCalculatedResults,
+ private TableDataConsistencyCheckResult
checkSingleTableInventoryData(final
Iterator<TableInventoryCheckCalculatedResult> sourceCalculatedResults,
+
final Iterator<TableInventoryCheckCalculatedResult> targetCalculatedResults,
final TableInventoryCheckParameter param, final ThreadPoolExecutor executor) {
YamlTableDataConsistencyCheckResult checkResult = new
YamlTableDataConsistencyCheckResult(true);
while (sourceCalculatedResults.hasNext() &&
targetCalculatedResults.hasNext()) {
if (null != param.getReadRateLimitAlgorithm()) {
param.getReadRateLimitAlgorithm().intercept(PipelineSQLOperationType.SELECT, 1);
}
- SingleTableInventoryCalculatedResult sourceCalculatedResult =
PipelineTaskUtils.waitFuture(executor.submit(sourceCalculatedResults::next));
- SingleTableInventoryCalculatedResult targetCalculatedResult =
PipelineTaskUtils.waitFuture(executor.submit(targetCalculatedResults::next));
+ TableInventoryCheckCalculatedResult sourceCalculatedResult =
PipelineTaskUtils.waitFuture(executor.submit(sourceCalculatedResults::next));
+ TableInventoryCheckCalculatedResult targetCalculatedResult =
PipelineTaskUtils.waitFuture(executor.submit(targetCalculatedResults::next));
if (!Objects.equals(sourceCalculatedResult,
targetCalculatedResult)) {
checkResult.setMatched(false);
log.info("content matched false, jobId={}, sourceTable={},
targetTable={}, uniqueKeys={}", param.getJobId(), param.getSourceTable(),
param.getTargetTable(), param.getUniqueKeys());
@@ -132,13 +132,13 @@ public abstract class MatchingTableInventoryChecker
implements TableInventoryChe
return new
YamlTableDataConsistencyCheckResultSwapper().swapToObject(checkResult);
}
- protected abstract
SingleTableInventoryCalculator<SingleTableInventoryCalculatedResult>
buildSingleTableInventoryCalculator();
+ protected abstract
TableInventoryCalculator<TableInventoryCheckCalculatedResult>
buildSingleTableInventoryCalculator();
@Override
public void cancel() {
canceling.set(true);
-
Optional.ofNullable(sourceCalculator).ifPresent(SingleTableInventoryCalculator::cancel);
-
Optional.ofNullable(targetCalculator).ifPresent(SingleTableInventoryCalculator::cancel);
+
Optional.ofNullable(sourceCalculator).ifPresent(TableInventoryCalculator::cancel);
+
Optional.ofNullable(targetCalculator).ifPresent(TableInventoryCalculator::cancel);
}
@Override
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CRC32SingleTableInventoryCheckCalculator.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CRC32TableInventoryCheckCalculator.java
similarity index 87%
rename from
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CRC32SingleTableInventoryCheckCalculator.java
rename to
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CRC32TableInventoryCheckCalculator.java
index 302f9b3d63e..dd8d624f03a 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CRC32SingleTableInventoryCheckCalculator.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CRC32TableInventoryCheckCalculator.java
@@ -20,10 +20,10 @@ package
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calc
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.SingleTableInventoryCalculatedResult;
+import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableInventoryCheckCalculatedResult;
import
org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.calculator.AbstractSingleTableInventoryCalculator;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.calculator.SingleTableInventoryCalculateParameter;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.calculator.AbstractTableInventoryCalculator;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.calculator.TableInventoryCalculateParameter;
import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelineDataConsistencyCalculateSQLBuilder;
import
org.apache.shardingsphere.infra.algorithm.core.exception.UnsupportedAlgorithmOnDatabaseTypeException;
@@ -38,19 +38,19 @@ import java.util.Optional;
import java.util.stream.Collectors;
/**
- * CRC32 single table inventory check calculator.
+ * CRC32 table inventory check calculator.
*/
@Slf4j
-public final class CRC32SingleTableInventoryCheckCalculator extends
AbstractSingleTableInventoryCalculator<SingleTableInventoryCalculatedResult> {
+public final class CRC32TableInventoryCheckCalculator extends
AbstractTableInventoryCalculator<TableInventoryCheckCalculatedResult> {
@Override
- public Iterable<SingleTableInventoryCalculatedResult> calculate(final
SingleTableInventoryCalculateParameter param) {
+ public Iterable<TableInventoryCheckCalculatedResult> calculate(final
TableInventoryCalculateParameter param) {
PipelineDataConsistencyCalculateSQLBuilder pipelineSQLBuilder = new
PipelineDataConsistencyCalculateSQLBuilder(param.getDatabaseType());
List<CalculatedItem> calculatedItems =
param.getColumnNames().stream().map(each -> calculateCRC32(pipelineSQLBuilder,
param, each)).collect(Collectors.toList());
return Collections.singletonList(new
CalculatedResult(calculatedItems.get(0).getRecordsCount(),
calculatedItems.stream().map(CalculatedItem::getCrc32).collect(Collectors.toList())));
}
- private CalculatedItem calculateCRC32(final
PipelineDataConsistencyCalculateSQLBuilder pipelineSQLBuilder, final
SingleTableInventoryCalculateParameter param, final String columnName) {
+ private CalculatedItem calculateCRC32(final
PipelineDataConsistencyCalculateSQLBuilder pipelineSQLBuilder, final
TableInventoryCalculateParameter param, final String columnName) {
String sql = pipelineSQLBuilder.buildCRC32SQL(param.getTable(),
columnName)
.orElseThrow(() -> new
UnsupportedAlgorithmOnDatabaseTypeException("DataConsistencyCalculate",
"CRC32", param.getDatabaseType()));
try (
@@ -79,7 +79,7 @@ public final class CRC32SingleTableInventoryCheckCalculator
extends AbstractSing
@RequiredArgsConstructor
@Getter
- private static final class CalculatedResult implements
SingleTableInventoryCalculatedResult {
+ private static final class CalculatedResult implements
TableInventoryCheckCalculatedResult {
private final int recordsCount;
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCheckCalculator.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordTableInventoryCheckCalculator.java
similarity index 73%
rename from
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCheckCalculator.java
rename to
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordTableInventoryCheckCalculator.java
index c986681c9bf..4aa00eaa476 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCheckCalculator.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordTableInventoryCheckCalculator.java
@@ -18,11 +18,11 @@
package
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.DataConsistencyCheckUtils;
-import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.RecordSingleTableInventoryCalculatedResult;
-import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.SingleTableInventoryCalculatedResult;
+import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.RecordTableInventoryCheckCalculatedResult;
+import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableInventoryCheckCalculatedResult;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.column.InventoryColumnValueReaderEngine;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.StreamingRangeType;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.calculator.AbstractRecordSingleTableInventoryCalculator;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.calculator.AbstractRecordTableInventoryCalculator;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
@@ -32,15 +32,15 @@ import java.util.List;
import java.util.Map;
/**
- * Record single table inventory check calculator.
+ * Record table inventory check calculator.
*/
-public final class RecordSingleTableInventoryCheckCalculator extends
AbstractRecordSingleTableInventoryCalculator<SingleTableInventoryCalculatedResult,
Map<String, Object>> {
+public final class RecordTableInventoryCheckCalculator extends
AbstractRecordTableInventoryCalculator<TableInventoryCheckCalculatedResult,
Map<String, Object>> {
- public RecordSingleTableInventoryCheckCalculator(final int chunkSize,
final int streamingChunkCount, final StreamingRangeType streamingRangeType) {
+ public RecordTableInventoryCheckCalculator(final int chunkSize, final int
streamingChunkCount, final StreamingRangeType streamingRangeType) {
super(chunkSize, streamingChunkCount, streamingRangeType);
}
- public RecordSingleTableInventoryCheckCalculator(final int chunkSize,
final StreamingRangeType streamingRangeType) {
+ public RecordTableInventoryCheckCalculator(final int chunkSize, final
StreamingRangeType streamingRangeType) {
super(chunkSize, streamingRangeType);
}
@@ -59,7 +59,7 @@ public final class RecordSingleTableInventoryCheckCalculator
extends AbstractRec
}
@Override
- protected SingleTableInventoryCalculatedResult
convertRecordsToResult(final List<Map<String, Object>> records, final Object
maxUniqueKeyValue) {
- return new
RecordSingleTableInventoryCalculatedResult(maxUniqueKeyValue, records);
+ protected TableInventoryCheckCalculatedResult convertRecordsToResult(final
List<Map<String, Object>> records, final Object maxUniqueKeyValue) {
+ return new
RecordTableInventoryCheckCalculatedResult(maxUniqueKeyValue, records);
}
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumper.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumper.java
index bd9b034d8a5..260092795c0 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumper.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumper.java
@@ -30,8 +30,8 @@ import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.posi
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.QueryRange;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.QueryType;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.StreamingRangeType;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.calculator.AbstractRecordSingleTableInventoryCalculator;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.calculator.SingleTableInventoryCalculateParameter;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.calculator.AbstractRecordTableInventoryCalculator;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.calculator.TableInventoryCalculateParameter;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.finished.IngestFinishedPosition;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.PrimaryKeyIngestPosition;
@@ -122,12 +122,12 @@ public final class InventoryDumper extends
AbstractPipelineLifecycleRunnable imp
IngestPosition initialPosition =
dumperContext.getCommonContext().getPosition();
log.info("Dump by calculator start, dataSource={}, table={},
initialPosition={}", dumperContext.getCommonContext().getDataSourceName(),
table, initialPosition);
List<String> columnNames = dumperContext.getQueryColumnNames();
- SingleTableInventoryCalculateParameter calculateParam = new
SingleTableInventoryCalculateParameter(dataSource, table,
+ TableInventoryCalculateParameter calculateParam = new
TableInventoryCalculateParameter(dataSource, table,
columnNames, dumperContext.getUniqueKeyColumns(),
QueryType.RANGE_QUERY, null);
QueryRange queryRange = new QueryRange(((PrimaryKeyIngestPosition<?>)
initialPosition).getBeginValue(), dumperContext.isFirstDump(),
((PrimaryKeyIngestPosition<?>) initialPosition).getEndValue());
calculateParam.setQueryRange(queryRange);
- RecordSingleTableInventoryDumpCalculator dumpCalculator = new
RecordSingleTableInventoryDumpCalculator(dumperContext.getBatchSize(),
StreamingRangeType.SMALL);
+ RecordTableInventoryDumpCalculator dumpCalculator = new
RecordTableInventoryDumpCalculator(dumperContext.getBatchSize(),
StreamingRangeType.SMALL);
long rowCount = 0L;
try {
String firstUniqueKey =
calculateParam.getFirstUniqueKey().getName();
@@ -223,9 +223,9 @@ public final class InventoryDumper extends
AbstractPipelineLifecycleRunnable imp
Optional.ofNullable(runningStatement.get()).ifPresent(PipelineJdbcUtils::cancelStatement);
}
- private class RecordSingleTableInventoryDumpCalculator extends
AbstractRecordSingleTableInventoryCalculator<List<DataRecord>, DataRecord> {
+ private class RecordTableInventoryDumpCalculator extends
AbstractRecordTableInventoryCalculator<List<DataRecord>, DataRecord> {
- RecordSingleTableInventoryDumpCalculator(final int chunkSize, final
StreamingRangeType streamingRangeType) {
+ RecordTableInventoryDumpCalculator(final int chunkSize, final
StreamingRangeType streamingRangeType) {
super(chunkSize, streamingRangeType);
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/calculator/AbstractRecordSingleTableInventoryCalculator.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/calculator/AbstractRecordTableInventoryCalculator.java
similarity index 88%
rename from
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/calculator/AbstractRecordSingleTableInventoryCalculator.java
rename to
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/calculator/AbstractRecordTableInventoryCalculator.java
index 86965015141..71e9e184373 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/calculator/AbstractRecordSingleTableInventoryCalculator.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/calculator/AbstractRecordTableInventoryCalculator.java
@@ -47,14 +47,14 @@ import java.util.List;
import java.util.Optional;
/**
- * Abstract record single table inventory calculator.
+ * Abstract record table inventory calculator.
*
* @param <S> the type of result
* @param <C> the type of record
*/
@HighFrequencyInvocation
@RequiredArgsConstructor
-public abstract class AbstractRecordSingleTableInventoryCalculator<S, C>
extends AbstractStreamingSingleTableInventoryCalculator<S> {
+public abstract class AbstractRecordTableInventoryCalculator<S, C> extends
AbstractStreamingTableInventoryCalculator<S> {
private static final int DEFAULT_STREAMING_CHUNK_COUNT = 100;
@@ -64,12 +64,12 @@ public abstract class
AbstractRecordSingleTableInventoryCalculator<S, C> extends
private final StreamingRangeType streamingRangeType;
- public AbstractRecordSingleTableInventoryCalculator(final int chunkSize,
final StreamingRangeType streamingRangeType) {
+ public AbstractRecordTableInventoryCalculator(final int chunkSize, final
StreamingRangeType streamingRangeType) {
this(chunkSize, DEFAULT_STREAMING_CHUNK_COUNT, streamingRangeType);
}
@Override
- public Optional<S> calculateChunk(final
SingleTableInventoryCalculateParameter param) {
+ public Optional<S> calculateChunk(final TableInventoryCalculateParameter
param) {
List<C> records = calculateChunk0(param);
if (records.isEmpty()) {
return Optional.empty();
@@ -81,7 +81,7 @@ public abstract class
AbstractRecordSingleTableInventoryCalculator<S, C> extends
return Optional.of(convertRecordsToResult(records, maxUniqueKeyValue));
}
- private List<C> calculateChunk0(final
SingleTableInventoryCalculateParameter param) {
+ private List<C> calculateChunk0(final TableInventoryCalculateParameter
param) {
InventoryColumnValueReaderEngine columnValueReaderEngine = new
InventoryColumnValueReaderEngine(param.getDatabaseType());
try {
if (QueryType.POINT_QUERY == param.getQueryType()) {
@@ -103,7 +103,7 @@ public abstract class
AbstractRecordSingleTableInventoryCalculator<S, C> extends
}
}
- private List<C> pointQuery(final SingleTableInventoryCalculateParameter
param, final InventoryColumnValueReaderEngine columnValueReaderEngine) throws
SQLException {
+ private List<C> pointQuery(final TableInventoryCalculateParameter param,
final InventoryColumnValueReaderEngine columnValueReaderEngine) throws
SQLException {
List<C> result = new LinkedList<>();
CalculationContext<C> calculationContext =
prepareCalculationContext(param);
prepareDatabaseResources(calculationContext, param);
@@ -117,7 +117,7 @@ public abstract class
AbstractRecordSingleTableInventoryCalculator<S, C> extends
return result;
}
- private List<C> allQuery(final SingleTableInventoryCalculateParameter
param, final InventoryColumnValueReaderEngine columnValueReaderEngine) throws
SQLException {
+ private List<C> allQuery(final TableInventoryCalculateParameter param,
final InventoryColumnValueReaderEngine columnValueReaderEngine) throws
SQLException {
List<C> result = new LinkedList<>();
CalculationContext<C> calculationContext =
prepareCalculationContext(param);
prepareDatabaseResources(calculationContext, param);
@@ -136,7 +136,7 @@ public abstract class
AbstractRecordSingleTableInventoryCalculator<S, C> extends
return result;
}
- private List<C> rangeQueryWithSingleColumUniqueKey(final
SingleTableInventoryCalculateParameter param,
+ private List<C> rangeQueryWithSingleColumUniqueKey(final
TableInventoryCalculateParameter param,
final
InventoryColumnValueReaderEngine columnValueReaderEngine, final int round)
throws SQLException {
List<C> result = new LinkedList<>();
CalculationContext<C> calculationContext =
prepareCalculationContext(param);
@@ -157,7 +157,7 @@ public abstract class
AbstractRecordSingleTableInventoryCalculator<S, C> extends
return result;
}
- private List<C> rangeQueryWithMultiColumUniqueKeys(final
SingleTableInventoryCalculateParameter param,
+ private List<C> rangeQueryWithMultiColumUniqueKeys(final
TableInventoryCalculateParameter param,
final
InventoryColumnValueReaderEngine columnValueReaderEngine) throws SQLException {
CalculationContext<C> calculationContext =
prepareCalculationContext(param);
if (calculationContext.getRecordDeque().size() > chunkSize) {
@@ -167,7 +167,7 @@ public abstract class
AbstractRecordSingleTableInventoryCalculator<S, C> extends
return queryFromBuffer(calculationContext.getRecordDeque());
}
- private void doRangeQueryWithMultiColumUniqueKeys(final
SingleTableInventoryCalculateParameter param, final CalculationContext<C>
calculationContext,
+ private void doRangeQueryWithMultiColumUniqueKeys(final
TableInventoryCalculateParameter param, final CalculationContext<C>
calculationContext,
final
InventoryColumnValueReaderEngine columnValueReaderEngine) throws SQLException {
prepareDatabaseResources(calculationContext, param);
ResultSet resultSet =
calculationContext.getDatabaseResources().getResultSet();
@@ -205,10 +205,10 @@ public abstract class
AbstractRecordSingleTableInventoryCalculator<S, C> extends
}
}
- private List<C> pointRangeQuery(final
SingleTableInventoryCalculateParameter param, final C duplicateRecord,
+ private List<C> pointRangeQuery(final TableInventoryCalculateParameter
param, final C duplicateRecord,
final InventoryColumnValueReaderEngine
columnValueReaderEngine) throws SQLException {
Object duplicateUniqueKeyValue =
getFirstUniqueKeyValue(duplicateRecord, param.getFirstUniqueKey().getName());
- SingleTableInventoryCalculateParameter newParam =
buildPointRangeQueryCalculateParameter(param, duplicateUniqueKeyValue);
+ TableInventoryCalculateParameter newParam =
buildPointRangeQueryCalculateParameter(param, duplicateUniqueKeyValue);
try {
return pointQuery(newParam, columnValueReaderEngine);
} finally {
@@ -232,7 +232,7 @@ public abstract class
AbstractRecordSingleTableInventoryCalculator<S, C> extends
}
@SuppressWarnings("unchecked")
- private CalculationContext<C> prepareCalculationContext(final
SingleTableInventoryCalculateParameter param) {
+ private CalculationContext<C> prepareCalculationContext(final
TableInventoryCalculateParameter param) {
CalculationContext<C> result = (CalculationContext<C>)
param.getCalculationContext();
if (null != result) {
return result;
@@ -242,7 +242,7 @@ public abstract class
AbstractRecordSingleTableInventoryCalculator<S, C> extends
return result;
}
- private void prepareDatabaseResources(final CalculationContext<C>
calculationContext, final SingleTableInventoryCalculateParameter param) throws
SQLException {
+ private void prepareDatabaseResources(final CalculationContext<C>
calculationContext, final TableInventoryCalculateParameter param) throws
SQLException {
if (calculationContext.getDatabaseResources().isReady()) {
return;
}
@@ -259,7 +259,7 @@ public abstract class
AbstractRecordSingleTableInventoryCalculator<S, C> extends
databaseResources.setReady(true);
}
- private String getQuerySQL(final SingleTableInventoryCalculateParameter
param) {
+ private String getQuerySQL(final TableInventoryCalculateParameter param) {
ShardingSpherePreconditions.checkState(null != param.getUniqueKeys()
&& !param.getUniqueKeys().isEmpty() && null != param.getFirstUniqueKey(),
() -> new UnsupportedOperationException("Record inventory
calculator does not support table without unique key and primary key now."));
PipelineDataConsistencyCalculateSQLBuilder pipelineSQLBuilder = new
PipelineDataConsistencyCalculateSQLBuilder(param.getDatabaseType());
@@ -275,7 +275,7 @@ public abstract class
AbstractRecordSingleTableInventoryCalculator<S, C> extends
}
}
- private void setParameters(final PreparedStatement preparedStatement,
final SingleTableInventoryCalculateParameter param) throws SQLException {
+ private void setParameters(final PreparedStatement preparedStatement,
final TableInventoryCalculateParameter param) throws SQLException {
QueryType queryType = param.getQueryType();
if (queryType == QueryType.RANGE_QUERY) {
QueryRange queryRange = param.getQueryRange();
@@ -312,8 +312,8 @@ public abstract class
AbstractRecordSingleTableInventoryCalculator<S, C> extends
}
}
- private SingleTableInventoryCalculateParameter
buildPointRangeQueryCalculateParameter(final
SingleTableInventoryCalculateParameter param, final Object uniqueKeyValue) {
- SingleTableInventoryCalculateParameter result = new
SingleTableInventoryCalculateParameter(param.getDataSource(), param.getTable(),
param.getColumnNames(),
+ private TableInventoryCalculateParameter
buildPointRangeQueryCalculateParameter(final TableInventoryCalculateParameter
param, final Object uniqueKeyValue) {
+ TableInventoryCalculateParameter result = new
TableInventoryCalculateParameter(param.getDataSource(), param.getTable(),
param.getColumnNames(),
Collections.singletonList(param.getFirstUniqueKey()),
QueryType.POINT_QUERY, param.getQueryCondition());
result.setUniqueKeysValues(Collections.singletonList(uniqueKeyValue));
result.setShardingColumnsNames(param.getShardingColumnsNames());
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/calculator/AbstractStreamingSingleTableInventoryCalculator.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/calculator/AbstractStreamingTableInventoryCalculator.java
similarity index 84%
rename from
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/calculator/AbstractStreamingSingleTableInventoryCalculator.java
rename to
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/calculator/AbstractStreamingTableInventoryCalculator.java
index c5ef9c595f4..f20f74ea11d 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/calculator/AbstractStreamingSingleTableInventoryCalculator.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/calculator/AbstractStreamingTableInventoryCalculator.java
@@ -27,15 +27,15 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
/**
- * Abstract streaming single table inventory calculator.
+ * Abstract streaming table inventory calculator.
*
* @param <S> the type of result
*/
@Getter
-public abstract class AbstractStreamingSingleTableInventoryCalculator<S>
extends AbstractSingleTableInventoryCalculator<S> {
+public abstract class AbstractStreamingTableInventoryCalculator<S> extends
AbstractTableInventoryCalculator<S> {
@Override
- public final Iterable<S> calculate(final
SingleTableInventoryCalculateParameter param) {
+ public final Iterable<S> calculate(final TableInventoryCalculateParameter
param) {
return new ResultIterable(param);
}
@@ -45,7 +45,7 @@ public abstract class
AbstractStreamingSingleTableInventoryCalculator<S> extends
* @param param data consistency calculate parameter
* @return optional calculated result, empty means there's no more result
*/
- protected abstract Optional<S>
calculateChunk(SingleTableInventoryCalculateParameter param);
+ protected abstract Optional<S>
calculateChunk(TableInventoryCalculateParameter param);
/**
* It's not thread-safe, it should be executed in only one thread at the
same time.
@@ -53,7 +53,7 @@ public abstract class
AbstractStreamingSingleTableInventoryCalculator<S> extends
@RequiredArgsConstructor
private final class ResultIterable implements Iterable<S> {
- private final SingleTableInventoryCalculateParameter param;
+ private final TableInventoryCalculateParameter param;
@Override
public Iterator<S> iterator() {
@@ -68,7 +68,7 @@ public abstract class
AbstractStreamingSingleTableInventoryCalculator<S> extends
private final AtomicReference<Optional<S>> nextResult = new
AtomicReference<>();
- private final SingleTableInventoryCalculateParameter param;
+ private final TableInventoryCalculateParameter param;
@Override
public boolean hasNext() {
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/calculator/AbstractSingleTableInventoryCalculator.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/calculator/AbstractTableInventoryCalculator.java
similarity index 93%
rename from
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/calculator/AbstractSingleTableInventoryCalculator.java
rename to
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/calculator/AbstractTableInventoryCalculator.java
index bc18d44a8ab..2e66861ccd0 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/calculator/AbstractSingleTableInventoryCalculator.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/calculator/AbstractTableInventoryCalculator.java
@@ -27,12 +27,12 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
/**
- * Abstract single table inventory calculator.
+ * Abstract table inventory calculator.
*
* @param <S> the type of result
*/
@Slf4j
-public abstract class AbstractSingleTableInventoryCalculator<S> implements
SingleTableInventoryCalculator<S> {
+public abstract class AbstractTableInventoryCalculator<S> implements
TableInventoryCalculator<S> {
private final AtomicBoolean canceling = new AtomicBoolean(false);
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/calculator/SingleTableInventoryCalculateParameter.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/calculator/TableInventoryCalculateParameter.java
similarity index 98%
rename from
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/calculator/SingleTableInventoryCalculateParameter.java
rename to
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/calculator/TableInventoryCalculateParameter.java
index 71d6a8d3372..08c1335e922 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/calculator/SingleTableInventoryCalculateParameter.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/calculator/TableInventoryCalculateParameter.java
@@ -34,11 +34,11 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
/**
- * Single table inventory calculate parameter.
+ * Table inventory calculate parameter.
*/
@RequiredArgsConstructor
@Getter
-public final class SingleTableInventoryCalculateParameter {
+public final class TableInventoryCalculateParameter {
/**
* Data source of source side or target side.
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/calculator/SingleTableInventoryCalculator.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/calculator/TableInventoryCalculator.java
similarity index 86%
rename from
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/calculator/SingleTableInventoryCalculator.java
rename to
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/calculator/TableInventoryCalculator.java
index 683878f0752..3e2b7f9bd59 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/calculator/SingleTableInventoryCalculator.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/calculator/TableInventoryCalculator.java
@@ -20,11 +20,11 @@ package
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.que
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineCancellable;
/**
- * Single table inventory calculator.
+ * Table inventory calculator.
*
* @param <S> the type of result
*/
-public interface SingleTableInventoryCalculator<S> extends PipelineCancellable
{
+public interface TableInventoryCalculator<S> extends PipelineCancellable {
/**
* Calculate for single table inventory data.
@@ -32,5 +32,5 @@ public interface SingleTableInventoryCalculator<S> extends
PipelineCancellable {
* @param param calculate parameter
* @return calculated result
*/
- Iterable<S> calculate(SingleTableInventoryCalculateParameter param);
+ Iterable<S> calculate(TableInventoryCalculateParameter param);
}
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/RecordSingleTableInventoryCalculatedResultTest.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/RecordTableInventoryCheckCalculatedResultTest.java
similarity index 58%
rename from
kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/RecordSingleTableInventoryCalculatedResultTest.java
rename to
kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/RecordTableInventoryCheckCalculatedResultTest.java
index e788b9549a9..70c48c2d561 100644
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/RecordSingleTableInventoryCalculatedResultTest.java
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/result/RecordTableInventoryCheckCalculatedResultTest.java
@@ -31,43 +31,43 @@ import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertFalse;
-class RecordSingleTableInventoryCalculatedResultTest {
+class RecordTableInventoryCheckCalculatedResultTest {
@Test
void assertNotEqualsWithNull() {
- assertFalse(new RecordSingleTableInventoryCalculatedResult(0,
Collections.emptyList()).equals(null));
+ assertFalse(new RecordTableInventoryCheckCalculatedResult(0,
Collections.emptyList()).equals(null));
}
@Test
void assertEqualsWithSameObject() {
- RecordSingleTableInventoryCalculatedResult calculatedResult = new
RecordSingleTableInventoryCalculatedResult(0, Collections.emptyList());
+ RecordTableInventoryCheckCalculatedResult calculatedResult = new
RecordTableInventoryCheckCalculatedResult(0, Collections.emptyList());
assertThat(calculatedResult, is(calculatedResult));
}
@Test
void assertNotEqualsWithDifferentClassType() {
- RecordSingleTableInventoryCalculatedResult actual = new
RecordSingleTableInventoryCalculatedResult(0, Collections.emptyList());
+ RecordTableInventoryCheckCalculatedResult actual = new
RecordTableInventoryCheckCalculatedResult(0, Collections.emptyList());
Object expected = new Object();
assertThat(actual, not(expected));
}
@Test
void assertEqualsWithEmptyRecords() {
- RecordSingleTableInventoryCalculatedResult actual = new
RecordSingleTableInventoryCalculatedResult(0, Collections.emptyList());
- RecordSingleTableInventoryCalculatedResult expected = new
RecordSingleTableInventoryCalculatedResult(0, Collections.emptyList());
+ RecordTableInventoryCheckCalculatedResult actual = new
RecordTableInventoryCheckCalculatedResult(0, Collections.emptyList());
+ RecordTableInventoryCheckCalculatedResult expected = new
RecordTableInventoryCheckCalculatedResult(0, Collections.emptyList());
assertThat(actual, is(expected));
}
@Test
void assertEqualsWithFullTypeRecords() {
- RecordSingleTableInventoryCalculatedResult actual = new
RecordSingleTableInventoryCalculatedResult(1000,
Arrays.asList(buildFixedFullTypeRecord(), buildFixedFullTypeRecord()));
- RecordSingleTableInventoryCalculatedResult expected = new
RecordSingleTableInventoryCalculatedResult(1000,
Arrays.asList(buildFixedFullTypeRecord(), buildFixedFullTypeRecord()));
+ RecordTableInventoryCheckCalculatedResult actual = new
RecordTableInventoryCheckCalculatedResult(1000,
Arrays.asList(buildFixedFullTypeRecord(), buildFixedFullTypeRecord()));
+ RecordTableInventoryCheckCalculatedResult expected = new
RecordTableInventoryCheckCalculatedResult(1000,
Arrays.asList(buildFixedFullTypeRecord(), buildFixedFullTypeRecord()));
assertThat(actual, is(expected));
}
@Test
void assertFullTypeRecordsEqualsWithDifferentDecimalScale() {
- RecordSingleTableInventoryCalculatedResult expected = new
RecordSingleTableInventoryCalculatedResult(1000,
Collections.singletonList(buildFixedFullTypeRecord()));
+ RecordTableInventoryCheckCalculatedResult expected = new
RecordTableInventoryCheckCalculatedResult(1000,
Collections.singletonList(buildFixedFullTypeRecord()));
Map<String, Object> recordMap = buildFixedFullTypeRecord();
recordMap.forEach((key, value) -> {
if (value instanceof BigDecimal) {
@@ -75,28 +75,28 @@ class RecordSingleTableInventoryCalculatedResultTest {
recordMap.put(key, decimal.setScale(decimal.scale() + 1,
RoundingMode.CEILING));
}
});
- RecordSingleTableInventoryCalculatedResult actual = new
RecordSingleTableInventoryCalculatedResult(1000,
Collections.singletonList(recordMap));
+ RecordTableInventoryCheckCalculatedResult actual = new
RecordTableInventoryCheckCalculatedResult(1000,
Collections.singletonList(recordMap));
assertThat(actual, is(expected));
}
@Test
void assertNotEqualsWithDifferentRecordsCount() {
- assertThat(new RecordSingleTableInventoryCalculatedResult(1000,
Collections.singletonList(buildFixedFullTypeRecord())),
- not(new RecordSingleTableInventoryCalculatedResult(1000,
Collections.emptyList())));
+ assertThat(new RecordTableInventoryCheckCalculatedResult(1000,
Collections.singletonList(buildFixedFullTypeRecord())),
+ not(new RecordTableInventoryCheckCalculatedResult(1000,
Collections.emptyList())));
}
@Test
void assertNotEqualsWithDifferentMaxUniqueKeyValue() {
- assertThat(new RecordSingleTableInventoryCalculatedResult(1000,
Collections.singletonList(buildFixedFullTypeRecord())),
- not(new RecordSingleTableInventoryCalculatedResult(1001,
Collections.singletonList(buildFixedFullTypeRecord()))));
+ assertThat(new RecordTableInventoryCheckCalculatedResult(1000,
Collections.singletonList(buildFixedFullTypeRecord())),
+ not(new RecordTableInventoryCheckCalculatedResult(1001,
Collections.singletonList(buildFixedFullTypeRecord()))));
}
@Test
void assertNotEqualsWithDifferentRandomColumnValue() {
Map<String, Object> record = buildFixedFullTypeRecord();
- RecordSingleTableInventoryCalculatedResult result1 = new
RecordSingleTableInventoryCalculatedResult(1000,
Collections.singletonList(record));
+ RecordTableInventoryCheckCalculatedResult result1 = new
RecordTableInventoryCheckCalculatedResult(1000,
Collections.singletonList(record));
record.forEach((key, value) -> {
- RecordSingleTableInventoryCalculatedResult result2 = new
RecordSingleTableInventoryCalculatedResult(1000,
+ RecordTableInventoryCheckCalculatedResult result2 = new
RecordTableInventoryCheckCalculatedResult(1000,
Collections.singletonList(modifyColumnValueRandomly(buildFixedFullTypeRecord(),
key)));
assertThat(result1, not(result2));
});
@@ -112,7 +112,7 @@ class RecordSingleTableInventoryCalculatedResultTest {
@Test
void assertHashcode() {
- assertThat(new RecordSingleTableInventoryCalculatedResult(1000,
Collections.emptyList()).hashCode(),
- is(new RecordSingleTableInventoryCalculatedResult(1000,
Collections.emptyList()).hashCode()));
+ assertThat(new RecordTableInventoryCheckCalculatedResult(1000,
Collections.emptyList()).hashCode(),
+ is(new RecordTableInventoryCheckCalculatedResult(1000,
Collections.emptyList()).hashCode()));
}
}
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CRC32SingleTableInventoryCheckCalculatorTest.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CRC32TableInventoryCheckCalculatorTest.java
similarity index 88%
rename from
kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CRC32SingleTableInventoryCheckCalculatorTest.java
rename to
kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CRC32TableInventoryCheckCalculatorTest.java
index fb0317c539e..8a8b1a783fa 100644
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CRC32SingleTableInventoryCheckCalculatorTest.java
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CRC32TableInventoryCheckCalculatorTest.java
@@ -17,11 +17,11 @@
package
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator;
-import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.SingleTableInventoryCalculatedResult;
+import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableInventoryCheckCalculatedResult;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
import
org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.QueryType;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.calculator.SingleTableInventoryCalculateParameter;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.calculator.TableInventoryCalculateParameter;
import
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
import org.apache.shardingsphere.database.connector.core.type.DatabaseType;
import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedTable;
@@ -53,9 +53,9 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
-class CRC32SingleTableInventoryCheckCalculatorTest {
+class CRC32TableInventoryCheckCalculatorTest {
- private SingleTableInventoryCalculateParameter parameter;
+ private TableInventoryCalculateParameter parameter;
@Mock
private PipelineDataSource pipelineDataSource;
@@ -67,7 +67,7 @@ class CRC32SingleTableInventoryCheckCalculatorTest {
void setUp() throws SQLException {
DatabaseType databaseType =
TypedSPILoader.getService(DatabaseType.class, "FIXTURE");
List<PipelineColumnMetaData> uniqueKeys =
Collections.singletonList(new PipelineColumnMetaData(1, "id", Types.INTEGER,
"integer", false, true, true));
- parameter = new
SingleTableInventoryCalculateParameter(pipelineDataSource, new
QualifiedTable(null, "foo_tbl"),
+ parameter = new TableInventoryCalculateParameter(pipelineDataSource,
new QualifiedTable(null, "foo_tbl"),
Arrays.asList("foo_col", "bar_col"), uniqueKeys,
QueryType.RANGE_QUERY, null);
when(pipelineDataSource.getDatabaseType()).thenReturn(databaseType);
when(pipelineDataSource.getConnection()).thenReturn(connection);
@@ -79,7 +79,7 @@ class CRC32SingleTableInventoryCheckCalculatorTest {
when(connection.prepareStatement("SELECT CRC32(foo_col) FROM
foo_tbl")).thenReturn(preparedStatement0);
PreparedStatement preparedStatement1 = mockPreparedStatement(456L, 10);
when(connection.prepareStatement("SELECT CRC32(bar_col) FROM
foo_tbl")).thenReturn(preparedStatement1);
- Iterator<SingleTableInventoryCalculatedResult> actual = new
CRC32SingleTableInventoryCheckCalculator().calculate(parameter).iterator();
+ Iterator<TableInventoryCheckCalculatedResult> actual = new
CRC32TableInventoryCheckCalculator().calculate(parameter).iterator();
assertThat(actual.next().getRecordsCount(), is(10));
assertFalse(actual.hasNext());
}
@@ -96,6 +96,6 @@ class CRC32SingleTableInventoryCheckCalculatorTest {
@Test
void assertCalculateFailed() throws SQLException {
when(connection.prepareStatement(anyString())).thenThrow(new
SQLException(""));
-
assertThrows(PipelineTableDataConsistencyCheckLoadingFailedException.class, ()
-> new CRC32SingleTableInventoryCheckCalculator().calculate(parameter));
+
assertThrows(PipelineTableDataConsistencyCheckLoadingFailedException.class, ()
-> new CRC32TableInventoryCheckCalculator().calculate(parameter));
}
}
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCheckCalculatorTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordTableInventoryCheckCalculatorTest.java
similarity index 72%
rename from
test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCheckCalculatorTest.java
rename to
test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordTableInventoryCheckCalculatorTest.java
index db34c20f250..d39bf3d4b8c 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCheckCalculatorTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordTableInventoryCheckCalculatorTest.java
@@ -19,13 +19,13 @@ package
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calc
import com.zaxxer.hikari.HikariDataSource;
import org.apache.commons.lang3.RandomStringUtils;
-import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.RecordSingleTableInventoryCalculatedResult;
-import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.SingleTableInventoryCalculatedResult;
+import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.RecordTableInventoryCheckCalculatedResult;
+import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableInventoryCheckCalculatedResult;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.QueryRange;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.QueryType;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.StreamingRangeType;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.calculator.SingleTableInventoryCalculateParameter;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.calculator.TableInventoryCalculateParameter;
import
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
import org.apache.shardingsphere.database.connector.core.type.DatabaseType;
import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedTable;
@@ -52,7 +52,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
-class RecordSingleTableInventoryCheckCalculatorTest {
+class RecordTableInventoryCheckCalculatorTest {
private static PipelineDataSource dataSource;
@@ -107,8 +107,8 @@ class RecordSingleTableInventoryCheckCalculatorTest {
@ParameterizedTest
@CsvSource({"SMALL", "LARGE"})
void assertCalculateOfRangeQueryFromBeginWithOrderIdUniqueKey(final String
streamingRangeType) {
- RecordSingleTableInventoryCheckCalculator calculator = new
RecordSingleTableInventoryCheckCalculator(4,
StreamingRangeType.valueOf(streamingRangeType));
- SingleTableInventoryCalculateParameter param = new
SingleTableInventoryCalculateParameter(dataSource, new QualifiedTable(null,
"t_order"),
+ RecordTableInventoryCheckCalculator calculator = new
RecordTableInventoryCheckCalculator(4,
StreamingRangeType.valueOf(streamingRangeType));
+ TableInventoryCalculateParameter param = new
TableInventoryCalculateParameter(dataSource, new QualifiedTable(null,
"t_order"),
Collections.emptyList(), buildOrderIdUniqueKey(),
QueryType.RANGE_QUERY, null);
assertQueryRangeCalculatedResult(calculator, param, new QueryRange(0,
false, null), 4, 4);
}
@@ -116,8 +116,8 @@ class RecordSingleTableInventoryCheckCalculatorTest {
@ParameterizedTest
@CsvSource({"SMALL", "LARGE"})
void assertCalculateOfRangeQueryFromMiddleWithOrderIdUniqueKey(final
String streamingRangeType) {
- RecordSingleTableInventoryCheckCalculator calculator = new
RecordSingleTableInventoryCheckCalculator(4,
StreamingRangeType.valueOf(streamingRangeType));
- SingleTableInventoryCalculateParameter param = new
SingleTableInventoryCalculateParameter(dataSource, new QualifiedTable(null,
"t_order"),
+ RecordTableInventoryCheckCalculator calculator = new
RecordTableInventoryCheckCalculator(4,
StreamingRangeType.valueOf(streamingRangeType));
+ TableInventoryCalculateParameter param = new
TableInventoryCalculateParameter(dataSource, new QualifiedTable(null,
"t_order"),
Collections.emptyList(), buildOrderIdUniqueKey(),
QueryType.RANGE_QUERY, null);
assertQueryRangeCalculatedResult(calculator, param, new QueryRange(4,
false, null), 4, 8);
}
@@ -125,20 +125,20 @@ class RecordSingleTableInventoryCheckCalculatorTest {
@ParameterizedTest
@CsvSource({"SMALL", "LARGE"})
void assertCalculateOfRangeQueryWithMultiColumnUniqueKeys(final String
streamingRangeType) {
- RecordSingleTableInventoryCheckCalculator calculator = new
RecordSingleTableInventoryCheckCalculator(1000,
StreamingRangeType.valueOf(streamingRangeType));
- SingleTableInventoryCalculateParameter param = new
SingleTableInventoryCalculateParameter(dataSource, new QualifiedTable(null,
"t_order"),
+ RecordTableInventoryCheckCalculator calculator = new
RecordTableInventoryCheckCalculator(1000,
StreamingRangeType.valueOf(streamingRangeType));
+ TableInventoryCalculateParameter param = new
TableInventoryCalculateParameter(dataSource, new QualifiedTable(null,
"t_order"),
Collections.emptyList(), buildMultiColumnUniqueKeys(),
QueryType.RANGE_QUERY, null);
assertQueryRangeCalculatedResult(calculator, param, new QueryRange(3,
true, 6), 8, 6);
assertQueryRangeCalculatedResult(calculator, param, new QueryRange(3,
false, 6), 3, 6);
}
- private void assertQueryRangeCalculatedResult(final
RecordSingleTableInventoryCheckCalculator calculator, final
SingleTableInventoryCalculateParameter param, final QueryRange queryRange,
+ private void assertQueryRangeCalculatedResult(final
RecordTableInventoryCheckCalculator calculator, final
TableInventoryCalculateParameter param, final QueryRange queryRange,
final int
expectedRecordsCount, final int expectedMaxUniqueKeyValue) {
param.setQueryRange(queryRange);
- Optional<SingleTableInventoryCalculatedResult> calculatedResult =
calculator.calculateChunk(param);
+ Optional<TableInventoryCheckCalculatedResult> calculatedResult =
calculator.calculateChunk(param);
QuietlyCloser.close(param.getCalculationContext());
assertTrue(calculatedResult.isPresent());
- SingleTableInventoryCalculatedResult actual = calculatedResult.get();
+ TableInventoryCheckCalculatedResult actual = calculatedResult.get();
assertThat(actual.getRecordsCount(), is(expectedRecordsCount));
assertTrue(actual.getMaxUniqueKeyValue().isPresent());
assertThat(actual.getMaxUniqueKeyValue().get(),
is(expectedMaxUniqueKeyValue));
@@ -153,8 +153,8 @@ class RecordSingleTableInventoryCheckCalculatorTest {
connection.createStatement().execute(
"INSERT INTO test3 (user_id,order_id,status) VALUES
(3,1,'ok'),(3,2,'ok'),(4,3,'ok'),(4,4,'ok'),(5,5,'ok'),(5,6,'ok'),(6,7,'ok'),(6,8,'ok'),(7,9,'ok')");
}
- RecordSingleTableInventoryCheckCalculator calculator = new
RecordSingleTableInventoryCheckCalculator(1000,
StreamingRangeType.valueOf(streamingRangeType));
- SingleTableInventoryCalculateParameter param = new
SingleTableInventoryCalculateParameter(dataSource, new QualifiedTable(null,
"test3"),
+ RecordTableInventoryCheckCalculator calculator = new
RecordTableInventoryCheckCalculator(1000,
StreamingRangeType.valueOf(streamingRangeType));
+ TableInventoryCalculateParameter param = new
TableInventoryCalculateParameter(dataSource, new QualifiedTable(null, "test3"),
Collections.emptyList(), buildMultiColumnUniqueKeys(),
QueryType.RANGE_QUERY, null);
assertQueryRangeCalculatedResult(calculator, param, new QueryRange(3,
true, 4), 4, 4);
assertQueryRangeCalculatedResult(calculator, param, new QueryRange(5,
true, 6), 4, 6);
@@ -164,11 +164,11 @@ class RecordSingleTableInventoryCheckCalculatorTest {
@ParameterizedTest
@CsvSource({"SMALL", "LARGE"})
void assertCalculateOfReservedRangeQueryWithMultiColumnUniqueKeys(final
String streamingRangeType) {
- RecordSingleTableInventoryCheckCalculator calculator = new
RecordSingleTableInventoryCheckCalculator(1000,
StreamingRangeType.valueOf(streamingRangeType));
- SingleTableInventoryCalculateParameter param = new
SingleTableInventoryCalculateParameter(dataSource, new QualifiedTable(null,
"t_order"),
+ RecordTableInventoryCheckCalculator calculator = new
RecordTableInventoryCheckCalculator(1000,
StreamingRangeType.valueOf(streamingRangeType));
+ TableInventoryCalculateParameter param = new
TableInventoryCalculateParameter(dataSource, new QualifiedTable(null,
"t_order"),
Collections.emptyList(), buildMultiColumnUniqueKeys(),
QueryType.RANGE_QUERY, null);
param.setQueryRange(new QueryRange(3, true, 2));
- Optional<SingleTableInventoryCalculatedResult> calculatedResult =
calculator.calculateChunk(param);
+ Optional<TableInventoryCheckCalculatedResult> calculatedResult =
calculator.calculateChunk(param);
QuietlyCloser.close(param.getCalculationContext());
assertFalse(calculatedResult.isPresent());
}
@@ -180,11 +180,11 @@ class RecordSingleTableInventoryCheckCalculatorTest {
connection.createStatement().execute("DROP TABLE IF EXISTS test1");
connection.createStatement().execute("CREATE TABLE test1 (user_id
INT NOT NULL, order_id INT, status VARCHAR(12), PRIMARY KEY (user_id))");
}
- SingleTableInventoryCalculateParameter param = new
SingleTableInventoryCalculateParameter(dataSource, new QualifiedTable(null,
"test1"),
+ TableInventoryCalculateParameter param = new
TableInventoryCalculateParameter(dataSource, new QualifiedTable(null, "test1"),
Collections.emptyList(), buildUserIdUniqueKey(),
QueryType.RANGE_QUERY, null);
param.setQueryRange(new QueryRange(0, false, null));
- RecordSingleTableInventoryCheckCalculator calculator = new
RecordSingleTableInventoryCheckCalculator(5,
StreamingRangeType.valueOf(streamingRangeType));
- Optional<SingleTableInventoryCalculatedResult> calculateResult =
calculator.calculateChunk(param);
+ RecordTableInventoryCheckCalculator calculator = new
RecordTableInventoryCheckCalculator(5,
StreamingRangeType.valueOf(streamingRangeType));
+ Optional<TableInventoryCheckCalculatedResult> calculateResult =
calculator.calculateChunk(param);
QuietlyCloser.close(param.getCalculationContext());
assertFalse(calculateResult.isPresent());
}
@@ -196,11 +196,11 @@ class RecordSingleTableInventoryCheckCalculatorTest {
connection.createStatement().execute("DROP TABLE IF EXISTS test2");
connection.createStatement().execute("CREATE TABLE test2 (user_id
INT NOT NULL, order_id INT, status VARCHAR(12), PRIMARY KEY (user_id,
order_id))");
}
- SingleTableInventoryCalculateParameter param = new
SingleTableInventoryCalculateParameter(dataSource, new QualifiedTable(null,
"test2"),
+ TableInventoryCalculateParameter param = new
TableInventoryCalculateParameter(dataSource, new QualifiedTable(null, "test2"),
Collections.emptyList(), buildMultiColumnUniqueKeys(),
QueryType.RANGE_QUERY, null);
param.setQueryRange(new QueryRange(null, false, null));
- RecordSingleTableInventoryCheckCalculator calculator = new
RecordSingleTableInventoryCheckCalculator(5,
StreamingRangeType.valueOf(streamingRangeType));
- Optional<SingleTableInventoryCalculatedResult> calculateResult =
calculator.calculateChunk(param);
+ RecordTableInventoryCheckCalculator calculator = new
RecordTableInventoryCheckCalculator(5,
StreamingRangeType.valueOf(streamingRangeType));
+ Optional<TableInventoryCheckCalculatedResult> calculateResult =
calculator.calculateChunk(param);
QuietlyCloser.close(param.getCalculationContext());
assertFalse(calculateResult.isPresent());
}
@@ -208,33 +208,33 @@ class RecordSingleTableInventoryCheckCalculatorTest {
@ParameterizedTest
@CsvSource({"100,SMALL", "2,SMALL", "3,SMALL", "100,LARGE", "2,LARGE",
"3,LARGE"})
void assertCalculateOfRangeQueryAllWithOrderIdUniqueKeyWith3x(final int
streamingChunkCount, final String streamingRangeType) {
- RecordSingleTableInventoryCheckCalculator calculator = new
RecordSingleTableInventoryCheckCalculator(3, streamingChunkCount,
StreamingRangeType.valueOf(streamingRangeType));
- SingleTableInventoryCalculateParameter param = new
SingleTableInventoryCalculateParameter(dataSource, new QualifiedTable(null,
"t_order"),
+ RecordTableInventoryCheckCalculator calculator = new
RecordTableInventoryCheckCalculator(3, streamingChunkCount,
StreamingRangeType.valueOf(streamingRangeType));
+ TableInventoryCalculateParameter param = new
TableInventoryCalculateParameter(dataSource, new QualifiedTable(null,
"t_order"),
Collections.emptyList(), buildOrderIdUniqueKey(),
QueryType.RANGE_QUERY, null);
param.setQueryRange(new QueryRange(null, false, null));
- Iterator<SingleTableInventoryCalculatedResult> resultIterator =
calculator.calculate(param).iterator();
- RecordSingleTableInventoryCalculatedResult actual =
(RecordSingleTableInventoryCalculatedResult) resultIterator.next();
+ Iterator<TableInventoryCheckCalculatedResult> resultIterator =
calculator.calculate(param).iterator();
+ RecordTableInventoryCheckCalculatedResult actual =
(RecordTableInventoryCheckCalculatedResult) resultIterator.next();
assertThat(actual.getRecordsCount(), is(3));
assertRecord(actual.getRecords().get(0), 1, 1);
assertRecord(actual.getRecords().get(1), 2, 2);
assertRecord(actual.getRecords().get(2), 3, 3);
assertTrue(actual.getMaxUniqueKeyValue().isPresent());
assertThat(actual.getMaxUniqueKeyValue().get(), is(3));
- actual = (RecordSingleTableInventoryCalculatedResult)
resultIterator.next();
+ actual = (RecordTableInventoryCheckCalculatedResult)
resultIterator.next();
assertThat(actual.getRecordsCount(), is(3));
assertRecord(actual.getRecords().get(0), 3, 4);
assertRecord(actual.getRecords().get(1), 3, 5);
assertRecord(actual.getRecords().get(2), 3, 6);
assertTrue(actual.getMaxUniqueKeyValue().isPresent());
assertThat(actual.getMaxUniqueKeyValue().get(), is(6));
- actual = (RecordSingleTableInventoryCalculatedResult)
resultIterator.next();
+ actual = (RecordTableInventoryCheckCalculatedResult)
resultIterator.next();
assertThat(actual.getRecordsCount(), is(3));
assertRecord(actual.getRecords().get(0), 3, 7);
assertRecord(actual.getRecords().get(1), 4, 8);
assertRecord(actual.getRecords().get(2), 5, 9);
assertTrue(actual.getMaxUniqueKeyValue().isPresent());
assertThat(actual.getMaxUniqueKeyValue().get(), is(9));
- actual = (RecordSingleTableInventoryCalculatedResult)
resultIterator.next();
+ actual = (RecordTableInventoryCheckCalculatedResult)
resultIterator.next();
assertThat(actual.getRecordsCount(), is(1));
assertRecord(actual.getRecords().get(0), 6, 10);
assertTrue(actual.getMaxUniqueKeyValue().isPresent());
@@ -246,12 +246,12 @@ class RecordSingleTableInventoryCheckCalculatorTest {
@ParameterizedTest
@CsvSource({"SMALL", "LARGE"})
void
assertCalculateOfRangeQueryAllWithMultiColumnUniqueKeysWith50x100(final String
streamingRangeType) {
- RecordSingleTableInventoryCheckCalculator calculator = new
RecordSingleTableInventoryCheckCalculator(50, 100,
StreamingRangeType.valueOf(streamingRangeType));
- SingleTableInventoryCalculateParameter param = new
SingleTableInventoryCalculateParameter(dataSource, new QualifiedTable(null,
"t_order"),
+ RecordTableInventoryCheckCalculator calculator = new
RecordTableInventoryCheckCalculator(50, 100,
StreamingRangeType.valueOf(streamingRangeType));
+ TableInventoryCalculateParameter param = new
TableInventoryCalculateParameter(dataSource, new QualifiedTable(null,
"t_order"),
Collections.emptyList(), buildMultiColumnUniqueKeys(),
QueryType.RANGE_QUERY, null);
param.setQueryRange(new QueryRange(null, false, null));
- Iterator<SingleTableInventoryCalculatedResult> resultIterator =
calculator.calculate(param).iterator();
- RecordSingleTableInventoryCalculatedResult actual =
(RecordSingleTableInventoryCalculatedResult) resultIterator.next();
+ Iterator<TableInventoryCheckCalculatedResult> resultIterator =
calculator.calculate(param).iterator();
+ RecordTableInventoryCheckCalculatedResult actual =
(RecordTableInventoryCheckCalculatedResult) resultIterator.next();
assertThat(actual.getRecordsCount(), is(10));
assertRecord(actual.getRecords().get(0), 1, 1);
assertRecord(actual.getRecords().get(1), 2, 2);
@@ -272,33 +272,33 @@ class RecordSingleTableInventoryCheckCalculatorTest {
@ParameterizedTest
@CsvSource({"100,SMALL", "2,SMALL", "3,SMALL", "100,LARGE", "2,LARGE",
"3,LARGE"})
void assertCalculateOfRangeQueryAllWithMultiColumnUniqueKeysWith3x(final
int streamingChunkCount, final String streamingRangeType) {
- RecordSingleTableInventoryCheckCalculator calculator = new
RecordSingleTableInventoryCheckCalculator(3, streamingChunkCount,
StreamingRangeType.valueOf(streamingRangeType));
- SingleTableInventoryCalculateParameter param = new
SingleTableInventoryCalculateParameter(dataSource, new QualifiedTable(null,
"t_order"),
+ RecordTableInventoryCheckCalculator calculator = new
RecordTableInventoryCheckCalculator(3, streamingChunkCount,
StreamingRangeType.valueOf(streamingRangeType));
+ TableInventoryCalculateParameter param = new
TableInventoryCalculateParameter(dataSource, new QualifiedTable(null,
"t_order"),
Collections.emptyList(), buildMultiColumnUniqueKeys(),
QueryType.RANGE_QUERY, null);
param.setQueryRange(new QueryRange(null, false, null));
- Iterator<SingleTableInventoryCalculatedResult> resultIterator =
calculator.calculate(param).iterator();
- RecordSingleTableInventoryCalculatedResult actual =
(RecordSingleTableInventoryCalculatedResult) resultIterator.next();
+ Iterator<TableInventoryCheckCalculatedResult> resultIterator =
calculator.calculate(param).iterator();
+ RecordTableInventoryCheckCalculatedResult actual =
(RecordTableInventoryCheckCalculatedResult) resultIterator.next();
assertThat(actual.getRecordsCount(), is(3));
assertRecord(actual.getRecords().get(0), 1, 1);
assertRecord(actual.getRecords().get(1), 2, 2);
assertRecord(actual.getRecords().get(2), 3, 3);
assertTrue(actual.getMaxUniqueKeyValue().isPresent());
assertThat(actual.getMaxUniqueKeyValue().get(), is(3));
- actual = (RecordSingleTableInventoryCalculatedResult)
resultIterator.next();
+ actual = (RecordTableInventoryCheckCalculatedResult)
resultIterator.next();
assertThat(actual.getRecordsCount(), is(3));
assertRecord(actual.getRecords().get(0), 3, 4);
assertRecord(actual.getRecords().get(1), 3, 5);
assertRecord(actual.getRecords().get(2), 3, 6);
assertTrue(actual.getMaxUniqueKeyValue().isPresent());
assertThat(actual.getMaxUniqueKeyValue().get(), is(3));
- actual = (RecordSingleTableInventoryCalculatedResult)
resultIterator.next();
+ actual = (RecordTableInventoryCheckCalculatedResult)
resultIterator.next();
assertThat(actual.getRecordsCount(), is(3));
assertRecord(actual.getRecords().get(0), 3, 7);
assertRecord(actual.getRecords().get(1), 4, 8);
assertRecord(actual.getRecords().get(2), 5, 9);
assertTrue(actual.getMaxUniqueKeyValue().isPresent());
assertThat(actual.getMaxUniqueKeyValue().get(), is(5));
- actual = (RecordSingleTableInventoryCalculatedResult)
resultIterator.next();
+ actual = (RecordTableInventoryCheckCalculatedResult)
resultIterator.next();
assertThat(actual.getRecordsCount(), is(1));
assertRecord(actual.getRecords().get(0), 6, 10);
assertTrue(actual.getMaxUniqueKeyValue().isPresent());
@@ -310,36 +310,36 @@ class RecordSingleTableInventoryCheckCalculatorTest {
@ParameterizedTest
@CsvSource({"100,SMALL", "3,SMALL", "4,SMALL", "100,LARGE", "3,LARGE",
"4,LARGE"})
void assertCalculateOfRangeQueryAllWithMultiColumnUniqueKeysWith2x(final
int streamingChunkCount, final String streamingRangeType) {
- RecordSingleTableInventoryCheckCalculator calculator = new
RecordSingleTableInventoryCheckCalculator(2, streamingChunkCount,
StreamingRangeType.valueOf(streamingRangeType));
- SingleTableInventoryCalculateParameter param = new
SingleTableInventoryCalculateParameter(dataSource, new QualifiedTable(null,
"t_order"),
+ RecordTableInventoryCheckCalculator calculator = new
RecordTableInventoryCheckCalculator(2, streamingChunkCount,
StreamingRangeType.valueOf(streamingRangeType));
+ TableInventoryCalculateParameter param = new
TableInventoryCalculateParameter(dataSource, new QualifiedTable(null,
"t_order"),
Collections.emptyList(), buildMultiColumnUniqueKeys(),
QueryType.RANGE_QUERY, null);
param.setQueryRange(new QueryRange(null, false, null));
- Iterator<SingleTableInventoryCalculatedResult> resultIterator =
calculator.calculate(param).iterator();
- RecordSingleTableInventoryCalculatedResult actual =
(RecordSingleTableInventoryCalculatedResult) resultIterator.next();
+ Iterator<TableInventoryCheckCalculatedResult> resultIterator =
calculator.calculate(param).iterator();
+ RecordTableInventoryCheckCalculatedResult actual =
(RecordTableInventoryCheckCalculatedResult) resultIterator.next();
assertThat(actual.getRecordsCount(), is(2));
assertRecord(actual.getRecords().get(0), 1, 1);
assertRecord(actual.getRecords().get(1), 2, 2);
assertTrue(actual.getMaxUniqueKeyValue().isPresent());
assertThat(actual.getMaxUniqueKeyValue().get(), is(2));
- actual = (RecordSingleTableInventoryCalculatedResult)
resultIterator.next();
+ actual = (RecordTableInventoryCheckCalculatedResult)
resultIterator.next();
assertThat(actual.getRecordsCount(), is(2));
assertRecord(actual.getRecords().get(0), 3, 3);
assertRecord(actual.getRecords().get(1), 3, 4);
assertTrue(actual.getMaxUniqueKeyValue().isPresent());
assertThat(actual.getMaxUniqueKeyValue().get(), is(3));
- actual = (RecordSingleTableInventoryCalculatedResult)
resultIterator.next();
+ actual = (RecordTableInventoryCheckCalculatedResult)
resultIterator.next();
assertThat(actual.getRecordsCount(), is(2));
assertRecord(actual.getRecords().get(0), 3, 5);
assertRecord(actual.getRecords().get(1), 3, 6);
assertTrue(actual.getMaxUniqueKeyValue().isPresent());
assertThat(actual.getMaxUniqueKeyValue().get(), is(3));
- actual = (RecordSingleTableInventoryCalculatedResult)
resultIterator.next();
+ actual = (RecordTableInventoryCheckCalculatedResult)
resultIterator.next();
assertThat(actual.getRecordsCount(), is(2));
assertRecord(actual.getRecords().get(0), 3, 7);
assertRecord(actual.getRecords().get(1), 4, 8);
assertTrue(actual.getMaxUniqueKeyValue().isPresent());
assertThat(actual.getMaxUniqueKeyValue().get(), is(4));
- actual = (RecordSingleTableInventoryCalculatedResult)
resultIterator.next();
+ actual = (RecordTableInventoryCheckCalculatedResult)
resultIterator.next();
assertThat(actual.getRecordsCount(), is(2));
assertRecord(actual.getRecords().get(0), 5, 9);
assertRecord(actual.getRecords().get(1), 6, 10);
@@ -357,14 +357,14 @@ class RecordSingleTableInventoryCheckCalculatorTest {
@ParameterizedTest
@CsvSource({"SMALL", "LARGE"})
void assertCalculateOfPointQuery(final String streamingRangeType) {
- RecordSingleTableInventoryCheckCalculator calculator = new
RecordSingleTableInventoryCheckCalculator(3,
StreamingRangeType.valueOf(streamingRangeType));
- SingleTableInventoryCalculateParameter param = new
SingleTableInventoryCalculateParameter(dataSource, new QualifiedTable(null,
"t_order"),
+ RecordTableInventoryCheckCalculator calculator = new
RecordTableInventoryCheckCalculator(3,
StreamingRangeType.valueOf(streamingRangeType));
+ TableInventoryCalculateParameter param = new
TableInventoryCalculateParameter(dataSource, new QualifiedTable(null,
"t_order"),
Collections.emptyList(), buildMultiColumnUniqueKeys(),
QueryType.POINT_QUERY, null);
param.setUniqueKeysValues(Arrays.asList(3, 3));
- Optional<SingleTableInventoryCalculatedResult> calculatedResult =
calculator.calculateChunk(param);
+ Optional<TableInventoryCheckCalculatedResult> calculatedResult =
calculator.calculateChunk(param);
QuietlyCloser.close(param.getCalculationContext());
assertTrue(calculatedResult.isPresent());
- SingleTableInventoryCalculatedResult actual = calculatedResult.get();
+ TableInventoryCheckCalculatedResult actual = calculatedResult.get();
assertThat(actual.getRecordsCount(), is(1));
assertTrue(actual.getMaxUniqueKeyValue().isPresent());
assertThat(actual.getMaxUniqueKeyValue().get(), is(3));
@@ -373,14 +373,14 @@ class RecordSingleTableInventoryCheckCalculatorTest {
@ParameterizedTest
@CsvSource({"SMALL", "LARGE"})
void assertCalculateOfPointRangeQuery(final String streamingRangeType) {
- RecordSingleTableInventoryCheckCalculator calculator = new
RecordSingleTableInventoryCheckCalculator(3,
StreamingRangeType.valueOf(streamingRangeType));
- SingleTableInventoryCalculateParameter param = new
SingleTableInventoryCalculateParameter(dataSource, new QualifiedTable(null,
"t_order"),
+ RecordTableInventoryCheckCalculator calculator = new
RecordTableInventoryCheckCalculator(3,
StreamingRangeType.valueOf(streamingRangeType));
+ TableInventoryCalculateParameter param = new
TableInventoryCalculateParameter(dataSource, new QualifiedTable(null,
"t_order"),
Collections.emptyList(), buildUserIdUniqueKey(),
QueryType.POINT_QUERY, null);
param.setUniqueKeysValues(Collections.singleton(3));
- Optional<SingleTableInventoryCalculatedResult> calculatedResult =
calculator.calculateChunk(param);
+ Optional<TableInventoryCheckCalculatedResult> calculatedResult =
calculator.calculateChunk(param);
QuietlyCloser.close(param.getCalculationContext());
assertTrue(calculatedResult.isPresent());
- SingleTableInventoryCalculatedResult actual = calculatedResult.get();
+ TableInventoryCheckCalculatedResult actual = calculatedResult.get();
assertThat(actual.getRecordsCount(), is(5));
assertTrue(actual.getMaxUniqueKeyValue().isPresent());
assertThat(actual.getMaxUniqueKeyValue().get(), is(3));