This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new ce1b0577316 Refactor InventoryDumper to reuse AbstractRecordSingleTableInventoryCalculator (#36830)
ce1b0577316 is described below
commit ce1b05773164938235b0ea651232661bbcae8fa4
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Thu Oct 9 19:09:08 2025 +0800
Refactor InventoryDumper to reuse AbstractRecordSingleTableInventoryCalculator (#36830)
---
.../ingest/dumper/inventory/InventoryDumper.java | 241 ++++++++-------------
.../inventory/query/InventoryQueryParameter.java | 33 ---
.../query/point/InventoryPointQueryParameter.java | 32 ---
.../query/range/InventoryRangeQueryParameter.java | 32 ---
.../pipeline/core/ingest/record/DataRecord.java | 16 ++
.../splitter/InventoryDumperContextSplitter.java | 4 +-
.../sqlbuilder/sql/BuildDivisibleSQLParameter.java | 43 ----
.../sql/PipelineInventoryDumpSQLBuilder.java | 73 -------
.../sql/PipelineInventoryDumpSQLBuilderTest.java | 32 ---
9 files changed, 109 insertions(+), 397 deletions(-)
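In short: when the table has a unique key, InventoryDumper no longer builds its own paged range/point dump SQL; it routes a range query through the consistency-check calculator framework via a small inner subclass, while the no-unique-key path keeps streaming a plain SELECT. A condensed reading aid, not part of the commit (pieced together from the InventoryDumper diff below; error handling and position bookkeeping elided):

    // Condensed from the diff below; illustrative only.
    SingleTableInventoryCalculateParameter calculateParam = new SingleTableInventoryCalculateParameter(
            dataSource, table, columnNames, dumperContext.getUniqueKeyColumns(), QueryType.RANGE_QUERY, null);
    calculateParam.setQueryRange(queryRange);
    RecordSingleTableInventoryDumpCalculator dumpCalculator =
            new RecordSingleTableInventoryDumpCalculator(dumperContext.getBatchSize(), StreamingRangeType.SMALL);
    for (List<DataRecord> each : dumpCalculator.calculate(calculateParam)) {
        channel.push(Collections.unmodifiableList(each)); // one chunk of at most batchSize records
    }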
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumper.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumper.java
index 27b62bbd346..492924e58e8 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumper.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumper.java
@@ -19,17 +19,18 @@ package org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
+import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator.AbstractRecordSingleTableInventoryCalculator;
+import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator.SingleTableInventoryCalculateParameter;
+import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator.StreamingRangeType;
import org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
+import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
import org.apache.shardingsphere.data.pipeline.core.exception.IngestException;
-import org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
import org.apache.shardingsphere.data.pipeline.core.execute.AbstractPipelineLifecycleRunnable;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.column.InventoryColumnValueReaderEngine;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.position.InventoryDataRecordPositionCreator;
-import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.InventoryQueryParameter;
-import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.point.InventoryPointQueryParameter;
-import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.range.InventoryRangeQueryParameter;
+import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.QueryType;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.range.QueryRange;
import org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.finished.IngestFinishedPosition;
@@ -40,18 +41,17 @@ import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.FinishedRecord;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.NormalColumn;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
-import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
import org.apache.shardingsphere.data.pipeline.core.query.JDBCStreamQueryBuilder;
import org.apache.shardingsphere.data.pipeline.core.ratelimit.JobRateLimitAlgorithm;
-import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.BuildDivisibleSQLParameter;
import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelineInventoryDumpSQLBuilder;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineJdbcUtils;
import org.apache.shardingsphere.database.connector.core.type.DatabaseType;
import org.apache.shardingsphere.infra.annotation.HighFrequencyInvocation;
import org.apache.shardingsphere.infra.exception.ShardingSpherePreconditions;
+import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedTable;
import org.apache.shardingsphere.infra.metadata.identifier.ShardingSphereIdentifier;
+import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
-import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
@@ -61,9 +61,7 @@ import java.sql.Statement;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
-import java.util.Objects;
import java.util.Optional;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
/**
@@ -77,7 +75,7 @@ public final class InventoryDumper extends AbstractPipelineLifecycleRunnable imp
private final PipelineChannel channel;
- private final DataSource dataSource;
+ private final PipelineDataSource dataSource;
private final InventoryDataRecordPositionCreator positionCreator;
@@ -87,7 +85,7 @@ public final class InventoryDumper extends AbstractPipelineLifecycleRunnable imp
private final AtomicReference<Statement> runningStatement = new AtomicReference<>();
- public InventoryDumper(final InventoryDumperContext dumperContext, final PipelineChannel channel, final DataSource dataSource, final InventoryDataRecordPositionCreator positionCreator) {
+ public InventoryDumper(final InventoryDumperContext dumperContext, final PipelineChannel channel, final PipelineDataSource dataSource, final InventoryDataRecordPositionCreator positionCreator) {
this.dumperContext = dumperContext;
this.channel = channel;
this.dataSource = dataSource;
@@ -104,11 +102,11 @@ public final class InventoryDumper extends AbstractPipelineLifecycleRunnable imp
log.info("Ignored because of already finished.");
return;
}
- try (Connection connection = dataSource.getConnection()) {
- if (!dumperContext.hasUniqueKey() || isPrimaryKeyWithoutRange(position)) {
- dumpWithStreamingQuery(connection);
+ try {
+ if (dumperContext.hasUniqueKey()) {
+ dumpByCalculator();
} else {
- dumpByPage(connection);
+ dumpWithStreamingQuery();
}
// CHECKSTYLE:OFF
} catch (final SQLException | RuntimeException ex) {
@@ -118,142 +116,46 @@ public final class InventoryDumper extends AbstractPipelineLifecycleRunnable imp
}
}
- private boolean isPrimaryKeyWithoutRange(final IngestPosition position) {
- return position instanceof PrimaryKeyIngestPosition && null == ((PrimaryKeyIngestPosition<?>) position).getBeginValue() && null == ((PrimaryKeyIngestPosition<?>) position).getEndValue();
- }
-
- private void dumpByPage(final Connection connection) throws SQLException {
- log.info("Start to dump inventory data by page, dataSource={}, actualTable={}", dumperContext.getCommonContext().getDataSourceName(), dumperContext.getActualTableName());
- boolean firstQuery = true;
- AtomicLong rowCount = new AtomicLong();
- IngestPosition position = dumperContext.getCommonContext().getPosition();
- do {
- QueryRange queryRange = new QueryRange(((PrimaryKeyIngestPosition<?>) position).getBeginValue(), firstQuery && dumperContext.isFirstDump(),
- ((PrimaryKeyIngestPosition<?>) position).getEndValue());
- InventoryQueryParameter<?> queryParam = new InventoryRangeQueryParameter(queryRange);
- List<Record> dataRecords = dumpByPage(connection, queryParam, rowCount);
- if (dataRecords.size() > 1 && Objects.deepEquals(getFirstUniqueKeyValue(dataRecords, 0), getFirstUniqueKeyValue(dataRecords, dataRecords.size() - 1))) {
- queryParam = new InventoryPointQueryParameter(getFirstUniqueKeyValue(dataRecords, 0));
- dataRecords = dumpByPage(connection, queryParam, rowCount);
- }
- firstQuery = false;
- if (dataRecords.isEmpty()) {
- position = new IngestFinishedPosition();
- dataRecords.add(new FinishedRecord(position));
- log.info("Inventory dump done, rowCount={}, dataSource={},
actualTable={}", rowCount,
dumperContext.getCommonContext().getDataSourceName(),
dumperContext.getActualTableName());
- } else {
- position =
PrimaryKeyIngestPositionFactory.newInstance(getFirstUniqueKeyValue(dataRecords,
dataRecords.size() - 1), queryRange.getUpper());
- }
- channel.push(dataRecords);
- dumperContext.getCommonContext().setPosition(position);
- } while (!(position instanceof IngestFinishedPosition));
- log.info("End to dump inventory data by page, dataSource={},
actualTable={}", dumperContext.getCommonContext().getDataSourceName(),
dumperContext.getActualTableName());
- }
-
- private List<Record> dumpByPage(final Connection connection, final InventoryQueryParameter<?> queryParam, final AtomicLong rowCount) throws SQLException {
- DatabaseType databaseType = dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType();
- int batchSize = dumperContext.getBatchSize();
- try (PreparedStatement preparedStatement = JDBCStreamQueryBuilder.build(databaseType, connection, buildDumpByPageSQL(queryParam), batchSize)) {
- runningStatement.set(preparedStatement);
- setParameters(preparedStatement, queryParam);
- try (ResultSet resultSet = preparedStatement.executeQuery()) {
- JobRateLimitAlgorithm rateLimitAlgorithm = dumperContext.getRateLimitAlgorithm();
- ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
- List<Record> result = new LinkedList<>();
- while (resultSet.next()) {
- if (result.size() >= batchSize) {
- if (!dumperContext.hasUniqueKey()) {
- channel.push(result);
- }
- result = new LinkedList<>();
- }
- result.add(loadDataRecord(resultSet, resultSetMetaData));
- rowCount.incrementAndGet();
- if (!isRunning()) {
- log.info("Broke because of inventory dump is not
running.");
- break;
- }
- if (null != rateLimitAlgorithm && 0 == rowCount.get() %
batchSize) {
-
rateLimitAlgorithm.intercept(PipelineSQLOperationType.SELECT, 1);
- }
- }
- return result;
- } finally {
- runningStatement.set(null);
- }
- }
- }
-
- private void setParameters(final PreparedStatement preparedStatement, final InventoryQueryParameter<?> queryParam) throws SQLException {
- if (queryParam instanceof InventoryRangeQueryParameter) {
- int parameterIndex = 1;
- Object lower = ((InventoryRangeQueryParameter) queryParam).getValue().getLower();
- if (null != lower) {
- preparedStatement.setObject(parameterIndex++, lower);
- }
- Object upper = ((InventoryRangeQueryParameter) queryParam).getValue().getUpper();
- if (null != upper) {
- preparedStatement.setObject(parameterIndex++, upper);
- }
- preparedStatement.setInt(parameterIndex, dumperContext.getBatchSize());
- } else if (queryParam instanceof InventoryPointQueryParameter) {
- preparedStatement.setObject(1, queryParam.getValue());
- } else {
- throw new UnsupportedOperationException("Query type: " + queryParam.getValue());
- }
- }
-
- private DataRecord loadDataRecord(final ResultSet resultSet, final ResultSetMetaData resultSetMetaData) throws SQLException {
- int columnCount = resultSetMetaData.getColumnCount();
- String tableName = dumperContext.getLogicTableName();
- DataRecord result = new DataRecord(PipelineSQLOperationType.INSERT, tableName, positionCreator.create(dumperContext, resultSet), columnCount);
- List<String> insertColumnNames = Optional.ofNullable(dumperContext.getInsertColumnNames()).orElse(Collections.emptyList());
- ShardingSpherePreconditions.checkState(insertColumnNames.isEmpty() || insertColumnNames.size() == resultSetMetaData.getColumnCount(),
- () -> new PipelineInvalidParameterException("Insert column names count not equals ResultSet column count"));
- for (int i = 1; i <= columnCount; i++) {
- String columnName = insertColumnNames.isEmpty() ? resultSetMetaData.getColumnName(i) : insertColumnNames.get(i - 1);
- Column column = getColumn(resultSet, resultSetMetaData, columnName, i, dumperContext.getTargetUniqueKeysNames().contains(new ShardingSphereIdentifier(columnName)));
- result.addColumn(column);
- }
- result.setActualTableName(dumperContext.getActualTableName());
- return result;
- }
-
- private Column getColumn(final ResultSet resultSet, final ResultSetMetaData resultSetMetaData, final String columnName, final int columnIndex, final boolean isUniqueKey) throws SQLException {
- return new NormalColumn(columnName, columnValueReaderEngine.read(resultSet, resultSetMetaData, columnIndex), true, isUniqueKey);
- }
-
- private String buildDumpByPageSQL(final InventoryQueryParameter<?> queryParam) {
+ private void dumpByCalculator() throws SQLException {
+ log.info("Dump by calculator start, dataSource={}, actualTable={}", dumperContext.getCommonContext().getDataSourceName(), dumperContext.getActualTableName());
String schemaName = dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName());
- PipelineColumnMetaData firstColumn = dumperContext.getUniqueKeyColumns().get(0);
+ QualifiedTable table = new QualifiedTable(schemaName, dumperContext.getActualTableName());
List<String> columnNames = dumperContext.getQueryColumnNames();
- if (queryParam instanceof InventoryPointQueryParameter) {
- return sqlBuilder.buildPointQuerySQL(schemaName, dumperContext.getActualTableName(), columnNames, firstColumn.getName());
- }
- QueryRange queryRange = ((InventoryRangeQueryParameter) queryParam).getValue();
- boolean lowerInclusive = queryRange.isLowerInclusive();
- if (null != queryRange.getLower() && null != queryRange.getUpper()) {
- return sqlBuilder.buildDivisibleSQL(new BuildDivisibleSQLParameter(schemaName, dumperContext.getActualTableName(), columnNames, firstColumn.getName(), lowerInclusive, true));
- }
- if (null != queryRange.getLower()) {
- return sqlBuilder.buildDivisibleSQL(new BuildDivisibleSQLParameter(schemaName, dumperContext.getActualTableName(), columnNames, firstColumn.getName(), lowerInclusive, false));
+ SingleTableInventoryCalculateParameter calculateParam = new SingleTableInventoryCalculateParameter(dataSource, table,
+ columnNames, dumperContext.getUniqueKeyColumns(), QueryType.RANGE_QUERY, null);
+ IngestPosition initialPosition = dumperContext.getCommonContext().getPosition();
+ QueryRange queryRange = new QueryRange(((PrimaryKeyIngestPosition<?>) initialPosition).getBeginValue(), dumperContext.isFirstDump(),
+ ((PrimaryKeyIngestPosition<?>) initialPosition).getEndValue());
+ calculateParam.setQueryRange(queryRange);
+ RecordSingleTableInventoryDumpCalculator dumpCalculator = new RecordSingleTableInventoryDumpCalculator(dumperContext.getBatchSize(), StreamingRangeType.SMALL);
+ long rowCount = 0L;
+ try {
+ String firstUniqueKey = calculateParam.getFirstUniqueKey().getName();
+ for (List<DataRecord> each : dumpCalculator.calculate(calculateParam)) {
+ channel.push(Collections.unmodifiableList(each));
+ IngestPosition position = PrimaryKeyIngestPositionFactory.newInstance(dumpCalculator.getFirstUniqueKeyValue(each.get(each.size() - 1), firstUniqueKey), queryRange.getUpper());
+ dumperContext.getCommonContext().setPosition(position);
+ rowCount += each.size();
+ }
+ } finally {
+ QuietlyCloser.close(calculateParam.getCalculationContext());
}
- throw new PipelineInternalException("Primary key position is
invalid.");
- }
-
- private Object getFirstUniqueKeyValue(final List<Record> dataRecords,
final int index) {
- return ((DataRecord)
dataRecords.get(index)).getUniqueKeyValue().iterator().next();
+ IngestPosition position = new IngestFinishedPosition();
+ channel.push(Collections.singletonList(new FinishedRecord(position)));
+ dumperContext.getCommonContext().setPosition(position);
+ log.info("Dump by calculator done, rowCount={}, dataSource={},
actualTable={}", rowCount,
dumperContext.getCommonContext().getDataSourceName(),
dumperContext.getActualTableName());
}
- private void dumpWithStreamingQuery(final Connection connection) throws SQLException {
- int batchSize = dumperContext.getBatchSize();
+ private void dumpWithStreamingQuery() throws SQLException {
DatabaseType databaseType = dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType();
- fetchAllQuery(connection, databaseType, batchSize);
+ try (Connection connection = dataSource.getConnection()) {
+ fetchAllNoUniqueKeyQuery(connection, databaseType, dumperContext.getBatchSize());
+ }
}
- private void fetchAllQuery(final Connection connection, final DatabaseType databaseType, final int batchSize) throws SQLException {
- log.info("Start to fetch all inventory data with streaming query, dataSource={}, actualTable={}", dumperContext.getCommonContext().getDataSourceName(), dumperContext.getActualTableName());
- try (PreparedStatement statement = JDBCStreamQueryBuilder.build(databaseType, connection, buildFetchAllSQLWithStreamingQuery(), batchSize)) {
+ private void fetchAllNoUniqueKeyQuery(final Connection connection, final DatabaseType databaseType, final int batchSize) throws SQLException {
+ log.info("Start to fetch all no unique key query, dataSource={}, actualTable={}", dumperContext.getCommonContext().getDataSourceName(), dumperContext.getActualTableName());
+ try (PreparedStatement statement = JDBCStreamQueryBuilder.build(databaseType, connection, buildFetchAllNoUniqueKeySQL(), batchSize)) {
runningStatement.set(statement);
try (ResultSet resultSet = statement.executeQuery()) {
consumeResultSetToChannel(resultSet, batchSize);
@@ -261,7 +163,13 @@ public final class InventoryDumper extends AbstractPipelineLifecycleRunnable imp
runningStatement.set(null);
}
}
- log.info("End to fetch all inventory data with streaming query, dataSource={}, actualTable={}", dumperContext.getCommonContext().getDataSourceName(), dumperContext.getActualTableName());
+ log.info("End to fetch all no unique key query, dataSource={}, actualTable={}", dumperContext.getCommonContext().getDataSourceName(), dumperContext.getActualTableName());
+ }
+
+ private String buildFetchAllNoUniqueKeySQL() {
+ String schemaName = dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName());
+ List<String> columnNames = dumperContext.getQueryColumnNames();
+ return sqlBuilder.buildFetchAllSQL(schemaName, dumperContext.getActualTableName(), columnNames);
}
private void consumeResultSetToChannel(final ResultSet resultSet, final int batchSize) throws SQLException {
@@ -290,17 +198,50 @@ public final class InventoryDumper extends AbstractPipelineLifecycleRunnable imp
dumperContext.getActualTableName());
}
- private String buildFetchAllSQLWithStreamingQuery() {
- String schemaName = dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName());
- List<String> columnNames = dumperContext.getQueryColumnNames();
- if (dumperContext.hasUniqueKey()) {
- return sqlBuilder.buildFetchAllSQL(schemaName, dumperContext.getActualTableName(), columnNames, dumperContext.getUniqueKeyColumns().get(0).getName());
+ private DataRecord loadDataRecord(final ResultSet resultSet, final ResultSetMetaData resultSetMetaData) throws SQLException {
+ int columnCount = resultSetMetaData.getColumnCount();
+ String tableName = dumperContext.getLogicTableName();
+ DataRecord result = new DataRecord(PipelineSQLOperationType.INSERT, tableName, positionCreator.create(dumperContext, resultSet), columnCount);
+ List<String> insertColumnNames = Optional.ofNullable(dumperContext.getInsertColumnNames()).orElse(Collections.emptyList());
+ ShardingSpherePreconditions.checkState(insertColumnNames.isEmpty() || insertColumnNames.size() == resultSetMetaData.getColumnCount(),
+ () -> new PipelineInvalidParameterException("Insert column names count not equals ResultSet column count"));
+ for (int i = 1; i <= columnCount; i++) {
+ String columnName = insertColumnNames.isEmpty() ? resultSetMetaData.getColumnName(i) : insertColumnNames.get(i - 1);
+ Column column = getColumn(resultSet, resultSetMetaData, columnName, i, dumperContext.getTargetUniqueKeysNames().contains(new ShardingSphereIdentifier(columnName)));
+ result.addColumn(column);
}
- return sqlBuilder.buildFetchAllSQL(schemaName, dumperContext.getActualTableName(), columnNames);
+ result.setActualTableName(dumperContext.getActualTableName());
+ return result;
+ }
+
+ private Column getColumn(final ResultSet resultSet, final ResultSetMetaData resultSetMetaData, final String columnName, final int columnIndex, final boolean isUniqueKey) throws SQLException {
+ return new NormalColumn(columnName, columnValueReaderEngine.read(resultSet, resultSetMetaData, columnIndex), true, isUniqueKey);
}
@Override
protected void doStop() {
Optional.ofNullable(runningStatement.get()).ifPresent(PipelineJdbcUtils::cancelStatement);
}
+
+ private class RecordSingleTableInventoryDumpCalculator extends AbstractRecordSingleTableInventoryCalculator<List<DataRecord>, DataRecord> {
+
+ RecordSingleTableInventoryDumpCalculator(final int chunkSize, final StreamingRangeType streamingRangeType) {
+ super(chunkSize, streamingRangeType);
+ }
+
+ @Override
+ protected DataRecord readRecord(final ResultSet resultSet, final ResultSetMetaData resultSetMetaData, final InventoryColumnValueReaderEngine columnValueReaderEngine) throws SQLException {
+ return loadDataRecord(resultSet, resultSetMetaData);
+ }
+
+ @Override
+ protected Object getFirstUniqueKeyValue(final DataRecord record, final String firstUniqueKey) {
+ return record.getColumn(firstUniqueKey).getValue();
+ }
+
+ @Override
+ protected List<DataRecord> convertRecordsToResult(final List<DataRecord> records, final Object maxUniqueKeyValue) {
+ return records;
+ }
+ }
}
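A note on resumability in dumpByCalculator above: after each pushed chunk, the ingest position is rebuilt from the last record's first unique key value plus the original range upper bound, so a restarted job resumes from the last completed chunk instead of the table start. The per-chunk update, isolated for readability (same calls as in the diff above):

    // Per-chunk position update from dumpByCalculator; shown in isolation.
    Object lastKeyValue = dumpCalculator.getFirstUniqueKeyValue(each.get(each.size() - 1), firstUniqueKey);
    IngestPosition position = PrimaryKeyIngestPositionFactory.newInstance(lastKeyValue, queryRange.getUpper());
    dumperContext.getCommonContext().setPosition(position);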
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/InventoryQueryParameter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/InventoryQueryParameter.java
deleted file mode 100644
index 5370411aacd..00000000000
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/InventoryQueryParameter.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query;
-
-/**
- * Inventory query parameter.
- *
- * @param <T> type of parameter value
- */
-public interface InventoryQueryParameter<T> {
-
- /**
- * Get parameter value.
- *
- * @return parameter value
- */
- T getValue();
-}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/point/InventoryPointQueryParameter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/point/InventoryPointQueryParameter.java
deleted file mode 100644
index c7a8389860b..00000000000
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/point/InventoryPointQueryParameter.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.point;
-
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.InventoryQueryParameter;
-
-/**
- * Inventory point query parameter.
- */
-@RequiredArgsConstructor
-@Getter
-public final class InventoryPointQueryParameter implements InventoryQueryParameter<Object> {
-
- private final Object value;
-}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/range/InventoryRangeQueryParameter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/range/InventoryRangeQueryParameter.java
deleted file mode 100644
index 8514c87e122..00000000000
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/range/InventoryRangeQueryParameter.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.range;
-
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.InventoryQueryParameter;
-
-/**
- * Inventory range query parameter.
- */
-@RequiredArgsConstructor
-@Getter
-public final class InventoryRangeQueryParameter implements InventoryQueryParameter<QueryRange> {
-
- private final QueryRange value;
-}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/DataRecord.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/DataRecord.java
index 9e047d3e3f9..26f3e189e08 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/DataRecord.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/record/DataRecord.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.data.pipeline.core.ingest.record;
+import com.cedarsoftware.util.CaseInsensitiveMap;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
@@ -29,6 +30,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
/**
* Data record.
@@ -47,6 +49,8 @@ public final class DataRecord extends Record {
private final List<Column> columns;
+ private final Map<String, Column> columnMap;
+
private final Collection<Object> uniqueKeyValue = new LinkedList<>();
private final Collection<Object> oldUniqueKeyValues = new LinkedList<>();
@@ -65,6 +69,7 @@ public final class DataRecord extends Record {
this.schemaName = schemaName;
this.tableName = tableName;
columns = new ArrayList<>(columnCount);
+ columnMap = new CaseInsensitiveMap<>(columnCount, 1F);
}
/**
@@ -74,6 +79,7 @@ public final class DataRecord extends Record {
*/
public void addColumn(final Column data) {
columns.add(data);
+ columnMap.put(data.getName(), data);
if (data.isUniqueKey()) {
uniqueKeyValue.add(data.getValue());
oldUniqueKeyValues.add(data.getOldValue());
@@ -99,6 +105,16 @@ public final class DataRecord extends Record {
return columns.get(index);
}
+ /**
+ * Get column by name.
+ *
+ * @param name column name
+ * @return column
+ */
+ public Column getColumn(final String name) {
+ return columnMap.get(name);
+ }
+
/**
* Get key.
*
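The getColumn(String) lookup added to DataRecord above is backed by com.cedarsoftware.util.CaseInsensitiveMap (java-util), so name lookups tolerate case differences between metadata and ResultSet column names; if two columns differ only by case, the later addColumn call overwrites the map entry. A hypothetical snippet showing the behavior ("t_order", the position object, and the values are invented for illustration):

    // Hypothetical usage; not taken from the commit.
    DataRecord record = new DataRecord(PipelineSQLOperationType.INSERT, "t_order", position, 1);
    record.addColumn(new NormalColumn("order_id", 1L, true, true));
    Column column = record.getColumn("ORDER_ID");  // resolves case-insensitively to "order_id"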
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java
index b529f12484a..9a9cf752d77 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java
@@ -71,10 +71,10 @@ public final class InventoryDumperContextSplitter {
private Collection<InventoryDumperContext> splitByTable() {
return dumperContext.getCommonContext().getTableNameMapper().getTableNameMap().entrySet()
- .stream().map(entry -> createTableSpLitDumperContext(entry.getKey(), entry.getValue())).collect(Collectors.toList());
+ .stream().map(entry -> createTableSplitDumperContext(entry.getKey(), entry.getValue())).collect(Collectors.toList());
}
- private InventoryDumperContext createTableSpLitDumperContext(final ShardingSphereIdentifier actualTableName, final ShardingSphereIdentifier logicTableName) {
+ private InventoryDumperContext createTableSplitDumperContext(final ShardingSphereIdentifier actualTableName, final ShardingSphereIdentifier logicTableName) {
InventoryDumperContext result = new InventoryDumperContext(dumperContext.getCommonContext());
result.setActualTableName(actualTableName.toString());
result.setLogicTableName(logicTableName.toString());
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/BuildDivisibleSQLParameter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/BuildDivisibleSQLParameter.java
deleted file mode 100644
index e50643d77c1..00000000000
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/BuildDivisibleSQLParameter.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql;
-
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-
-import java.util.Collection;
-
-/**
- * Build divisible SQL parameter.
- */
-@RequiredArgsConstructor
-@Getter
-public final class BuildDivisibleSQLParameter {
-
- private final String schemaName;
-
- private final String tableName;
-
- private final Collection<String> columnNames;
-
- private final String uniqueKey;
-
- private final boolean lowerInclusive;
-
- private final boolean limited;
-}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineInventoryDumpSQLBuilder.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineInventoryDumpSQLBuilder.java
index fb4b3188ea3..b43c2d0a424 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineInventoryDumpSQLBuilder.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineInventoryDumpSQLBuilder.java
@@ -17,9 +17,7 @@
package org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql;
-import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.dialect.DialectPipelineSQLBuilder;
import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.segment.PipelineSQLSegmentBuilder;
-import org.apache.shardingsphere.database.connector.core.spi.DatabaseTypedSPILoader;
import org.apache.shardingsphere.database.connector.core.type.DatabaseType;
import java.util.Collection;
@@ -30,67 +28,12 @@ import java.util.stream.Collectors;
*/
public final class PipelineInventoryDumpSQLBuilder {
- private final DialectPipelineSQLBuilder dialectSQLBuilder;
-
private final PipelineSQLSegmentBuilder sqlSegmentBuilder;
public PipelineInventoryDumpSQLBuilder(final DatabaseType databaseType) {
- dialectSQLBuilder = DatabaseTypedSPILoader.getService(DialectPipelineSQLBuilder.class, databaseType);
sqlSegmentBuilder = new PipelineSQLSegmentBuilder(databaseType);
}
- /**
- * Build divisible inventory dump SQL.
- *
- * @param param parameter
- * @return built SQL
- */
- public String buildDivisibleSQL(final BuildDivisibleSQLParameter param) {
- String queryColumns = buildQueryColumns(param.getColumnNames());
- String qualifiedTableName = sqlSegmentBuilder.getQualifiedTableName(param.getSchemaName(), param.getTableName());
- String escapedUniqueKey = sqlSegmentBuilder.getEscapedIdentifier(param.getUniqueKey());
- String operator = param.isLowerInclusive() ? ">=" : ">";
- String sql = param.isLimited()
- ? String.format("SELECT %s FROM %s WHERE %s%s? AND %s<=? ORDER BY %s ASC", queryColumns, qualifiedTableName, escapedUniqueKey, operator, escapedUniqueKey, escapedUniqueKey)
- : String.format("SELECT %s FROM %s WHERE %s%s? ORDER BY %s ASC", queryColumns, qualifiedTableName, escapedUniqueKey, operator, escapedUniqueKey);
- return dialectSQLBuilder.wrapWithPageQuery(sql);
- }
-
- /**
- * Build indivisible inventory dump SQL.
- *
- * @param schemaName schema name
- * @param tableName table name
- * @param columnNames column names
- * @param uniqueKey unique key
- * @return built SQL
- */
- public String buildIndivisibleSQL(final String schemaName, final String tableName, final Collection<String> columnNames, final String uniqueKey) {
- String qualifiedTableName = sqlSegmentBuilder.getQualifiedTableName(schemaName, tableName);
- String quotedUniqueKey = sqlSegmentBuilder.getEscapedIdentifier(uniqueKey);
- return String.format("SELECT %s FROM %s ORDER BY %s ASC", buildQueryColumns(columnNames), qualifiedTableName, quotedUniqueKey);
- }
-
- private String buildQueryColumns(final Collection<String> columnNames) {
- return columnNames.stream().map(sqlSegmentBuilder::getEscapedIdentifier).collect(Collectors.joining(","));
- }
-
- /**
- * Build point query SQL.
- *
- * @param schemaName schema name
- * @param tableName table name
- * @param columnNames column names
- * @param uniqueKey unique key
- * @return built SQL
- */
- public String buildPointQuerySQL(final String schemaName, final String tableName, final Collection<String> columnNames, final String uniqueKey) {
- String qualifiedTableName = sqlSegmentBuilder.getQualifiedTableName(schemaName, tableName);
- String queryColumns = columnNames.stream().map(sqlSegmentBuilder::getEscapedIdentifier).collect(Collectors.joining(","));
- String escapedUniqueKey = sqlSegmentBuilder.getEscapedIdentifier(uniqueKey);
- return String.format("SELECT %s FROM %s WHERE %s=?", queryColumns, qualifiedTableName, escapedUniqueKey);
- }
-
/**
* Build fetch all inventory dump SQL.
*
@@ -104,20 +47,4 @@ public final class PipelineInventoryDumpSQLBuilder {
String queryColumns = columnNames.stream().map(sqlSegmentBuilder::getEscapedIdentifier).collect(Collectors.joining(","));
return String.format("SELECT %s FROM %s", queryColumns, qualifiedTableName);
}
-
- /**
- * Build fetch all inventory dump SQL.
- *
- * @param schemaName schema name
- * @param tableName tableName
- * @param columnNames column names
- * @param uniqueKey unique key
- * @return built SQL
- */
- public String buildFetchAllSQL(final String schemaName, final String tableName, final Collection<String> columnNames, final String uniqueKey) {
- String qualifiedTableName = sqlSegmentBuilder.getQualifiedTableName(schemaName, tableName);
- String queryColumns = columnNames.stream().map(sqlSegmentBuilder::getEscapedIdentifier).collect(Collectors.joining(","));
- String quotedUniqueKey = sqlSegmentBuilder.getEscapedIdentifier(uniqueKey);
- return String.format("SELECT %s FROM %s ORDER BY %s ASC", queryColumns, qualifiedTableName, quotedUniqueKey);
}
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineInventoryDumpSQLBuilderTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineInventoryDumpSQLBuilderTest.java
index d4a8b179849..fc19d2f986c 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineInventoryDumpSQLBuilderTest.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineInventoryDumpSQLBuilderTest.java
@@ -31,43 +31,11 @@ class PipelineInventoryDumpSQLBuilderTest {
private final PipelineInventoryDumpSQLBuilder sqlBuilder = new PipelineInventoryDumpSQLBuilder(TypedSPILoader.getService(DatabaseType.class, "FIXTURE"));
- @Test
- void assertBuildDivisibleSQL() {
- String actual = sqlBuilder.buildDivisibleSQL(new BuildDivisibleSQLParameter(null, "t_order", Arrays.asList("order_id", "user_id", "status"), "order_id", true, true));
- assertThat(actual, is("SELECT order_id,user_id,status FROM t_order WHERE order_id>=? AND order_id<=? ORDER BY order_id ASC LIMIT ?"));
- actual = sqlBuilder.buildDivisibleSQL(new BuildDivisibleSQLParameter(null, "t_order", Arrays.asList("order_id", "user_id", "status"), "order_id", false, true));
- assertThat(actual, is("SELECT order_id,user_id,status FROM t_order WHERE order_id>? AND order_id<=? ORDER BY order_id ASC LIMIT ?"));
- }
-
- @Test
- void assertBuildUnlimitedDivisibleSQL() {
- String actual = sqlBuilder.buildDivisibleSQL(new BuildDivisibleSQLParameter(null, "t_order", Arrays.asList("order_id", "user_id", "status"), "order_id", true, false));
- assertThat(actual, is("SELECT order_id,user_id,status FROM t_order WHERE order_id>=? ORDER BY order_id ASC LIMIT ?"));
- actual = sqlBuilder.buildDivisibleSQL(new BuildDivisibleSQLParameter(null, "t_order", Arrays.asList("order_id", "user_id", "status"), "order_id", false, false));
- assertThat(actual, is("SELECT order_id,user_id,status FROM t_order WHERE order_id>? ORDER BY order_id ASC LIMIT ?"));
- }
-
- @Test
- void assertBuildIndivisibleSQL() {
- String actual = sqlBuilder.buildIndivisibleSQL(null, "t_order",
Arrays.asList("order_id", "user_id", "status"), "order_id");
- assertThat(actual, is("SELECT order_id,user_id,status FROM t_order
ORDER BY order_id ASC"));
- }
-
- @Test
- void assertBuildPointQuerySQL() {
- String actual = sqlBuilder.buildPointQuerySQL(null, "t_order",
Arrays.asList("order_id", "user_id", "status"), "order_id");
- assertThat(actual, is("SELECT order_id,user_id,status FROM t_order
WHERE order_id=?"));
- }
-
@Test
void assertBuildFetchAllSQL() {
String actual = sqlBuilder.buildFetchAllSQL(null, "t_order",
Arrays.asList("order_id", "user_id", "status"));
assertThat(actual, is("SELECT order_id,user_id,status FROM t_order"));
actual = sqlBuilder.buildFetchAllSQL(null, "t_order",
Collections.singletonList("*"));
assertThat(actual, is("SELECT * FROM t_order"));
- actual = sqlBuilder.buildFetchAllSQL(null, "t_order",
Arrays.asList("order_id", "user_id", "status"), "order_id");
- assertThat(actual, is("SELECT order_id,user_id,status FROM t_order
ORDER BY order_id ASC"));
- actual = sqlBuilder.buildFetchAllSQL(null, "t_order",
Collections.singletonList("*"), "order_id");
- assertThat(actual, is("SELECT * FROM t_order ORDER BY order_id ASC"));
}
}