This is an automated email from the ASF dual-hosted git repository.
zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new ffb2c9b4611 Refactor InventoryDumper (#32702)
ffb2c9b4611 is described below
commit ffb2c9b4611bc458e651691a577934ef4a41c5d0
Author: Liang Zhang <[email protected]>
AuthorDate: Tue Aug 27 14:29:15 2024 +0800
Refactor InventoryDumper (#32702)
---
.../ingest/dumper/inventory/InventoryDumper.java | 35 ++++++++++------------
1 file changed, 16 insertions(+), 19 deletions(-)
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumper.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumper.java
index 4f7872ff274..058bb96e8d9 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumper.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumper.java
@@ -86,8 +86,6 @@ public final class InventoryDumper extends
AbstractPipelineLifecycleRunnable imp
private final AtomicReference<Statement> runningStatement = new
AtomicReference<>();
- private PipelineTableMetaData tableMetaData;
-
public InventoryDumper(final InventoryDumperContext dumperContext, final
PipelineChannel channel, final DataSource dataSource,
final PipelineTableMetaDataLoader metaDataLoader,
final InventoryDataRecordPositionCreator positionCreator) {
this.dumperContext = dumperContext;
@@ -107,12 +105,12 @@ public final class InventoryDumper extends
AbstractPipelineLifecycleRunnable imp
log.info("Ignored because of already finished.");
return;
}
- init();
+ PipelineTableMetaData tableMetaData = getPipelineTableMetaData();
try (Connection connection = dataSource.getConnection()) {
if (Strings.isNullOrEmpty(dumperContext.getQuerySQL()) &&
dumperContext.hasUniqueKey() && !isPrimaryKeyWithoutRange(position)) {
- dumpPageByPage(connection);
+ dumpPageByPage(connection, tableMetaData);
} else {
- dumpWithStreamingQuery(connection);
+ dumpWithStreamingQuery(connection, tableMetaData);
}
// CHECKSTYLE:OFF
} catch (final SQLException | RuntimeException ex) {
@@ -122,12 +120,10 @@ public final class InventoryDumper extends
AbstractPipelineLifecycleRunnable imp
}
}
- private void init() {
- if (null == tableMetaData) {
- String schemaName =
dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName());
- String tableName = dumperContext.getActualTableName();
- tableMetaData = metaDataLoader.getTableMetaData(schemaName,
tableName);
- }
+ private PipelineTableMetaData getPipelineTableMetaData() {
+ String schemaName =
dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName());
+ String tableName = dumperContext.getActualTableName();
+ return metaDataLoader.getTableMetaData(schemaName, tableName);
}
private boolean isPrimaryKeyWithoutRange(final IngestPosition position) {
@@ -135,7 +131,7 @@ public final class InventoryDumper extends
AbstractPipelineLifecycleRunnable imp
}
@SuppressWarnings("MagicConstant")
- private void dumpPageByPage(final Connection connection) throws
SQLException {
+ private void dumpPageByPage(final Connection connection, final
PipelineTableMetaData tableMetaData) throws SQLException {
if (null != dumperContext.getTransactionIsolation()) {
connection.setTransactionIsolation(dumperContext.getTransactionIsolation());
}
@@ -145,10 +141,10 @@ public final class InventoryDumper extends
AbstractPipelineLifecycleRunnable imp
while (true) {
QueryRange queryRange = new
QueryRange(((PrimaryKeyIngestPosition<?>) position).getBeginValue(),
firstQuery, ((PrimaryKeyIngestPosition<?>) position).getEndValue());
InventoryQueryParameter queryParam =
InventoryQueryParameter.buildForRangeQuery(queryRange);
- List<Record> dataRecords = dumpPageByPage(connection, queryParam,
rowCount);
+ List<Record> dataRecords = dumpPageByPage(connection, queryParam,
rowCount, tableMetaData);
if (dataRecords.size() > 1 &&
Objects.deepEquals(getFirstUniqueKeyValue(dataRecords, 0),
getFirstUniqueKeyValue(dataRecords, dataRecords.size() - 1))) {
queryParam =
InventoryQueryParameter.buildForPointQuery(getFirstUniqueKeyValue(dataRecords,
0));
- dataRecords = dumpPageByPage(connection, queryParam, rowCount);
+ dataRecords = dumpPageByPage(connection, queryParam, rowCount,
tableMetaData);
}
firstQuery = false;
if (dataRecords.isEmpty()) {
@@ -166,7 +162,8 @@ public final class InventoryDumper extends
AbstractPipelineLifecycleRunnable imp
}
}
- private List<Record> dumpPageByPage(final Connection connection, final
InventoryQueryParameter queryParam, final AtomicLong rowCount) throws
SQLException {
+ private List<Record> dumpPageByPage(final Connection connection,
+ final InventoryQueryParameter
queryParam, final AtomicLong rowCount, final PipelineTableMetaData
tableMetaData) throws SQLException {
DatabaseType databaseType =
dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType();
int batchSize = dumperContext.getBatchSize();
try (PreparedStatement preparedStatement =
JDBCStreamQueryBuilder.build(databaseType, connection,
buildDumpPageByPageSQL(queryParam), batchSize)) {
@@ -183,7 +180,7 @@ public final class InventoryDumper extends
AbstractPipelineLifecycleRunnable imp
}
result = new LinkedList<>();
}
- result.add(loadDataRecord(resultSet, resultSetMetaData));
+ result.add(loadDataRecord(resultSet, resultSetMetaData,
tableMetaData));
rowCount.incrementAndGet();
if (!isRunning()) {
log.info("Broke because of inventory dump is not
running.");
@@ -230,7 +227,7 @@ public final class InventoryDumper extends
AbstractPipelineLifecycleRunnable imp
}
}
- private DataRecord loadDataRecord(final ResultSet resultSet, final
ResultSetMetaData resultSetMetaData) throws SQLException {
+ private DataRecord loadDataRecord(final ResultSet resultSet, final
ResultSetMetaData resultSetMetaData, final PipelineTableMetaData tableMetaData)
throws SQLException {
int columnCount = resultSetMetaData.getColumnCount();
String tableName = dumperContext.getLogicTableName();
DataRecord result = new DataRecord(PipelineSQLOperationType.INSERT,
tableName, positionCreator.create(dumperContext, resultSet), columnCount);
@@ -269,7 +266,7 @@ public final class InventoryDumper extends
AbstractPipelineLifecycleRunnable imp
}
@SuppressWarnings("MagicConstant")
- private void dumpWithStreamingQuery(final Connection connection) throws
SQLException {
+ private void dumpWithStreamingQuery(final Connection connection, final
PipelineTableMetaData tableMetaData) throws SQLException {
int batchSize = dumperContext.getBatchSize();
DatabaseType databaseType =
dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType();
if (null != dumperContext.getTransactionIsolation()) {
@@ -290,7 +287,7 @@ public final class InventoryDumper extends
AbstractPipelineLifecycleRunnable imp
channel.push(dataRecords);
dataRecords = new LinkedList<>();
}
- dataRecords.add(loadDataRecord(resultSet,
resultSetMetaData));
+ dataRecords.add(loadDataRecord(resultSet,
resultSetMetaData, tableMetaData));
++rowCount;
if (!isRunning()) {
log.info("Broke because of inventory dump is not
running.");