This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new d0cf0518dc3 Simplify InventoryDumper usage of PipelineTableMetaData
(#36636)
d0cf0518dc3 is described below
commit d0cf0518dc3fe87c3b76f265642fc61f35f4c7d0
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Fri Sep 19 20:07:31 2025 +0800
Simplify InventoryDumper usage of PipelineTableMetaData (#36636)
---
.../ingest/dumper/inventory/InventoryDumper.java | 58 ++++++++++------------
.../dumper/inventory/InventoryDumperContext.java | 16 ++++++
2 files changed, 42 insertions(+), 32 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumper.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumper.java
index dbf5f0b4250..18e1bbc7f57 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumper.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumper.java
@@ -17,8 +17,8 @@
package org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory;
+import com.google.common.base.Strings;
import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.StringUtils;
import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
import
org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
import org.apache.shardingsphere.data.pipeline.core.exception.IngestException;
@@ -43,7 +43,6 @@ import
org.apache.shardingsphere.data.pipeline.core.ingest.record.NormalColumn;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
-import
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
import
org.apache.shardingsphere.data.pipeline.core.query.JDBCStreamQueryBuilder;
import
org.apache.shardingsphere.data.pipeline.core.ratelimit.JobRateLimitAlgorithm;
import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.BuildDivisibleSQLParameter;
@@ -52,6 +51,7 @@ import
org.apache.shardingsphere.data.pipeline.core.util.PipelineJdbcUtils;
import org.apache.shardingsphere.database.connector.core.type.DatabaseType;
import org.apache.shardingsphere.infra.annotation.HighFrequencyInvocation;
import org.apache.shardingsphere.infra.exception.ShardingSpherePreconditions;
+import
org.apache.shardingsphere.infra.metadata.identifier.ShardingSphereIdentifier;
import javax.sql.DataSource;
import java.sql.Connection;
@@ -110,12 +110,11 @@ public final class InventoryDumper extends
AbstractPipelineLifecycleRunnable imp
log.info("Ignored because of already finished.");
return;
}
- PipelineTableMetaData tableMetaData = getPipelineTableMetaData();
try (Connection connection = dataSource.getConnection()) {
- if (StringUtils.isNotBlank(dumperContext.getQuerySQL()) ||
!dumperContext.hasUniqueKey() || isPrimaryKeyWithoutRange(position)) {
- dumpWithStreamingQuery(connection, tableMetaData);
+ if (!Strings.isNullOrEmpty(dumperContext.getQuerySQL()) ||
!dumperContext.hasUniqueKey() || isPrimaryKeyWithoutRange(position)) {
+ dumpWithStreamingQuery(connection);
} else {
- dumpByPage(connection, tableMetaData);
+ dumpByPage(connection);
}
// CHECKSTYLE:OFF
} catch (final SQLException | RuntimeException ex) {
@@ -125,18 +124,12 @@ public final class InventoryDumper extends
AbstractPipelineLifecycleRunnable imp
}
}
- private PipelineTableMetaData getPipelineTableMetaData() {
- String schemaName =
dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName());
- String tableName = dumperContext.getActualTableName();
- return metaDataLoader.getTableMetaData(schemaName, tableName);
- }
-
private boolean isPrimaryKeyWithoutRange(final IngestPosition position) {
return position instanceof PrimaryKeyIngestPosition && null ==
((PrimaryKeyIngestPosition<?>) position).getBeginValue() && null ==
((PrimaryKeyIngestPosition<?>) position).getEndValue();
}
@SuppressWarnings("MagicConstant")
- private void dumpByPage(final Connection connection, final
PipelineTableMetaData tableMetaData) throws SQLException {
+ private void dumpByPage(final Connection connection) throws SQLException {
log.info("Start to dump inventory data by page, dataSource={},
actualTable={}", dumperContext.getCommonContext().getDataSourceName(),
dumperContext.getActualTableName());
if (null != dumperContext.getTransactionIsolation()) {
connection.setTransactionIsolation(dumperContext.getTransactionIsolation());
@@ -148,10 +141,10 @@ public final class InventoryDumper extends
AbstractPipelineLifecycleRunnable imp
QueryRange queryRange = new
QueryRange(((PrimaryKeyIngestPosition<?>) position).getBeginValue(), firstQuery
&& dumperContext.isFirstDump(),
((PrimaryKeyIngestPosition<?>) position).getEndValue());
InventoryQueryParameter<?> queryParam = new
InventoryRangeQueryParameter(queryRange);
- List<Record> dataRecords = dumpByPage(connection, queryParam,
rowCount, tableMetaData);
+ List<Record> dataRecords = dumpByPage(connection, queryParam,
rowCount);
if (dataRecords.size() > 1 &&
Objects.deepEquals(getFirstUniqueKeyValue(dataRecords, 0),
getFirstUniqueKeyValue(dataRecords, dataRecords.size() - 1))) {
queryParam = new
InventoryPointQueryParameter(getFirstUniqueKeyValue(dataRecords, 0));
- dataRecords = dumpByPage(connection, queryParam, rowCount,
tableMetaData);
+ dataRecords = dumpByPage(connection, queryParam, rowCount);
}
firstQuery = false;
if (dataRecords.isEmpty()) {
@@ -167,8 +160,7 @@ public final class InventoryDumper extends
AbstractPipelineLifecycleRunnable imp
log.info("End to dump inventory data by page, dataSource={},
actualTable={}", dumperContext.getCommonContext().getDataSourceName(),
dumperContext.getActualTableName());
}
- private List<Record> dumpByPage(final Connection connection,
- final InventoryQueryParameter<?>
queryParam, final AtomicLong rowCount, final PipelineTableMetaData
tableMetaData) throws SQLException {
+ private List<Record> dumpByPage(final Connection connection, final
InventoryQueryParameter<?> queryParam, final AtomicLong rowCount) throws
SQLException {
DatabaseType databaseType =
dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType();
int batchSize = dumperContext.getBatchSize();
try (PreparedStatement preparedStatement =
JDBCStreamQueryBuilder.build(databaseType, connection,
buildDumpByPageSQL(queryParam), batchSize)) {
@@ -185,7 +177,7 @@ public final class InventoryDumper extends
AbstractPipelineLifecycleRunnable imp
}
result = new LinkedList<>();
}
- result.add(loadDataRecord(resultSet, resultSetMetaData,
tableMetaData));
+ result.add(loadDataRecord(resultSet, resultSetMetaData));
rowCount.incrementAndGet();
if (!isRunning()) {
log.info("Broke because of inventory dump is not
running.");
@@ -221,7 +213,7 @@ public final class InventoryDumper extends
AbstractPipelineLifecycleRunnable imp
}
}
- private DataRecord loadDataRecord(final ResultSet resultSet, final
ResultSetMetaData resultSetMetaData, final PipelineTableMetaData tableMetaData)
throws SQLException {
+ private DataRecord loadDataRecord(final ResultSet resultSet, final
ResultSetMetaData resultSetMetaData) throws SQLException {
int columnCount = resultSetMetaData.getColumnCount();
String tableName = dumperContext.getLogicTableName();
DataRecord result = new DataRecord(PipelineSQLOperationType.INSERT,
tableName, positionCreator.create(dumperContext, resultSet), columnCount);
@@ -230,14 +222,17 @@ public final class InventoryDumper extends
AbstractPipelineLifecycleRunnable imp
() -> new PipelineInvalidParameterException("Insert column
names count not equals ResultSet column count"));
for (int i = 1; i <= columnCount; i++) {
String columnName = insertColumnNames.isEmpty() ?
resultSetMetaData.getColumnName(i) : insertColumnNames.get(i - 1);
-
ShardingSpherePreconditions.checkNotNull(tableMetaData.getColumnMetaData(columnName),
() -> new PipelineInvalidParameterException(String.format("Column name is %s",
columnName)));
- Column column = new NormalColumn(columnName,
columnValueReaderEngine.read(resultSet, resultSetMetaData, i), true,
tableMetaData.getColumnMetaData(columnName).isUniqueKey());
+ Column column = getColumn(resultSet, resultSetMetaData,
columnName, i, dumperContext.getTargetUniqueKeysNames().contains(new
ShardingSphereIdentifier(columnName)));
result.addColumn(column);
}
result.setActualTableName(dumperContext.getActualTableName());
return result;
}
+ private Column getColumn(final ResultSet resultSet, final
ResultSetMetaData resultSetMetaData, final String columnName, final int
columnIndex, final boolean isUniqueKey) throws SQLException {
+ return new NormalColumn(columnName,
columnValueReaderEngine.read(resultSet, resultSetMetaData, columnIndex), true,
isUniqueKey);
+ }
+
private String buildDumpByPageSQL(final InventoryQueryParameter<?>
queryParam) {
String schemaName =
dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName());
PipelineColumnMetaData firstColumn =
dumperContext.getUniqueKeyColumns().get(0);
@@ -261,26 +256,25 @@ public final class InventoryDumper extends
AbstractPipelineLifecycleRunnable imp
}
@SuppressWarnings("MagicConstant")
- private void dumpWithStreamingQuery(final Connection connection, final
PipelineTableMetaData tableMetaData) throws SQLException {
+ private void dumpWithStreamingQuery(final Connection connection) throws
SQLException {
int batchSize = dumperContext.getBatchSize();
DatabaseType databaseType =
dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType();
if (null != dumperContext.getTransactionIsolation()) {
connection.setTransactionIsolation(dumperContext.getTransactionIsolation());
}
if (null == dumperContext.getQuerySQL()) {
- fetchAllQuery(connection, tableMetaData, databaseType, batchSize);
+ fetchAllQuery(connection, databaseType, batchSize);
} else {
- designatedParametersQuery(connection, tableMetaData, databaseType,
batchSize);
+ designatedParametersQuery(connection, databaseType, batchSize);
}
}
- private void fetchAllQuery(final Connection connection, final
PipelineTableMetaData tableMetaData, final DatabaseType databaseType,
- final int batchSize) throws SQLException {
+ private void fetchAllQuery(final Connection connection, final DatabaseType
databaseType, final int batchSize) throws SQLException {
log.info("Start to fetch all inventory data with streaming query,
dataSource={}, actualTable={}",
dumperContext.getCommonContext().getDataSourceName(),
dumperContext.getActualTableName());
try (PreparedStatement statement =
JDBCStreamQueryBuilder.build(databaseType, connection,
buildFetchAllSQLWithStreamingQuery(), batchSize)) {
runningStatement.set(statement);
try (ResultSet resultSet = statement.executeQuery()) {
- consumeResultSetToChannel(tableMetaData, resultSet, batchSize);
+ consumeResultSetToChannel(resultSet, batchSize);
} finally {
runningStatement.set(null);
}
@@ -288,7 +282,7 @@ public final class InventoryDumper extends
AbstractPipelineLifecycleRunnable imp
log.info("End to fetch all inventory data with streaming query,
dataSource={}, actualTable={}",
dumperContext.getCommonContext().getDataSourceName(),
dumperContext.getActualTableName());
}
- private void designatedParametersQuery(final Connection connection, final
PipelineTableMetaData tableMetaData, final DatabaseType databaseType, final int
batchSize) throws SQLException {
+ private void designatedParametersQuery(final Connection connection, final
DatabaseType databaseType, final int batchSize) throws SQLException {
log.info("Start to dump inventory data with designated parameters
query, dataSource={}, actualTable={}",
dumperContext.getCommonContext().getDataSourceName(),
dumperContext.getActualTableName());
try (PreparedStatement statement =
JDBCStreamQueryBuilder.build(databaseType, connection,
dumperContext.getQuerySQL(), batchSize)) {
@@ -297,7 +291,7 @@ public final class InventoryDumper extends
AbstractPipelineLifecycleRunnable imp
statement.setObject(i + 1,
dumperContext.getQueryParams().get(i));
}
try (ResultSet resultSet = statement.executeQuery()) {
- consumeResultSetToChannel(tableMetaData, resultSet, batchSize);
+ consumeResultSetToChannel(resultSet, batchSize);
} finally {
runningStatement.set(null);
}
@@ -306,8 +300,8 @@ public final class InventoryDumper extends
AbstractPipelineLifecycleRunnable imp
dumperContext.getActualTableName());
}
- private void consumeResultSetToChannel(final PipelineTableMetaData
tableMetaData, final ResultSet resultSet, final int batchSize) throws
SQLException {
- int rowCount = 0;
+ private void consumeResultSetToChannel(final ResultSet resultSet, final
int batchSize) throws SQLException {
+ long rowCount = 0;
JobRateLimitAlgorithm rateLimitAlgorithm =
dumperContext.getRateLimitAlgorithm();
ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
List<Record> dataRecords = new LinkedList<>();
@@ -316,7 +310,7 @@ public final class InventoryDumper extends
AbstractPipelineLifecycleRunnable imp
channel.push(dataRecords);
dataRecords = new LinkedList<>();
}
- dataRecords.add(loadDataRecord(resultSet, resultSetMetaData,
tableMetaData));
+ dataRecords.add(loadDataRecord(resultSet, resultSetMetaData));
++rowCount;
if (!isRunning()) {
log.info("Broke because of inventory dump is not running.");
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumperContext.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumperContext.java
index 2052b2c005e..29cb11286ad 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumperContext.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumperContext.java
@@ -23,10 +23,12 @@ import lombok.ToString;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.DumperCommonContext;
import
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
import
org.apache.shardingsphere.data.pipeline.core.ratelimit.JobRateLimitAlgorithm;
+import
org.apache.shardingsphere.infra.metadata.identifier.ShardingSphereIdentifier;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
+import java.util.stream.Collectors;
/**
* Inventory dumper context.
@@ -44,6 +46,8 @@ public final class InventoryDumperContext {
private List<PipelineColumnMetaData> uniqueKeyColumns;
+ private List<ShardingSphereIdentifier> targetUniqueKeysNames;
+
private List<String> insertColumnNames;
private String querySQL;
@@ -65,6 +69,18 @@ public final class InventoryDumperContext {
commonContext.getDataSourceName(),
commonContext.getDataSourceConfig(), commonContext.getTableNameMapper(),
commonContext.getTableAndSchemaNameMapper());
}
+ /**
+ * Set unique key columns.
+ *
+ * @param uniqueKeyColumns unique key columns
+ */
+ public void setUniqueKeyColumns(final List<PipelineColumnMetaData>
uniqueKeyColumns) {
+ this.uniqueKeyColumns = uniqueKeyColumns;
+ targetUniqueKeysNames = hasUniqueKey()
+ ? uniqueKeyColumns.stream().map(each -> new
ShardingSphereIdentifier(each.getName())).collect(Collectors.toList())
+ : Collections.emptyList();
+ }
+
/**
* Has unique key or not.
*