This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 52f74ea13b5 Add generic type RESULT for SingleTableInventoryCalculator (#36739)
52f74ea13b5 is described below
commit 52f74ea13b51c3db95bb1d1a4fe621dbd3fd4a91
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Sun Sep 28 19:03:17 2025 +0800
Add generic type RESULT for SingleTableInventoryCalculator (#36739)
---
.../CRC32MatchTableDataConsistencyChecker.java | 9 +++++----
.../table/DataMatchTableDataConsistencyChecker.java | 5 +++--
.../table/MatchingTableInventoryChecker.java | 12 ++++++------
.../AbstractSingleTableInventoryCalculator.java | 5 ++++-
...ractStreamingSingleTableInventoryCalculator.java | 21 +++++++++++----------
...> CRC32SingleTableInventoryCheckCalculator.java} | 4 ++--
.../RecordSingleTableInventoryCalculator.java | 2 +-
.../calculator/SingleTableInventoryCalculator.java | 10 ++++++----
...C32SingleTableInventoryCheckCalculatorTest.java} | 6 +++---
9 files changed, 41 insertions(+), 33 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/CRC32MatchTableDataConsistencyChecker.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/CRC32MatchTableDataConsistencyChecker.java
index 0e7cec5fcf0..8303a34a2b1 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/CRC32MatchTableDataConsistencyChecker.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/CRC32MatchTableDataConsistencyChecker.java
@@ -17,8 +17,9 @@
package org.apache.shardingsphere.data.pipeline.core.consistencycheck.table;
-import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator.CRC32SingleTableInventoryCalculator;
-import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator.SingleTableInventoryCalculator;
+import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.SingleTableInventoryCalculatedResult;
+import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator.CRC32SingleTableInventoryCheckCalculator;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.calculator.SingleTableInventoryCalculator;
import org.apache.shardingsphere.database.connector.core.type.DatabaseType;
import
org.apache.shardingsphere.database.connector.core.type.DatabaseTypeRegistry;
import org.apache.shardingsphere.infra.spi.annotation.SPIDescription;
@@ -63,8 +64,8 @@ public final class CRC32MatchTableDataConsistencyChecker
implements TableDataCon
}
@Override
- protected SingleTableInventoryCalculator
buildSingleTableInventoryCalculator() {
- return new CRC32SingleTableInventoryCalculator();
+ protected
SingleTableInventoryCalculator<SingleTableInventoryCalculatedResult>
buildSingleTableInventoryCalculator() {
+ return new CRC32SingleTableInventoryCheckCalculator();
}
}
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/DataMatchTableDataConsistencyChecker.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/DataMatchTableDataConsistencyChecker.java
index 7215753d455..48016bcedb2 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/DataMatchTableDataConsistencyChecker.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/DataMatchTableDataConsistencyChecker.java
@@ -18,10 +18,11 @@
package org.apache.shardingsphere.data.pipeline.core.consistencycheck.table;
import com.google.common.base.Strings;
+import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.SingleTableInventoryCalculatedResult;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckIgnoredType;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator.RecordSingleTableInventoryCalculator;
-import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator.SingleTableInventoryCalculator;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.calculator.SingleTableInventoryCalculator;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator.StreamingRangeType;
import
org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
import org.apache.shardingsphere.database.connector.core.type.DatabaseType;
@@ -127,7 +128,7 @@ public final class DataMatchTableDataConsistencyChecker
implements TableDataCons
}
@Override
- protected SingleTableInventoryCalculator
buildSingleTableInventoryCalculator() {
+ protected
SingleTableInventoryCalculator<SingleTableInventoryCalculatedResult>
buildSingleTableInventoryCalculator() {
return new RecordSingleTableInventoryCalculator(chunkSize,
streamingRangeType);
}
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java
index bcd622ee34a..5a10ce2f44b 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java
@@ -27,7 +27,7 @@ import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.Tabl
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.yaml.YamlTableDataConsistencyCheckResult;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.yaml.YamlTableDataConsistencyCheckResultSwapper;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator.SingleTableInventoryCalculateParameter;
-import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator.SingleTableInventoryCalculator;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.calculator.SingleTableInventoryCalculator;
import
org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.QueryType;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.range.QueryRange;
@@ -57,9 +57,9 @@ public abstract class MatchingTableInventoryChecker
implements TableInventoryChe
private final AtomicBoolean canceling = new AtomicBoolean(false);
- private volatile SingleTableInventoryCalculator sourceCalculator;
+ private volatile
SingleTableInventoryCalculator<SingleTableInventoryCalculatedResult>
sourceCalculator;
- private volatile SingleTableInventoryCalculator targetCalculator;
+ private volatile
SingleTableInventoryCalculator<SingleTableInventoryCalculatedResult>
targetCalculator;
@Override
public TableDataConsistencyCheckResult checkSingleTableInventoryData() {
@@ -83,9 +83,9 @@ public abstract class MatchingTableInventoryChecker
implements TableInventoryChe
param.getColumnNames(), param.getUniqueKeys(),
QueryType.RANGE_QUERY, param.getQueryCondition());
targetParam.setQueryRange(new QueryRange(null !=
checkRangePosition.getTargetPosition() ? checkRangePosition.getTargetPosition()
: checkRangePosition.getTargetRange().getBeginValue(),
true, checkRangePosition.getTargetRange().getEndValue()));
- SingleTableInventoryCalculator sourceCalculator =
buildSingleTableInventoryCalculator();
+ SingleTableInventoryCalculator<SingleTableInventoryCalculatedResult>
sourceCalculator = buildSingleTableInventoryCalculator();
this.sourceCalculator = sourceCalculator;
- SingleTableInventoryCalculator targetCalculator =
buildSingleTableInventoryCalculator();
+ SingleTableInventoryCalculator<SingleTableInventoryCalculatedResult>
targetCalculator = buildSingleTableInventoryCalculator();
this.targetCalculator = targetCalculator;
try {
Iterator<SingleTableInventoryCalculatedResult>
sourceCalculatedResults = PipelineTaskUtils.waitFuture(executor.submit(() ->
sourceCalculator.calculate(sourceParam))).iterator();
@@ -132,7 +132,7 @@ public abstract class MatchingTableInventoryChecker
implements TableInventoryChe
return new
YamlTableDataConsistencyCheckResultSwapper().swapToObject(checkResult);
}
- protected abstract SingleTableInventoryCalculator
buildSingleTableInventoryCalculator();
+ protected abstract
SingleTableInventoryCalculator<SingleTableInventoryCalculatedResult>
buildSingleTableInventoryCalculator();
@Override
public void cancel() {
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/AbstractSingleTableInventoryCalculator.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/AbstractSingleTableInventoryCalculator.java
index 631c1d3a11d..349c3ca4065 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/AbstractSingleTableInventoryCalculator.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/AbstractSingleTableInventoryCalculator.java
@@ -19,6 +19,7 @@ package
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calc
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.calculator.SingleTableInventoryCalculator;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
@@ -28,9 +29,11 @@ import java.util.concurrent.atomic.AtomicReference;
/**
* Abstract single table inventory calculator.
+ *
+ * @param <S> the type of result
*/
@Slf4j
-public abstract class AbstractSingleTableInventoryCalculator implements
SingleTableInventoryCalculator {
+public abstract class AbstractSingleTableInventoryCalculator<S> implements
SingleTableInventoryCalculator<S> {
private final AtomicBoolean canceling = new AtomicBoolean(false);
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/AbstractStreamingSingleTableInventoryCalculator.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/AbstractStreamingSingleTableInventoryCalculator.java
index 2317c98f20e..97b38b28c71 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/AbstractStreamingSingleTableInventoryCalculator.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/AbstractStreamingSingleTableInventoryCalculator.java
@@ -19,7 +19,6 @@ package
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calc
import lombok.Getter;
import lombok.RequiredArgsConstructor;
-import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.SingleTableInventoryCalculatedResult;
import java.util.Iterator;
import java.util.NoSuchElementException;
@@ -29,12 +28,14 @@ import java.util.concurrent.atomic.AtomicReference;
/**
* Abstract streaming single table inventory calculator.
+ *
+ * @param <S> the type of result
*/
@Getter
-public abstract class AbstractStreamingSingleTableInventoryCalculator extends
AbstractSingleTableInventoryCalculator {
+public abstract class AbstractStreamingSingleTableInventoryCalculator<S>
extends AbstractSingleTableInventoryCalculator<S> {
@Override
- public final Iterable<SingleTableInventoryCalculatedResult>
calculate(final SingleTableInventoryCalculateParameter param) {
+ public final Iterable<S> calculate(final
SingleTableInventoryCalculateParameter param) {
return new ResultIterable(param);
}
@@ -44,28 +45,28 @@ public abstract class
AbstractStreamingSingleTableInventoryCalculator extends Ab
* @param param data consistency calculate parameter
* @return optional calculated result, empty means there's no more result
*/
- protected abstract Optional<SingleTableInventoryCalculatedResult>
calculateChunk(SingleTableInventoryCalculateParameter param);
+ protected abstract Optional<S>
calculateChunk(SingleTableInventoryCalculateParameter param);
/**
* It's not thread-safe, it should be executed in only one thread at the
same time.
*/
@RequiredArgsConstructor
- private final class ResultIterable implements
Iterable<SingleTableInventoryCalculatedResult> {
+ private final class ResultIterable implements Iterable<S> {
private final SingleTableInventoryCalculateParameter param;
@Override
- public Iterator<SingleTableInventoryCalculatedResult> iterator() {
+ public Iterator<S> iterator() {
return new ResultIterator(param);
}
}
@RequiredArgsConstructor
- private final class ResultIterator implements
Iterator<SingleTableInventoryCalculatedResult> {
+ private final class ResultIterator implements Iterator<S> {
private final AtomicBoolean currentChunkCalculated = new
AtomicBoolean();
- private final
AtomicReference<Optional<SingleTableInventoryCalculatedResult>> nextResult =
new AtomicReference<>();
+ private final AtomicReference<Optional<S>> nextResult = new
AtomicReference<>();
private final SingleTableInventoryCalculateParameter param;
@@ -76,9 +77,9 @@ public abstract class
AbstractStreamingSingleTableInventoryCalculator extends Ab
}
@Override
- public SingleTableInventoryCalculatedResult next() {
+ public S next() {
calculateIfNecessary();
- Optional<SingleTableInventoryCalculatedResult> result =
nextResult.get();
+ Optional<S> result = nextResult.get();
nextResult.set(null);
currentChunkCalculated.set(false);
return result.orElseThrow(NoSuchElementException::new);
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CRC32SingleTableInventoryCalculator.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CRC32SingleTableInventoryCheckCalculator.java
similarity index 96%
rename from
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CRC32SingleTableInventoryCalculator.java
rename to
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CRC32SingleTableInventoryCheckCalculator.java
index 2763123283b..e052c506951 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CRC32SingleTableInventoryCalculator.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CRC32SingleTableInventoryCheckCalculator.java
@@ -36,10 +36,10 @@ import java.util.Optional;
import java.util.stream.Collectors;
/**
- * CRC32 single table inventory calculator.
+ * CRC32 single table inventory check calculator.
*/
@Slf4j
-public final class CRC32SingleTableInventoryCalculator extends
AbstractSingleTableInventoryCalculator {
+public final class CRC32SingleTableInventoryCheckCalculator extends
AbstractSingleTableInventoryCalculator<SingleTableInventoryCalculatedResult> {
@Override
public Iterable<SingleTableInventoryCalculatedResult> calculate(final
SingleTableInventoryCalculateParameter param) {
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculator.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculator.java
index 5e0d96f2d02..06f19c256fd 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculator.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculator.java
@@ -53,7 +53,7 @@ import java.util.Optional;
*/
@HighFrequencyInvocation
@RequiredArgsConstructor
-public final class RecordSingleTableInventoryCalculator extends
AbstractStreamingSingleTableInventoryCalculator {
+public final class RecordSingleTableInventoryCalculator extends
AbstractStreamingSingleTableInventoryCalculator<SingleTableInventoryCalculatedResult>
{
private static final int DEFAULT_STREAMING_CHUNK_COUNT = 100;
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/SingleTableInventoryCalculator.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/calculator/SingleTableInventoryCalculator.java
similarity index 76%
rename from
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/SingleTableInventoryCalculator.java
rename to
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/calculator/SingleTableInventoryCalculator.java
index b108da56e61..4815e06ec47 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/SingleTableInventoryCalculator.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/calculator/SingleTableInventoryCalculator.java
@@ -15,15 +15,17 @@
* limitations under the License.
*/
-package
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator;
+package
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.calculator;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineCancellable;
-import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.SingleTableInventoryCalculatedResult;
+import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator.SingleTableInventoryCalculateParameter;
/**
* Single table inventory calculator.
+ *
+ * @param <S> the type of result
*/
-public interface SingleTableInventoryCalculator extends PipelineCancellable {
+public interface SingleTableInventoryCalculator<S> extends PipelineCancellable
{
/**
* Calculate for single table inventory data.
@@ -31,5 +33,5 @@ public interface SingleTableInventoryCalculator extends
PipelineCancellable {
* @param param calculate parameter
* @return calculated result
*/
- Iterable<SingleTableInventoryCalculatedResult>
calculate(SingleTableInventoryCalculateParameter param);
+ Iterable<S> calculate(SingleTableInventoryCalculateParameter param);
}
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CRC32SingleTableInventoryCalculatorTest.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CRC32SingleTableInventoryCheckCalculatorTest.java
similarity index 95%
rename from
kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CRC32SingleTableInventoryCalculatorTest.java
rename to
kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CRC32SingleTableInventoryCheckCalculatorTest.java
index 597b1e215ad..ab5466fda58 100644
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CRC32SingleTableInventoryCalculatorTest.java
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CRC32SingleTableInventoryCheckCalculatorTest.java
@@ -52,7 +52,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
-class CRC32SingleTableInventoryCalculatorTest {
+class CRC32SingleTableInventoryCheckCalculatorTest {
private SingleTableInventoryCalculateParameter parameter;
@@ -78,7 +78,7 @@ class CRC32SingleTableInventoryCalculatorTest {
when(connection.prepareStatement("SELECT CRC32(foo_col) FROM
foo_tbl")).thenReturn(preparedStatement0);
PreparedStatement preparedStatement1 = mockPreparedStatement(456L, 10);
when(connection.prepareStatement("SELECT CRC32(bar_col) FROM
foo_tbl")).thenReturn(preparedStatement1);
- Iterator<SingleTableInventoryCalculatedResult> actual = new
CRC32SingleTableInventoryCalculator().calculate(parameter).iterator();
+ Iterator<SingleTableInventoryCalculatedResult> actual = new
CRC32SingleTableInventoryCheckCalculator().calculate(parameter).iterator();
assertThat(actual.next().getRecordsCount(), is(10));
assertFalse(actual.hasNext());
}
@@ -95,6 +95,6 @@ class CRC32SingleTableInventoryCalculatorTest {
@Test
void assertCalculateFailed() throws SQLException {
when(connection.prepareStatement(anyString())).thenThrow(new
SQLException(""));
-
assertThrows(PipelineTableDataConsistencyCheckLoadingFailedException.class, ()
-> new CRC32SingleTableInventoryCalculator().calculate(parameter));
+
assertThrows(PipelineTableDataConsistencyCheckLoadingFailedException.class, ()
-> new CRC32SingleTableInventoryCheckCalculator().calculate(parameter));
}
}