This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new ce1b0577316 Refactor InventoryDumper to reuse AbstractRecordSingleTableInventoryCalculator (#36830)
ce1b0577316 is described below

commit ce1b05773164938235b0ea651232661bbcae8fa4
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Thu Oct 9 19:09:08 2025 +0800

    Refactor InventoryDumper to reuse AbstractRecordSingleTableInventoryCalculator (#36830)
---
 .../ingest/dumper/inventory/InventoryDumper.java   | 241 ++++++++-------------
 .../inventory/query/InventoryQueryParameter.java   |  33 ---
 .../query/point/InventoryPointQueryParameter.java  |  32 ---
 .../query/range/InventoryRangeQueryParameter.java  |  32 ---
 .../pipeline/core/ingest/record/DataRecord.java    |  16 ++
 .../splitter/InventoryDumperContextSplitter.java   |   4 +-
 .../sqlbuilder/sql/BuildDivisibleSQLParameter.java |  43 ----
 .../sql/PipelineInventoryDumpSQLBuilder.java       |  73 -------
 .../sql/PipelineInventoryDumpSQLBuilderTest.java   |  32 ---
 9 files changed, 109 insertions(+), 397 deletions(-)

diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumper.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumper.java
index 27b62bbd346..492924e58e8 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumper.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumper.java
@@ -19,17 +19,18 @@ package 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
+import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator.AbstractRecordSingleTableInventoryCalculator;
+import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator.SingleTableInventoryCalculateParameter;
+import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator.StreamingRangeType;
 import 
org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
+import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
 import org.apache.shardingsphere.data.pipeline.core.exception.IngestException;
-import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
 import 
org.apache.shardingsphere.data.pipeline.core.execute.AbstractPipelineLifecycleRunnable;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.column.InventoryColumnValueReaderEngine;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.position.InventoryDataRecordPositionCreator;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.InventoryQueryParameter;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.point.InventoryPointQueryParameter;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.range.InventoryRangeQueryParameter;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.QueryType;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.range.QueryRange;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.finished.IngestFinishedPosition;
@@ -40,18 +41,17 @@ import 
org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.record.FinishedRecord;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.NormalColumn;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
-import 
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
 import 
org.apache.shardingsphere.data.pipeline.core.query.JDBCStreamQueryBuilder;
 import 
org.apache.shardingsphere.data.pipeline.core.ratelimit.JobRateLimitAlgorithm;
-import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.BuildDivisibleSQLParameter;
 import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelineInventoryDumpSQLBuilder;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineJdbcUtils;
 import org.apache.shardingsphere.database.connector.core.type.DatabaseType;
 import org.apache.shardingsphere.infra.annotation.HighFrequencyInvocation;
 import org.apache.shardingsphere.infra.exception.ShardingSpherePreconditions;
+import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedTable;
 import 
org.apache.shardingsphere.infra.metadata.identifier.ShardingSphereIdentifier;
+import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
 
-import javax.sql.DataSource;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
@@ -61,9 +61,7 @@ import java.sql.Statement;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Objects;
 import java.util.Optional;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
@@ -77,7 +75,7 @@ public final class InventoryDumper extends 
AbstractPipelineLifecycleRunnable imp
     
     private final PipelineChannel channel;
     
-    private final DataSource dataSource;
+    private final PipelineDataSource dataSource;
     
     private final InventoryDataRecordPositionCreator positionCreator;
     
@@ -87,7 +85,7 @@ public final class InventoryDumper extends 
AbstractPipelineLifecycleRunnable imp
     
     private final AtomicReference<Statement> runningStatement = new 
AtomicReference<>();
     
-    public InventoryDumper(final InventoryDumperContext dumperContext, final 
PipelineChannel channel, final DataSource dataSource, final 
InventoryDataRecordPositionCreator positionCreator) {
+    public InventoryDumper(final InventoryDumperContext dumperContext, final 
PipelineChannel channel, final PipelineDataSource dataSource, final 
InventoryDataRecordPositionCreator positionCreator) {
         this.dumperContext = dumperContext;
         this.channel = channel;
         this.dataSource = dataSource;
@@ -104,11 +102,11 @@ public final class InventoryDumper extends 
AbstractPipelineLifecycleRunnable imp
             log.info("Ignored because of already finished.");
             return;
         }
-        try (Connection connection = dataSource.getConnection()) {
-            if (!dumperContext.hasUniqueKey() || 
isPrimaryKeyWithoutRange(position)) {
-                dumpWithStreamingQuery(connection);
+        try {
+            if (dumperContext.hasUniqueKey()) {
+                dumpByCalculator();
             } else {
-                dumpByPage(connection);
+                dumpWithStreamingQuery();
             }
             // CHECKSTYLE:OFF
         } catch (final SQLException | RuntimeException ex) {
@@ -118,142 +116,46 @@ public final class InventoryDumper extends 
AbstractPipelineLifecycleRunnable imp
         }
     }
     
-    private boolean isPrimaryKeyWithoutRange(final IngestPosition position) {
-        return position instanceof PrimaryKeyIngestPosition && null == 
((PrimaryKeyIngestPosition<?>) position).getBeginValue() && null == 
((PrimaryKeyIngestPosition<?>) position).getEndValue();
-    }
-    
-    private void dumpByPage(final Connection connection) throws SQLException {
-        log.info("Start to dump inventory data by page, dataSource={}, 
actualTable={}", dumperContext.getCommonContext().getDataSourceName(), 
dumperContext.getActualTableName());
-        boolean firstQuery = true;
-        AtomicLong rowCount = new AtomicLong();
-        IngestPosition position = 
dumperContext.getCommonContext().getPosition();
-        do {
-            QueryRange queryRange = new 
QueryRange(((PrimaryKeyIngestPosition<?>) position).getBeginValue(), firstQuery 
&& dumperContext.isFirstDump(),
-                    ((PrimaryKeyIngestPosition<?>) position).getEndValue());
-            InventoryQueryParameter<?> queryParam = new 
InventoryRangeQueryParameter(queryRange);
-            List<Record> dataRecords = dumpByPage(connection, queryParam, 
rowCount);
-            if (dataRecords.size() > 1 && 
Objects.deepEquals(getFirstUniqueKeyValue(dataRecords, 0), 
getFirstUniqueKeyValue(dataRecords, dataRecords.size() - 1))) {
-                queryParam = new 
InventoryPointQueryParameter(getFirstUniqueKeyValue(dataRecords, 0));
-                dataRecords = dumpByPage(connection, queryParam, rowCount);
-            }
-            firstQuery = false;
-            if (dataRecords.isEmpty()) {
-                position = new IngestFinishedPosition();
-                dataRecords.add(new FinishedRecord(position));
-                log.info("Inventory dump done, rowCount={}, dataSource={}, 
actualTable={}", rowCount, 
dumperContext.getCommonContext().getDataSourceName(), 
dumperContext.getActualTableName());
-            } else {
-                position = 
PrimaryKeyIngestPositionFactory.newInstance(getFirstUniqueKeyValue(dataRecords, 
dataRecords.size() - 1), queryRange.getUpper());
-            }
-            channel.push(dataRecords);
-            dumperContext.getCommonContext().setPosition(position);
-        } while (!(position instanceof IngestFinishedPosition));
-        log.info("End to dump inventory data by page, dataSource={}, 
actualTable={}", dumperContext.getCommonContext().getDataSourceName(), 
dumperContext.getActualTableName());
-    }
-    
-    private List<Record> dumpByPage(final Connection connection, final 
InventoryQueryParameter<?> queryParam, final AtomicLong rowCount) throws 
SQLException {
-        DatabaseType databaseType = 
dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType();
-        int batchSize = dumperContext.getBatchSize();
-        try (PreparedStatement preparedStatement = 
JDBCStreamQueryBuilder.build(databaseType, connection, 
buildDumpByPageSQL(queryParam), batchSize)) {
-            runningStatement.set(preparedStatement);
-            setParameters(preparedStatement, queryParam);
-            try (ResultSet resultSet = preparedStatement.executeQuery()) {
-                JobRateLimitAlgorithm rateLimitAlgorithm = 
dumperContext.getRateLimitAlgorithm();
-                ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
-                List<Record> result = new LinkedList<>();
-                while (resultSet.next()) {
-                    if (result.size() >= batchSize) {
-                        if (!dumperContext.hasUniqueKey()) {
-                            channel.push(result);
-                        }
-                        result = new LinkedList<>();
-                    }
-                    result.add(loadDataRecord(resultSet, resultSetMetaData));
-                    rowCount.incrementAndGet();
-                    if (!isRunning()) {
-                        log.info("Broke because of inventory dump is not 
running.");
-                        break;
-                    }
-                    if (null != rateLimitAlgorithm && 0 == rowCount.get() % 
batchSize) {
-                        
rateLimitAlgorithm.intercept(PipelineSQLOperationType.SELECT, 1);
-                    }
-                }
-                return result;
-            } finally {
-                runningStatement.set(null);
-            }
-        }
-    }
-    
-    private void setParameters(final PreparedStatement preparedStatement, 
final InventoryQueryParameter<?> queryParam) throws SQLException {
-        if (queryParam instanceof InventoryRangeQueryParameter) {
-            int parameterIndex = 1;
-            Object lower = ((InventoryRangeQueryParameter) 
queryParam).getValue().getLower();
-            if (null != lower) {
-                preparedStatement.setObject(parameterIndex++, lower);
-            }
-            Object upper = ((InventoryRangeQueryParameter) 
queryParam).getValue().getUpper();
-            if (null != upper) {
-                preparedStatement.setObject(parameterIndex++, upper);
-            }
-            preparedStatement.setInt(parameterIndex, 
dumperContext.getBatchSize());
-        } else if (queryParam instanceof InventoryPointQueryParameter) {
-            preparedStatement.setObject(1, queryParam.getValue());
-        } else {
-            throw new UnsupportedOperationException("Query type: " + 
queryParam.getValue());
-        }
-    }
-    
-    private DataRecord loadDataRecord(final ResultSet resultSet, final 
ResultSetMetaData resultSetMetaData) throws SQLException {
-        int columnCount = resultSetMetaData.getColumnCount();
-        String tableName = dumperContext.getLogicTableName();
-        DataRecord result = new DataRecord(PipelineSQLOperationType.INSERT, 
tableName, positionCreator.create(dumperContext, resultSet), columnCount);
-        List<String> insertColumnNames = 
Optional.ofNullable(dumperContext.getInsertColumnNames()).orElse(Collections.emptyList());
-        ShardingSpherePreconditions.checkState(insertColumnNames.isEmpty() || 
insertColumnNames.size() == resultSetMetaData.getColumnCount(),
-                () -> new PipelineInvalidParameterException("Insert column 
names count not equals ResultSet column count"));
-        for (int i = 1; i <= columnCount; i++) {
-            String columnName = insertColumnNames.isEmpty() ? 
resultSetMetaData.getColumnName(i) : insertColumnNames.get(i - 1);
-            Column column = getColumn(resultSet, resultSetMetaData, 
columnName, i, dumperContext.getTargetUniqueKeysNames().contains(new 
ShardingSphereIdentifier(columnName)));
-            result.addColumn(column);
-        }
-        result.setActualTableName(dumperContext.getActualTableName());
-        return result;
-    }
-    
-    private Column getColumn(final ResultSet resultSet, final 
ResultSetMetaData resultSetMetaData, final String columnName, final int 
columnIndex, final boolean isUniqueKey) throws SQLException {
-        return new NormalColumn(columnName, 
columnValueReaderEngine.read(resultSet, resultSetMetaData, columnIndex), true, 
isUniqueKey);
-    }
-    
-    private String buildDumpByPageSQL(final InventoryQueryParameter<?> 
queryParam) {
+    private void dumpByCalculator() throws SQLException {
+        log.info("Dump by calculator start, dataSource={}, actualTable={}", 
dumperContext.getCommonContext().getDataSourceName(), 
dumperContext.getActualTableName());
         String schemaName = 
dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName());
-        PipelineColumnMetaData firstColumn = 
dumperContext.getUniqueKeyColumns().get(0);
+        QualifiedTable table = new QualifiedTable(schemaName, 
dumperContext.getActualTableName());
         List<String> columnNames = dumperContext.getQueryColumnNames();
-        if (queryParam instanceof InventoryPointQueryParameter) {
-            return sqlBuilder.buildPointQuerySQL(schemaName, 
dumperContext.getActualTableName(), columnNames, firstColumn.getName());
-        }
-        QueryRange queryRange = ((InventoryRangeQueryParameter) 
queryParam).getValue();
-        boolean lowerInclusive = queryRange.isLowerInclusive();
-        if (null != queryRange.getLower() && null != queryRange.getUpper()) {
-            return sqlBuilder.buildDivisibleSQL(new 
BuildDivisibleSQLParameter(schemaName, dumperContext.getActualTableName(), 
columnNames, firstColumn.getName(), lowerInclusive, true));
-        }
-        if (null != queryRange.getLower()) {
-            return sqlBuilder.buildDivisibleSQL(new 
BuildDivisibleSQLParameter(schemaName, dumperContext.getActualTableName(), 
columnNames, firstColumn.getName(), lowerInclusive, false));
+        SingleTableInventoryCalculateParameter calculateParam = new 
SingleTableInventoryCalculateParameter(dataSource, table,
+                columnNames, dumperContext.getUniqueKeyColumns(), 
QueryType.RANGE_QUERY, null);
+        IngestPosition initialPosition = 
dumperContext.getCommonContext().getPosition();
+        QueryRange queryRange = new QueryRange(((PrimaryKeyIngestPosition<?>) 
initialPosition).getBeginValue(), dumperContext.isFirstDump(),
+                ((PrimaryKeyIngestPosition<?>) initialPosition).getEndValue());
+        calculateParam.setQueryRange(queryRange);
+        RecordSingleTableInventoryDumpCalculator dumpCalculator = new 
RecordSingleTableInventoryDumpCalculator(dumperContext.getBatchSize(), 
StreamingRangeType.SMALL);
+        long rowCount = 0L;
+        try {
+            String firstUniqueKey = 
calculateParam.getFirstUniqueKey().getName();
+            for (List<DataRecord> each : 
dumpCalculator.calculate(calculateParam)) {
+                channel.push(Collections.unmodifiableList(each));
+                IngestPosition position = 
PrimaryKeyIngestPositionFactory.newInstance(dumpCalculator.getFirstUniqueKeyValue(each.get(each.size()
 - 1), firstUniqueKey), queryRange.getUpper());
+                dumperContext.getCommonContext().setPosition(position);
+                rowCount += each.size();
+            }
+        } finally {
+            QuietlyCloser.close(calculateParam.getCalculationContext());
         }
-        throw new PipelineInternalException("Primary key position is 
invalid.");
-    }
-    
-    private Object getFirstUniqueKeyValue(final List<Record> dataRecords, 
final int index) {
-        return ((DataRecord) 
dataRecords.get(index)).getUniqueKeyValue().iterator().next();
+        IngestPosition position = new IngestFinishedPosition();
+        channel.push(Collections.singletonList(new FinishedRecord(position)));
+        dumperContext.getCommonContext().setPosition(position);
+        log.info("Dump by calculator done, rowCount={}, dataSource={}, 
actualTable={}", rowCount, 
dumperContext.getCommonContext().getDataSourceName(), 
dumperContext.getActualTableName());
     }
     
-    private void dumpWithStreamingQuery(final Connection connection) throws 
SQLException {
-        int batchSize = dumperContext.getBatchSize();
+    private void dumpWithStreamingQuery() throws SQLException {
         DatabaseType databaseType = 
dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType();
-        fetchAllQuery(connection, databaseType, batchSize);
+        try (Connection connection = dataSource.getConnection()) {
+            fetchAllNoUniqueKeyQuery(connection, databaseType, 
dumperContext.getBatchSize());
+        }
     }
     
-    private void fetchAllQuery(final Connection connection, final DatabaseType 
databaseType, final int batchSize) throws SQLException {
-        log.info("Start to fetch all inventory data with streaming query, 
dataSource={}, actualTable={}", 
dumperContext.getCommonContext().getDataSourceName(), 
dumperContext.getActualTableName());
-        try (PreparedStatement statement = 
JDBCStreamQueryBuilder.build(databaseType, connection, 
buildFetchAllSQLWithStreamingQuery(), batchSize)) {
+    private void fetchAllNoUniqueKeyQuery(final Connection connection, final 
DatabaseType databaseType, final int batchSize) throws SQLException {
+        log.info("Start to fetch all no unique key query, dataSource={}, 
actualTable={}", dumperContext.getCommonContext().getDataSourceName(), 
dumperContext.getActualTableName());
+        try (PreparedStatement statement = 
JDBCStreamQueryBuilder.build(databaseType, connection, 
buildFetchAllNoUniqueKeySQL(), batchSize)) {
             runningStatement.set(statement);
             try (ResultSet resultSet = statement.executeQuery()) {
                 consumeResultSetToChannel(resultSet, batchSize);
@@ -261,7 +163,13 @@ public final class InventoryDumper extends 
AbstractPipelineLifecycleRunnable imp
                 runningStatement.set(null);
             }
         }
-        log.info("End to fetch all inventory data with streaming query, 
dataSource={}, actualTable={}", 
dumperContext.getCommonContext().getDataSourceName(), 
dumperContext.getActualTableName());
+        log.info("End to fetch all no unique key query, dataSource={}, 
actualTable={}", dumperContext.getCommonContext().getDataSourceName(), 
dumperContext.getActualTableName());
+    }
+    
+    private String buildFetchAllNoUniqueKeySQL() {
+        String schemaName = 
dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName());
+        List<String> columnNames = dumperContext.getQueryColumnNames();
+        return sqlBuilder.buildFetchAllSQL(schemaName, 
dumperContext.getActualTableName(), columnNames);
     }
     
     private void consumeResultSetToChannel(final ResultSet resultSet, final 
int batchSize) throws SQLException {
@@ -290,17 +198,50 @@ public final class InventoryDumper extends 
AbstractPipelineLifecycleRunnable imp
                 dumperContext.getActualTableName());
     }
     
-    private String buildFetchAllSQLWithStreamingQuery() {
-        String schemaName = 
dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName());
-        List<String> columnNames = dumperContext.getQueryColumnNames();
-        if (dumperContext.hasUniqueKey()) {
-            return sqlBuilder.buildFetchAllSQL(schemaName, 
dumperContext.getActualTableName(), columnNames, 
dumperContext.getUniqueKeyColumns().get(0).getName());
+    private DataRecord loadDataRecord(final ResultSet resultSet, final 
ResultSetMetaData resultSetMetaData) throws SQLException {
+        int columnCount = resultSetMetaData.getColumnCount();
+        String tableName = dumperContext.getLogicTableName();
+        DataRecord result = new DataRecord(PipelineSQLOperationType.INSERT, 
tableName, positionCreator.create(dumperContext, resultSet), columnCount);
+        List<String> insertColumnNames = 
Optional.ofNullable(dumperContext.getInsertColumnNames()).orElse(Collections.emptyList());
+        ShardingSpherePreconditions.checkState(insertColumnNames.isEmpty() || 
insertColumnNames.size() == resultSetMetaData.getColumnCount(),
+                () -> new PipelineInvalidParameterException("Insert column 
names count not equals ResultSet column count"));
+        for (int i = 1; i <= columnCount; i++) {
+            String columnName = insertColumnNames.isEmpty() ? 
resultSetMetaData.getColumnName(i) : insertColumnNames.get(i - 1);
+            Column column = getColumn(resultSet, resultSetMetaData, 
columnName, i, dumperContext.getTargetUniqueKeysNames().contains(new 
ShardingSphereIdentifier(columnName)));
+            result.addColumn(column);
         }
-        return sqlBuilder.buildFetchAllSQL(schemaName, 
dumperContext.getActualTableName(), columnNames);
+        result.setActualTableName(dumperContext.getActualTableName());
+        return result;
+    }
+    
+    private Column getColumn(final ResultSet resultSet, final 
ResultSetMetaData resultSetMetaData, final String columnName, final int 
columnIndex, final boolean isUniqueKey) throws SQLException {
+        return new NormalColumn(columnName, 
columnValueReaderEngine.read(resultSet, resultSetMetaData, columnIndex), true, 
isUniqueKey);
     }
     
     @Override
     protected void doStop() {
         
Optional.ofNullable(runningStatement.get()).ifPresent(PipelineJdbcUtils::cancelStatement);
     }
+    
+    private class RecordSingleTableInventoryDumpCalculator extends 
AbstractRecordSingleTableInventoryCalculator<List<DataRecord>, DataRecord> {
+        
+        RecordSingleTableInventoryDumpCalculator(final int chunkSize, final 
StreamingRangeType streamingRangeType) {
+            super(chunkSize, streamingRangeType);
+        }
+        
+        @Override
+        protected DataRecord readRecord(final ResultSet resultSet, final 
ResultSetMetaData resultSetMetaData, final InventoryColumnValueReaderEngine 
columnValueReaderEngine) throws SQLException {
+            return loadDataRecord(resultSet, resultSetMetaData);
+        }
+        
+        @Override
+        protected Object getFirstUniqueKeyValue(final DataRecord record, final 
String firstUniqueKey) {
+            return record.getColumn(firstUniqueKey).getValue();
+        }
+        
+        @Override
+        protected List<DataRecord> convertRecordsToResult(final 
List<DataRecord> records, final Object maxUniqueKeyValue) {
+            return records;
+        }
+    }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/InventoryQueryParameter.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/InventoryQueryParameter.java
deleted file mode 100644
index 5370411aacd..00000000000
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/InventoryQueryParameter.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query;
-
-/**
- * Inventory query parameter.
- * 
- * @param <T> type of parameter value
- */
-public interface InventoryQueryParameter<T> {
-    
-    /**
-     * Get parameter value.
-     *
-     * @return parameter value
-     */
-    T getValue();
-}
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/point/InventoryPointQueryParameter.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/point/InventoryPointQueryParameter.java
deleted file mode 100644
index c7a8389860b..00000000000
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/point/InventoryPointQueryParameter.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.point;
-
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.InventoryQueryParameter;
-
-/**
- * Inventory point query parameter.
- */
-@RequiredArgsConstructor
-@Getter
-public final class InventoryPointQueryParameter implements 
InventoryQueryParameter<Object> {
-    
-    private final Object value;
-}
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/range/InventoryRangeQueryParameter.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/range/InventoryRangeQueryParameter.java
deleted file mode 100644
index 8514c87e122..00000000000
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/range/InventoryRangeQueryParameter.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.range;
-
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.InventoryQueryParameter;
-
-/**
- * Inventory range query parameter.
- */
-@RequiredArgsConstructor
-@Getter
-public final class InventoryRangeQueryParameter implements 
InventoryQueryParameter<QueryRange> {
-    
-    private final QueryRange value;
-}
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/DataRecord.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/DataRecord.java
index 9e047d3e3f9..26f3e189e08 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/DataRecord.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/DataRecord.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.data.pipeline.core.ingest.record;
 
+import com.cedarsoftware.util.CaseInsensitiveMap;
 import lombok.EqualsAndHashCode;
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
@@ -29,6 +30,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Data record.
@@ -47,6 +49,8 @@ public final class DataRecord extends Record {
     
     private final List<Column> columns;
     
+    private final Map<String, Column> columnMap;
+    
     private final Collection<Object> uniqueKeyValue = new LinkedList<>();
     
     private final Collection<Object> oldUniqueKeyValues = new LinkedList<>();
@@ -65,6 +69,7 @@ public final class DataRecord extends Record {
         this.schemaName = schemaName;
         this.tableName = tableName;
         columns = new ArrayList<>(columnCount);
+        columnMap = new CaseInsensitiveMap<>(columnCount, 1F);
     }
     
     /**
@@ -74,6 +79,7 @@ public final class DataRecord extends Record {
      */
     public void addColumn(final Column data) {
         columns.add(data);
+        columnMap.put(data.getName(), data);
         if (data.isUniqueKey()) {
             uniqueKeyValue.add(data.getValue());
             oldUniqueKeyValues.add(data.getOldValue());
@@ -99,6 +105,16 @@ public final class DataRecord extends Record {
         return columns.get(index);
     }
     
+    /**
+     * Get column by name.
+     *
+     * @param name column name
+     * @return column
+     */
+    public Column getColumn(final String name) {
+        return columnMap.get(name);
+    }
+    
     /**
      * Get key.
      *
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java
index b529f12484a..9a9cf752d77 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java
@@ -71,10 +71,10 @@ public final class InventoryDumperContextSplitter {
     
     private Collection<InventoryDumperContext> splitByTable() {
         return 
dumperContext.getCommonContext().getTableNameMapper().getTableNameMap().entrySet()
-                .stream().map(entry -> 
createTableSpLitDumperContext(entry.getKey(), 
entry.getValue())).collect(Collectors.toList());
+                .stream().map(entry -> 
createTableSplitDumperContext(entry.getKey(), 
entry.getValue())).collect(Collectors.toList());
     }
     
-    private InventoryDumperContext createTableSpLitDumperContext(final 
ShardingSphereIdentifier actualTableName, final ShardingSphereIdentifier 
logicTableName) {
+    private InventoryDumperContext createTableSplitDumperContext(final 
ShardingSphereIdentifier actualTableName, final ShardingSphereIdentifier 
logicTableName) {
         InventoryDumperContext result = new 
InventoryDumperContext(dumperContext.getCommonContext());
         result.setActualTableName(actualTableName.toString());
         result.setLogicTableName(logicTableName.toString());
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/BuildDivisibleSQLParameter.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/BuildDivisibleSQLParameter.java
deleted file mode 100644
index e50643d77c1..00000000000
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/BuildDivisibleSQLParameter.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql;
-
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-
-import java.util.Collection;
-
-/**
- * Build divisible SQL parameter.
- */
-@RequiredArgsConstructor
-@Getter
-public final class BuildDivisibleSQLParameter {
-    
-    private final String schemaName;
-    
-    private final String tableName;
-    
-    private final Collection<String> columnNames;
-    
-    private final String uniqueKey;
-    
-    private final boolean lowerInclusive;
-    
-    private final boolean limited;
-}
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineInventoryDumpSQLBuilder.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineInventoryDumpSQLBuilder.java
index fb4b3188ea3..b43c2d0a424 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineInventoryDumpSQLBuilder.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineInventoryDumpSQLBuilder.java
@@ -17,9 +17,7 @@
 
 package org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql;
 
-import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.dialect.DialectPipelineSQLBuilder;
 import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.segment.PipelineSQLSegmentBuilder;
-import 
org.apache.shardingsphere.database.connector.core.spi.DatabaseTypedSPILoader;
 import org.apache.shardingsphere.database.connector.core.type.DatabaseType;
 
 import java.util.Collection;
@@ -30,67 +28,12 @@ import java.util.stream.Collectors;
  */
 public final class PipelineInventoryDumpSQLBuilder {
     
-    private final DialectPipelineSQLBuilder dialectSQLBuilder;
-    
     private final PipelineSQLSegmentBuilder sqlSegmentBuilder;
     
     public PipelineInventoryDumpSQLBuilder(final DatabaseType databaseType) {
-        dialectSQLBuilder = 
DatabaseTypedSPILoader.getService(DialectPipelineSQLBuilder.class, 
databaseType);
         sqlSegmentBuilder = new PipelineSQLSegmentBuilder(databaseType);
     }
     
-    /**
-     * Build divisible inventory dump SQL.
-     *
-     * @param param parameter
-     * @return built SQL
-     */
-    public String buildDivisibleSQL(final BuildDivisibleSQLParameter param) {
-        String queryColumns = buildQueryColumns(param.getColumnNames());
-        String qualifiedTableName = 
sqlSegmentBuilder.getQualifiedTableName(param.getSchemaName(), 
param.getTableName());
-        String escapedUniqueKey = 
sqlSegmentBuilder.getEscapedIdentifier(param.getUniqueKey());
-        String operator = param.isLowerInclusive() ? ">=" : ">";
-        String sql = param.isLimited()
-                ? String.format("SELECT %s FROM %s WHERE %s%s? AND %s<=? ORDER 
BY %s ASC", queryColumns, qualifiedTableName, escapedUniqueKey, operator, 
escapedUniqueKey, escapedUniqueKey)
-                : String.format("SELECT %s FROM %s WHERE %s%s? ORDER BY %s 
ASC", queryColumns, qualifiedTableName, escapedUniqueKey, operator, 
escapedUniqueKey);
-        return dialectSQLBuilder.wrapWithPageQuery(sql);
-    }
-    
-    /**
-     * Build indivisible inventory dump SQL.
-     *
-     * @param schemaName schema name
-     * @param tableName table name
-     * @param columnNames column names
-     * @param uniqueKey unique key
-     * @return built SQL
-     */
-    public String buildIndivisibleSQL(final String schemaName, final String 
tableName, final Collection<String> columnNames, final String uniqueKey) {
-        String qualifiedTableName = 
sqlSegmentBuilder.getQualifiedTableName(schemaName, tableName);
-        String quotedUniqueKey = 
sqlSegmentBuilder.getEscapedIdentifier(uniqueKey);
-        return String.format("SELECT %s FROM %s ORDER BY %s ASC", 
buildQueryColumns(columnNames), qualifiedTableName, quotedUniqueKey);
-    }
-    
-    private String buildQueryColumns(final Collection<String> columnNames) {
-        return 
columnNames.stream().map(sqlSegmentBuilder::getEscapedIdentifier).collect(Collectors.joining(","));
-    }
-    
-    /**
-     * Build point query SQL.
-     *
-     * @param schemaName schema name
-     * @param tableName table name
-     * @param columnNames column names
-     * @param uniqueKey unique key
-     * @return built SQL
-     */
-    public String buildPointQuerySQL(final String schemaName, final String 
tableName, final Collection<String> columnNames, final String uniqueKey) {
-        String qualifiedTableName = 
sqlSegmentBuilder.getQualifiedTableName(schemaName, tableName);
-        String queryColumns = 
columnNames.stream().map(sqlSegmentBuilder::getEscapedIdentifier).collect(Collectors.joining(","));
-        String escapedUniqueKey = 
sqlSegmentBuilder.getEscapedIdentifier(uniqueKey);
-        return String.format("SELECT %s FROM %s WHERE %s=?", queryColumns, 
qualifiedTableName, escapedUniqueKey);
-    }
-    
     /**
      * Build fetch all inventory dump SQL.
      *
@@ -104,20 +47,4 @@ public final class PipelineInventoryDumpSQLBuilder {
         String queryColumns = 
columnNames.stream().map(sqlSegmentBuilder::getEscapedIdentifier).collect(Collectors.joining(","));
         return String.format("SELECT %s FROM %s", queryColumns, 
qualifiedTableName);
     }
-    
-    /**
-     * Build fetch all inventory dump SQL.
-     *
-     * @param schemaName schema name
-     * @param tableName tableName
-     * @param columnNames column names
-     * @param uniqueKey unique key
-     * @return built SQL
-     */
-    public String buildFetchAllSQL(final String schemaName, final String 
tableName, final Collection<String> columnNames, final String uniqueKey) {
-        String qualifiedTableName = 
sqlSegmentBuilder.getQualifiedTableName(schemaName, tableName);
-        String queryColumns = 
columnNames.stream().map(sqlSegmentBuilder::getEscapedIdentifier).collect(Collectors.joining(","));
-        String quotedUniqueKey = 
sqlSegmentBuilder.getEscapedIdentifier(uniqueKey);
-        return String.format("SELECT %s FROM %s ORDER BY %s ASC", 
queryColumns, qualifiedTableName, quotedUniqueKey);
-    }
 }
diff --git 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineInventoryDumpSQLBuilderTest.java
 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineInventoryDumpSQLBuilderTest.java
index d4a8b179849..fc19d2f986c 100644
--- 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineInventoryDumpSQLBuilderTest.java
+++ 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineInventoryDumpSQLBuilderTest.java
@@ -31,43 +31,11 @@ class PipelineInventoryDumpSQLBuilderTest {
     
     private final PipelineInventoryDumpSQLBuilder sqlBuilder = new 
PipelineInventoryDumpSQLBuilder(TypedSPILoader.getService(DatabaseType.class, 
"FIXTURE"));
     
-    @Test
-    void assertBuildDivisibleSQL() {
-        String actual = sqlBuilder.buildDivisibleSQL(new 
BuildDivisibleSQLParameter(null, "t_order", Arrays.asList("order_id", 
"user_id", "status"), "order_id", true, true));
-        assertThat(actual, is("SELECT order_id,user_id,status FROM t_order 
WHERE order_id>=? AND order_id<=? ORDER BY order_id ASC LIMIT ?"));
-        actual = sqlBuilder.buildDivisibleSQL(new 
BuildDivisibleSQLParameter(null, "t_order", Arrays.asList("order_id", 
"user_id", "status"), "order_id", false, true));
-        assertThat(actual, is("SELECT order_id,user_id,status FROM t_order 
WHERE order_id>? AND order_id<=? ORDER BY order_id ASC LIMIT ?"));
-    }
-    
-    @Test
-    void assertBuildUnlimitedDivisibleSQL() {
-        String actual = sqlBuilder.buildDivisibleSQL(new 
BuildDivisibleSQLParameter(null, "t_order", Arrays.asList("order_id", 
"user_id", "status"), "order_id", true, false));
-        assertThat(actual, is("SELECT order_id,user_id,status FROM t_order 
WHERE order_id>=? ORDER BY order_id ASC LIMIT ?"));
-        actual = sqlBuilder.buildDivisibleSQL(new 
BuildDivisibleSQLParameter(null, "t_order", Arrays.asList("order_id", 
"user_id", "status"), "order_id", false, false));
-        assertThat(actual, is("SELECT order_id,user_id,status FROM t_order 
WHERE order_id>? ORDER BY order_id ASC LIMIT ?"));
-    }
-    
-    @Test
-    void assertBuildIndivisibleSQL() {
-        String actual = sqlBuilder.buildIndivisibleSQL(null, "t_order", 
Arrays.asList("order_id", "user_id", "status"), "order_id");
-        assertThat(actual, is("SELECT order_id,user_id,status FROM t_order 
ORDER BY order_id ASC"));
-    }
-    
-    @Test
-    void assertBuildPointQuerySQL() {
-        String actual = sqlBuilder.buildPointQuerySQL(null, "t_order", 
Arrays.asList("order_id", "user_id", "status"), "order_id");
-        assertThat(actual, is("SELECT order_id,user_id,status FROM t_order 
WHERE order_id=?"));
-    }
-    
     @Test
     void assertBuildFetchAllSQL() {
         String actual = sqlBuilder.buildFetchAllSQL(null, "t_order", 
Arrays.asList("order_id", "user_id", "status"));
         assertThat(actual, is("SELECT order_id,user_id,status FROM t_order"));
         actual = sqlBuilder.buildFetchAllSQL(null, "t_order", 
Collections.singletonList("*"));
         assertThat(actual, is("SELECT * FROM t_order"));
-        actual = sqlBuilder.buildFetchAllSQL(null, "t_order", 
Arrays.asList("order_id", "user_id", "status"), "order_id");
-        assertThat(actual, is("SELECT order_id,user_id,status FROM t_order 
ORDER BY order_id ASC"));
-        actual = sqlBuilder.buildFetchAllSQL(null, "t_order", 
Collections.singletonList("*"), "order_id");
-        assertThat(actual, is("SELECT * FROM t_order ORDER BY order_id ASC"));
     }
 }


Reply via email to