This is an automated email from the ASF dual-hosted git repository.
chengzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new dca3ca578b4 Refactor InventoryDumper. (#33930)
dca3ca578b4 is described below
commit dca3ca578b4f660aaec866f4c9c03a00aa6d31de
Author: Cong Hu <[email protected]>
AuthorDate: Fri Dec 6 16:35:09 2024 +0800
Refactor InventoryDumper. (#33930)
---
.../ingest/dumper/inventory/InventoryDumper.java | 146 +++++++++++----------
1 file changed, 80 insertions(+), 66 deletions(-)
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumper.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumper.java
index daab070fcce..ff7a42fbb0c 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumper.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumper.java
@@ -17,8 +17,8 @@
 
 package org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory;
 
-import com.google.common.base.Strings;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
 import org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
 import org.apache.shardingsphere.data.pipeline.core.exception.IngestException;
@@ -111,10 +111,10 @@ public final class InventoryDumper extends AbstractPipelineLifecycleRunnable imp
         }
         PipelineTableMetaData tableMetaData = getPipelineTableMetaData();
         try (Connection connection = dataSource.getConnection()) {
-            if (Strings.isNullOrEmpty(dumperContext.getQuerySQL()) && dumperContext.hasUniqueKey() && !isPrimaryKeyWithoutRange(position)) {
-                dumpPageByPage(connection, tableMetaData);
-            } else {
+            if (StringUtils.isNotBlank(dumperContext.getQuerySQL()) || !dumperContext.hasUniqueKey() || isPrimaryKeyWithoutRange(position)) {
                 dumpWithStreamingQuery(connection, tableMetaData);
+            } else {
+                dumpByPage(connection, tableMetaData);
             }
             // CHECKSTYLE:OFF
         } catch (final SQLException | RuntimeException ex) {
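The inverted guard above follows from De Morgan's laws, with one nuance: Strings.isNullOrEmpty is replaced by StringUtils.isNotBlank, which also treats whitespace-only SQL as absent. A minimal standalone sketch, not part of this commit and using illustrative values, comparing the two guards:

import com.google.common.base.Strings;
import org.apache.commons.lang3.StringUtils;

public final class GuardInversionSketch {

    public static void main(final String[] args) {
        boolean hasUniqueKey = true;
        boolean primaryKeyWithoutRange = false;
        for (String querySQL : new String[]{null, "", "   ", "SELECT * FROM t_order"}) {
            // Old guard: dump page by page when there is no custom SQL, a unique key exists, and the key has a range.
            boolean oldPageDump = Strings.isNullOrEmpty(querySQL) && hasUniqueKey && !primaryKeyWithoutRange;
            // New guard, De Morgan inverted: stream in every remaining case.
            boolean newStreamingDump = StringUtils.isNotBlank(querySQL) || !dumperHasUniqueKey(hasUniqueKey) || primaryKeyWithoutRange;
            System.out.printf("querySQL=%-22s oldPageDump=%-5s newPageDump=%s%n", querySQL, oldPageDump, !newStreamingDump);
        }
    }

    // Stand-in for dumperContext.hasUniqueKey(), which is not available outside the dumper.
    private static boolean dumperHasUniqueKey(final boolean hasUniqueKey) {
        return hasUniqueKey;
    }
}

Only the whitespace-only input flips the chosen path; all other inputs dispatch identically under both guards.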
@@ -135,20 +135,21 @@ public final class InventoryDumper extends AbstractPipelineLifecycleRunnable imp
     }
 
     @SuppressWarnings("MagicConstant")
-    private void dumpPageByPage(final Connection connection, final PipelineTableMetaData tableMetaData) throws SQLException {
+    private void dumpByPage(final Connection connection, final PipelineTableMetaData tableMetaData) throws SQLException {
+        log.info("Start to dump inventory data by page, dataSource={}, actualTable={}", dumperContext.getCommonContext().getDataSourceName(), dumperContext.getActualTableName());
         if (null != dumperContext.getTransactionIsolation()) {
             connection.setTransactionIsolation(dumperContext.getTransactionIsolation());
         }
         boolean firstQuery = true;
         AtomicLong rowCount = new AtomicLong();
         IngestPosition position = dumperContext.getCommonContext().getPosition();
-        while (true) {
+        do {
             QueryRange queryRange = new QueryRange(((PrimaryKeyIngestPosition<?>) position).getBeginValue(), firstQuery, ((PrimaryKeyIngestPosition<?>) position).getEndValue());
             InventoryQueryParameter<?> queryParam = new InventoryRangeQueryParameter(queryRange);
-            List<Record> dataRecords = dumpPageByPage(connection, queryParam, rowCount, tableMetaData);
+            List<Record> dataRecords = dumpByPage(connection, queryParam, rowCount, tableMetaData);
             if (dataRecords.size() > 1 && Objects.deepEquals(getFirstUniqueKeyValue(dataRecords, 0), getFirstUniqueKeyValue(dataRecords, dataRecords.size() - 1))) {
                 queryParam = new InventoryPointQueryParameter(getFirstUniqueKeyValue(dataRecords, 0));
-                dataRecords = dumpPageByPage(connection, queryParam, rowCount, tableMetaData);
+                dataRecords = dumpByPage(connection, queryParam, rowCount, tableMetaData);
             }
             firstQuery = false;
             if (dataRecords.isEmpty()) {
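The deepEquals check above guards a corner case of range paging: when every row in a page shares one unique-key value, the range cannot advance past it, so the dumper re-reads that key with a point query. A condensed sketch of the check, with hypothetical types in place of Record:

import java.util.List;
import java.util.Objects;

final class PageBoundaryCheckSketch {

    // True when the page has more than one row and its first and last key values are equal,
    // meaning range paging is stuck on a single key value.
    static boolean needsPointQuery(final List<Object> pageKeyValues) {
        return pageKeyValues.size() > 1 && Objects.deepEquals(pageKeyValues.get(0), pageKeyValues.get(pageKeyValues.size() - 1));
    }

    public static void main(final String[] args) {
        System.out.println(needsPointQuery(List.of(1, 1, 1)));  // true: the page cannot advance past key 1
        System.out.println(needsPointQuery(List.of(1, 2, 3)));  // false: the next range can start after key 3
    }
}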
@@ -160,19 +161,17 @@ public final class InventoryDumper extends AbstractPipelineLifecycleRunnable imp
             }
             channel.push(dataRecords);
             dumperContext.getCommonContext().setPosition(position);
-            if (position instanceof IngestFinishedPosition) {
-                break;
-            }
-        }
+        } while (!(position instanceof IngestFinishedPosition));
+        log.info("End to dump inventory data by page, dataSource={}, actualTable={}", dumperContext.getCommonContext().getDataSourceName(), dumperContext.getActualTableName());
     }
 
-    private List<Record> dumpPageByPage(final Connection connection,
-                                        final InventoryQueryParameter<?> queryParam, final AtomicLong rowCount, final PipelineTableMetaData tableMetaData) throws SQLException {
+    private List<Record> dumpByPage(final Connection connection,
+                                    final InventoryQueryParameter<?> queryParam, final AtomicLong rowCount, final PipelineTableMetaData tableMetaData) throws SQLException {
         DatabaseType databaseType = dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType();
         int batchSize = dumperContext.getBatchSize();
-        try (PreparedStatement preparedStatement = JDBCStreamQueryBuilder.build(databaseType, connection, buildDumpPageByPageSQL(queryParam), batchSize)) {
+        try (PreparedStatement preparedStatement = JDBCStreamQueryBuilder.build(databaseType, connection, buildDumpByPageSQL(queryParam), batchSize)) {
             runningStatement.set(preparedStatement);
-            setParameters(preparedStatement, queryParam, false);
+            setParameters(preparedStatement, queryParam);
             try (ResultSet resultSet = preparedStatement.executeQuery()) {
                 JobRateLimitAlgorithm rateLimitAlgorithm = dumperContext.getRateLimitAlgorithm();
                 ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
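The while (true) loop with an in-loop break becomes a do-while above: the body always runs at least once and the exit test moves to the loop tail. A trivial sketch of the rewrite, with illustrative values:

public final class DoWhileSketch {

    public static void main(final String[] args) {
        int page = 0;
        // Before: while (true) { dump(page); ++page; if (page >= 3) { break; } }
        // After, as in the commit:
        do {
            System.out.println("dump page " + page);
            ++page;
        } while (page < 3);
    }
}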
@@ -201,31 +200,19 @@ public final class InventoryDumper extends AbstractPipelineLifecycleRunnable imp
         }
     }
 
-    private void setParameters(final PreparedStatement preparedStatement, final InventoryQueryParameter<?> queryParam, final boolean streamingQuery) throws SQLException {
-        if (!Strings.isNullOrEmpty(dumperContext.getQuerySQL())) {
-            for (int i = 0; i < dumperContext.getQueryParams().size(); i++) {
-                preparedStatement.setObject(i + 1, dumperContext.getQueryParams().get(i));
-            }
-            return;
-        }
-        if (!dumperContext.hasUniqueKey()) {
-            return;
-        }
-        int parameterIndex = 1;
+    private void setParameters(final PreparedStatement preparedStatement, final InventoryQueryParameter<?> queryParam) throws SQLException {
         if (queryParam instanceof InventoryRangeQueryParameter) {
             Object lower = ((InventoryRangeQueryParameter) queryParam).getValue().getLower();
             if (null != lower) {
-                preparedStatement.setObject(parameterIndex++, lower);
+                preparedStatement.setObject(1, lower);
             }
             Object upper = ((InventoryRangeQueryParameter) queryParam).getValue().getUpper();
             if (null != upper) {
-                preparedStatement.setObject(parameterIndex++, upper);
-            }
-            if (!streamingQuery) {
-                preparedStatement.setInt(parameterIndex, dumperContext.getBatchSize());
+                preparedStatement.setObject(2, upper);
             }
+            preparedStatement.setInt(3, dumperContext.getBatchSize());
         } else if (queryParam instanceof InventoryPointQueryParameter) {
-            preparedStatement.setObject(parameterIndex, queryParam.getValue());
+            preparedStatement.setObject(1, queryParam.getValue());
         } else {
             throw new UnsupportedOperationException("Query type: " + queryParam.getValue());
         }
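With a dedicated page SQL, the placeholder positions are now fixed, so the running parameterIndex bookkeeping is dropped in favor of absolute indices. A hedged sketch of that positional binding against a hypothetical page query (the real SQL comes from buildDumpByPageSQL and the dialect-specific sqlBuilder):

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import javax.sql.DataSource;

final class RangePageQuerySketch {

    // Assumed placeholder layout: 1 = lower bound, 2 = upper bound, 3 = page size.
    private static final String PAGE_SQL = "SELECT id, val FROM t_order WHERE id > ? AND id <= ? ORDER BY id LIMIT ?";

    static void readPage(final DataSource dataSource, final long lower, final long upper, final int batchSize) throws SQLException {
        try (
                Connection connection = dataSource.getConnection();
                PreparedStatement statement = connection.prepareStatement(PAGE_SQL)) {
            statement.setObject(1, lower);
            statement.setObject(2, upper);
            statement.setInt(3, batchSize);
            try (ResultSet resultSet = statement.executeQuery()) {
                while (resultSet.next()) {
                    System.out.println(resultSet.getLong("id"));
                }
            }
        }
    }
}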
@@ -247,7 +234,7 @@ public final class InventoryDumper extends AbstractPipelineLifecycleRunnable imp
         return result;
     }
 
-    private String buildDumpPageByPageSQL(final InventoryQueryParameter<?> queryParam) {
+    private String buildDumpByPageSQL(final InventoryQueryParameter<?> queryParam) {
         String schemaName = dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName());
         PipelineColumnMetaData firstColumn = dumperContext.getUniqueKeyColumns().get(0);
         List<String> columnNames = dumperContext.getQueryColumnNames();
@@ -276,45 +263,72 @@ public final class InventoryDumper extends AbstractPipelineLifecycleRunnable imp
         if (null != dumperContext.getTransactionIsolation()) {
             connection.setTransactionIsolation(dumperContext.getTransactionIsolation());
         }
-        try (PreparedStatement preparedStatement = JDBCStreamQueryBuilder.build(databaseType, connection, buildDumpSQLWithStreamingQuery(), batchSize)) {
-            runningStatement.set(preparedStatement);
-            PrimaryKeyIngestPosition<?> primaryPosition = (PrimaryKeyIngestPosition<?>) dumperContext.getCommonContext().getPosition();
-            InventoryRangeQueryParameter queryParam = new InventoryRangeQueryParameter(new QueryRange(primaryPosition.getBeginValue(), true, primaryPosition.getEndValue()));
-            setParameters(preparedStatement, queryParam, true);
-            try (ResultSet resultSet = preparedStatement.executeQuery()) {
-                int rowCount = 0;
-                JobRateLimitAlgorithm rateLimitAlgorithm = dumperContext.getRateLimitAlgorithm();
-                ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
-                List<Record> dataRecords = new LinkedList<>();
-                while (resultSet.next()) {
-                    if (dataRecords.size() >= batchSize) {
-                        channel.push(dataRecords);
-                        dataRecords = new LinkedList<>();
-                    }
-                    dataRecords.add(loadDataRecord(resultSet, resultSetMetaData, tableMetaData));
-                    ++rowCount;
-                    if (!isRunning()) {
-                        log.info("Broke because of inventory dump is not running.");
-                        break;
-                    }
-                    if (null != rateLimitAlgorithm && 0 == rowCount % batchSize) {
-                        rateLimitAlgorithm.intercept(PipelineSQLOperationType.SELECT, 1);
-                    }
-                }
-                dataRecords.add(new FinishedRecord(new IngestFinishedPosition()));
-                channel.push(dataRecords);
-                log.info("Inventory dump with streaming query done, rowCount={}, dataSource={}, actualTable={}",
-                        rowCount, dumperContext.getCommonContext().getDataSourceName(), dumperContext.getActualTableName());
+        if (null == dumperContext.getQuerySQL()) {
+            fetchAllQuery(connection, tableMetaData, databaseType, batchSize);
+        } else {
+            designatedParametersQuery(connection, tableMetaData, databaseType, batchSize);
+        }
+    }
+
+    private void fetchAllQuery(final Connection connection, final PipelineTableMetaData tableMetaData, final DatabaseType databaseType,
+                               final int batchSize) throws SQLException {
+        log.info("Start to fetch all inventory data with streaming query, dataSource={}, actualTable={}", dumperContext.getCommonContext().getDataSourceName(), dumperContext.getActualTableName());
+        try (PreparedStatement statement = JDBCStreamQueryBuilder.build(databaseType, connection, buildFetchAllSQLWithStreamingQuery(), batchSize)) {
+            runningStatement.set(statement);
+            try (ResultSet resultSet = statement.executeQuery()) {
+                consumeResultSetToChannel(tableMetaData, resultSet, batchSize);
+            } finally {
+                runningStatement.set(null);
+            }
+        }
+        log.info("End to fetch all inventory data with streaming query, dataSource={}, actualTable={}", dumperContext.getCommonContext().getDataSourceName(), dumperContext.getActualTableName());
+    }
+
+    private void designatedParametersQuery(final Connection connection, final PipelineTableMetaData tableMetaData, final DatabaseType databaseType, final int batchSize) throws SQLException {
+        log.info("Start to dump inventory data with designated parameters query, dataSource={}, actualTable={}", dumperContext.getCommonContext().getDataSourceName(),
+                dumperContext.getActualTableName());
+        try (PreparedStatement statement = JDBCStreamQueryBuilder.build(databaseType, connection, dumperContext.getQuerySQL(), batchSize)) {
+            runningStatement.set(statement);
+            for (int i = 0; i < dumperContext.getQueryParams().size(); i++) {
+                statement.setObject(i + 1, dumperContext.getQueryParams().get(i));
+            }
+            try (ResultSet resultSet = statement.executeQuery()) {
+                consumeResultSetToChannel(tableMetaData, resultSet, batchSize);
             } finally {
                 runningStatement.set(null);
             }
         }
+        log.info("End to dump inventory data with designated parameters query, dataSource={}, actualTable={}", dumperContext.getCommonContext().getDataSourceName(),
+                dumperContext.getActualTableName());
     }
 
-    private String buildDumpSQLWithStreamingQuery() {
-        if (!Strings.isNullOrEmpty(dumperContext.getQuerySQL())) {
-            return dumperContext.getQuerySQL();
+    private void consumeResultSetToChannel(final PipelineTableMetaData tableMetaData, final ResultSet resultSet, final int batchSize) throws SQLException {
+        int rowCount = 0;
+        JobRateLimitAlgorithm rateLimitAlgorithm = dumperContext.getRateLimitAlgorithm();
+        ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+        List<Record> dataRecords = new LinkedList<>();
+        while (resultSet.next()) {
+            if (dataRecords.size() >= batchSize) {
+                channel.push(dataRecords);
+                dataRecords = new LinkedList<>();
+            }
+            dataRecords.add(loadDataRecord(resultSet, resultSetMetaData, tableMetaData));
+            ++rowCount;
+            if (!isRunning()) {
+                log.info("Broke because of inventory dump is not running.");
+                break;
+            }
+            if (null != rateLimitAlgorithm && 0 == rowCount % batchSize) {
+                rateLimitAlgorithm.intercept(PipelineSQLOperationType.SELECT, 1);
+            }
         }
+        dataRecords.add(new FinishedRecord(new IngestFinishedPosition()));
+        channel.push(dataRecords);
+        log.info("Inventory dump with streaming query done, rowCount={}, dataSource={}, actualTable={}", rowCount, dumperContext.getCommonContext().getDataSourceName(),
+                dumperContext.getActualTableName());
+    }
+
+    private String buildFetchAllSQLWithStreamingQuery() {
         String schemaName = dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName());
         List<String> columnNames = dumperContext.getQueryColumnNames();
         return sqlBuilder.buildFetchAllSQL(schemaName, dumperContext.getActualTableName(), columnNames);
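The extracted consumeResultSetToChannel above buffers rows up to batchSize, pushes each full buffer downstream, and appends a finished marker after the last row. A condensed sketch of that pattern, with a plain Consumer standing in for PipelineChannel and a String row standing in for Record:

import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.LinkedList;
import java.util.List;
import java.util.function.Consumer;

final class StreamingConsumerSketch {

    static void consume(final ResultSet resultSet, final int batchSize, final Consumer<List<String>> channel) throws SQLException {
        List<String> buffer = new LinkedList<>();
        while (resultSet.next()) {
            // Flush the buffer downstream once it reaches the batch size.
            if (buffer.size() >= batchSize) {
                channel.accept(buffer);
                buffer = new LinkedList<>();
            }
            buffer.add(resultSet.getString(1));
        }
        // Append a finished marker so the consumer can detect end of inventory.
        buffer.add("FINISHED");
        channel.accept(buffer);
    }
}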