This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new d0cf0518dc3 Simplify InventoryDumper usage of PipelineTableMetaData (#36636)
d0cf0518dc3 is described below

commit d0cf0518dc3fe87c3b76f265642fc61f35f4c7d0
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Fri Sep 19 20:07:31 2025 +0800

    Simplify InventoryDumper usage of PipelineTableMetaData (#36636)
---
 .../ingest/dumper/inventory/InventoryDumper.java   | 58 ++++++++++------------
 .../dumper/inventory/InventoryDumperContext.java   | 16 ++++++
 2 files changed, 42 insertions(+), 32 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumper.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumper.java
index dbf5f0b4250..18e1bbc7f57 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumper.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumper.java
@@ -17,8 +17,8 @@
 
 package org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory;
 
+import com.google.common.base.Strings;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
 import org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
 import org.apache.shardingsphere.data.pipeline.core.exception.IngestException;
@@ -43,7 +43,6 @@ import org.apache.shardingsphere.data.pipeline.core.ingest.record.NormalColumn;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
-import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
 import org.apache.shardingsphere.data.pipeline.core.query.JDBCStreamQueryBuilder;
 import org.apache.shardingsphere.data.pipeline.core.ratelimit.JobRateLimitAlgorithm;
 import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.BuildDivisibleSQLParameter;
@@ -52,6 +51,7 @@ import org.apache.shardingsphere.data.pipeline.core.util.PipelineJdbcUtils;
 import org.apache.shardingsphere.database.connector.core.type.DatabaseType;
 import org.apache.shardingsphere.infra.annotation.HighFrequencyInvocation;
 import org.apache.shardingsphere.infra.exception.ShardingSpherePreconditions;
+import org.apache.shardingsphere.infra.metadata.identifier.ShardingSphereIdentifier;
 
 import javax.sql.DataSource;
 import java.sql.Connection;
@@ -110,12 +110,11 @@ public final class InventoryDumper extends AbstractPipelineLifecycleRunnable imp
             log.info("Ignored because of already finished.");
             return;
         }
-        PipelineTableMetaData tableMetaData = getPipelineTableMetaData();
         try (Connection connection = dataSource.getConnection()) {
-            if (StringUtils.isNotBlank(dumperContext.getQuerySQL()) || !dumperContext.hasUniqueKey() || isPrimaryKeyWithoutRange(position)) {
-                dumpWithStreamingQuery(connection, tableMetaData);
+            if (!Strings.isNullOrEmpty(dumperContext.getQuerySQL()) || !dumperContext.hasUniqueKey() || isPrimaryKeyWithoutRange(position)) {
+                dumpWithStreamingQuery(connection);
             } else {
-                dumpByPage(connection, tableMetaData);
+                dumpByPage(connection);
             }
             // CHECKSTYLE:OFF
         } catch (final SQLException | RuntimeException ex) {
@@ -125,18 +124,12 @@ public final class InventoryDumper extends AbstractPipelineLifecycleRunnable imp
         }
     }
     
-    private PipelineTableMetaData getPipelineTableMetaData() {
-        String schemaName = dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName());
-        String tableName = dumperContext.getActualTableName();
-        return metaDataLoader.getTableMetaData(schemaName, tableName);
-    }
-    
     private boolean isPrimaryKeyWithoutRange(final IngestPosition position) {
         return position instanceof PrimaryKeyIngestPosition && null == ((PrimaryKeyIngestPosition<?>) position).getBeginValue() && null == ((PrimaryKeyIngestPosition<?>) position).getEndValue();
     }
     
     @SuppressWarnings("MagicConstant")
-    private void dumpByPage(final Connection connection, final PipelineTableMetaData tableMetaData) throws SQLException {
+    private void dumpByPage(final Connection connection) throws SQLException {
         log.info("Start to dump inventory data by page, dataSource={}, actualTable={}", dumperContext.getCommonContext().getDataSourceName(), dumperContext.getActualTableName());
         if (null != dumperContext.getTransactionIsolation()) {
             connection.setTransactionIsolation(dumperContext.getTransactionIsolation());
@@ -148,10 +141,10 @@ public final class InventoryDumper extends AbstractPipelineLifecycleRunnable imp
             QueryRange queryRange = new QueryRange(((PrimaryKeyIngestPosition<?>) position).getBeginValue(), firstQuery && dumperContext.isFirstDump(),
                     ((PrimaryKeyIngestPosition<?>) position).getEndValue());
             InventoryQueryParameter<?> queryParam = new InventoryRangeQueryParameter(queryRange);
-            List<Record> dataRecords = dumpByPage(connection, queryParam, rowCount, tableMetaData);
+            List<Record> dataRecords = dumpByPage(connection, queryParam, rowCount);
             if (dataRecords.size() > 1 && Objects.deepEquals(getFirstUniqueKeyValue(dataRecords, 0), getFirstUniqueKeyValue(dataRecords, dataRecords.size() - 1))) {
                 queryParam = new InventoryPointQueryParameter(getFirstUniqueKeyValue(dataRecords, 0));
-                dataRecords = dumpByPage(connection, queryParam, rowCount, tableMetaData);
+                dataRecords = dumpByPage(connection, queryParam, rowCount);
             }
             firstQuery = false;
             if (dataRecords.isEmpty()) {
@@ -167,8 +160,7 @@ public final class InventoryDumper extends AbstractPipelineLifecycleRunnable imp
         log.info("End to dump inventory data by page, dataSource={}, actualTable={}", dumperContext.getCommonContext().getDataSourceName(), dumperContext.getActualTableName());
     }
     
-    private List<Record> dumpByPage(final Connection connection,
-                                    final InventoryQueryParameter<?> queryParam, final AtomicLong rowCount, final PipelineTableMetaData tableMetaData) throws SQLException {
+    private List<Record> dumpByPage(final Connection connection, final InventoryQueryParameter<?> queryParam, final AtomicLong rowCount) throws SQLException {
         DatabaseType databaseType = dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType();
         int batchSize = dumperContext.getBatchSize();
         try (PreparedStatement preparedStatement = JDBCStreamQueryBuilder.build(databaseType, connection, buildDumpByPageSQL(queryParam), batchSize)) {
@@ -185,7 +177,7 @@ public final class InventoryDumper extends AbstractPipelineLifecycleRunnable imp
                         }
                         result = new LinkedList<>();
                     }
-                    result.add(loadDataRecord(resultSet, resultSetMetaData, tableMetaData));
+                    result.add(loadDataRecord(resultSet, resultSetMetaData));
                     rowCount.incrementAndGet();
                     if (!isRunning()) {
                         log.info("Broke because of inventory dump is not 
running.");
@@ -221,7 +213,7 @@ public final class InventoryDumper extends 
AbstractPipelineLifecycleRunnable imp
         }
     }
     
-    private DataRecord loadDataRecord(final ResultSet resultSet, final ResultSetMetaData resultSetMetaData, final PipelineTableMetaData tableMetaData) throws SQLException {
+    private DataRecord loadDataRecord(final ResultSet resultSet, final ResultSetMetaData resultSetMetaData) throws SQLException {
         int columnCount = resultSetMetaData.getColumnCount();
         String tableName = dumperContext.getLogicTableName();
         DataRecord result = new DataRecord(PipelineSQLOperationType.INSERT, tableName, positionCreator.create(dumperContext, resultSet), columnCount);
@@ -230,14 +222,17 @@ public final class InventoryDumper extends AbstractPipelineLifecycleRunnable imp
                 () -> new PipelineInvalidParameterException("Insert column names count not equals ResultSet column count"));
         for (int i = 1; i <= columnCount; i++) {
             String columnName = insertColumnNames.isEmpty() ? resultSetMetaData.getColumnName(i) : insertColumnNames.get(i - 1);
-            ShardingSpherePreconditions.checkNotNull(tableMetaData.getColumnMetaData(columnName), () -> new PipelineInvalidParameterException(String.format("Column name is %s", columnName)));
-            Column column = new NormalColumn(columnName, columnValueReaderEngine.read(resultSet, resultSetMetaData, i), true, tableMetaData.getColumnMetaData(columnName).isUniqueKey());
+            Column column = getColumn(resultSet, resultSetMetaData, columnName, i, dumperContext.getTargetUniqueKeysNames().contains(new ShardingSphereIdentifier(columnName)));
             result.addColumn(column);
         }
         result.setActualTableName(dumperContext.getActualTableName());
         return result;
     }
     
+    private Column getColumn(final ResultSet resultSet, final ResultSetMetaData resultSetMetaData, final String columnName, final int columnIndex, final boolean isUniqueKey) throws SQLException {
+        return new NormalColumn(columnName, columnValueReaderEngine.read(resultSet, resultSetMetaData, columnIndex), true, isUniqueKey);
+    }
+    
     private String buildDumpByPageSQL(final InventoryQueryParameter<?> queryParam) {
         String schemaName = dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName());
         PipelineColumnMetaData firstColumn = dumperContext.getUniqueKeyColumns().get(0);
@@ -261,26 +256,25 @@ public final class InventoryDumper extends AbstractPipelineLifecycleRunnable imp
     }
     
     @SuppressWarnings("MagicConstant")
-    private void dumpWithStreamingQuery(final Connection connection, final PipelineTableMetaData tableMetaData) throws SQLException {
+    private void dumpWithStreamingQuery(final Connection connection) throws SQLException {
         int batchSize = dumperContext.getBatchSize();
         DatabaseType databaseType = dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType();
         if (null != dumperContext.getTransactionIsolation()) {
             connection.setTransactionIsolation(dumperContext.getTransactionIsolation());
         }
         if (null == dumperContext.getQuerySQL()) {
-            fetchAllQuery(connection, tableMetaData, databaseType, batchSize);
+            fetchAllQuery(connection, databaseType, batchSize);
         } else {
-            designatedParametersQuery(connection, tableMetaData, databaseType, batchSize);
+            designatedParametersQuery(connection, databaseType, batchSize);
         }
     }
     
-    private void fetchAllQuery(final Connection connection, final PipelineTableMetaData tableMetaData, final DatabaseType databaseType,
-                               final int batchSize) throws SQLException {
+    private void fetchAllQuery(final Connection connection, final DatabaseType databaseType, final int batchSize) throws SQLException {
         log.info("Start to fetch all inventory data with streaming query, dataSource={}, actualTable={}", dumperContext.getCommonContext().getDataSourceName(), dumperContext.getActualTableName());
         try (PreparedStatement statement = JDBCStreamQueryBuilder.build(databaseType, connection, buildFetchAllSQLWithStreamingQuery(), batchSize)) {
             runningStatement.set(statement);
             try (ResultSet resultSet = statement.executeQuery()) {
-                consumeResultSetToChannel(tableMetaData, resultSet, batchSize);
+                consumeResultSetToChannel(resultSet, batchSize);
             } finally {
                 runningStatement.set(null);
             }
@@ -288,7 +282,7 @@ public final class InventoryDumper extends AbstractPipelineLifecycleRunnable imp
         log.info("End to fetch all inventory data with streaming query, dataSource={}, actualTable={}", dumperContext.getCommonContext().getDataSourceName(), dumperContext.getActualTableName());
     }
     
-    private void designatedParametersQuery(final Connection connection, final PipelineTableMetaData tableMetaData, final DatabaseType databaseType, final int batchSize) throws SQLException {
+    private void designatedParametersQuery(final Connection connection, final DatabaseType databaseType, final int batchSize) throws SQLException {
         log.info("Start to dump inventory data with designated parameters query, dataSource={}, actualTable={}", dumperContext.getCommonContext().getDataSourceName(),
                 dumperContext.getActualTableName());
         try (PreparedStatement statement = JDBCStreamQueryBuilder.build(databaseType, connection, dumperContext.getQuerySQL(), batchSize)) {
@@ -297,7 +291,7 @@ public final class InventoryDumper extends AbstractPipelineLifecycleRunnable imp
                 statement.setObject(i + 1, dumperContext.getQueryParams().get(i));
             }
             try (ResultSet resultSet = statement.executeQuery()) {
-                consumeResultSetToChannel(tableMetaData, resultSet, batchSize);
+                consumeResultSetToChannel(resultSet, batchSize);
             } finally {
                 runningStatement.set(null);
             }
@@ -306,8 +300,8 @@ public final class InventoryDumper extends AbstractPipelineLifecycleRunnable imp
                 dumperContext.getActualTableName());
     }
     
-    private void consumeResultSetToChannel(final PipelineTableMetaData tableMetaData, final ResultSet resultSet, final int batchSize) throws SQLException {
-        int rowCount = 0;
+    private void consumeResultSetToChannel(final ResultSet resultSet, final int batchSize) throws SQLException {
+        long rowCount = 0;
         JobRateLimitAlgorithm rateLimitAlgorithm = dumperContext.getRateLimitAlgorithm();
         ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
         List<Record> dataRecords = new LinkedList<>();
@@ -316,7 +310,7 @@ public final class InventoryDumper extends AbstractPipelineLifecycleRunnable imp
                 channel.push(dataRecords);
                 dataRecords = new LinkedList<>();
             }
-            dataRecords.add(loadDataRecord(resultSet, resultSetMetaData, tableMetaData));
+            dataRecords.add(loadDataRecord(resultSet, resultSetMetaData));
             ++rowCount;
             if (!isRunning()) {
                 log.info("Broke because of inventory dump is not running.");
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumperContext.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumperContext.java
index 2052b2c005e..29cb11286ad 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumperContext.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumperContext.java
@@ -23,10 +23,12 @@ import lombok.ToString;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.DumperCommonContext;
 import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
 import org.apache.shardingsphere.data.pipeline.core.ratelimit.JobRateLimitAlgorithm;
+import org.apache.shardingsphere.infra.metadata.identifier.ShardingSphereIdentifier;
 
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
+import java.util.stream.Collectors;
 
 /**
  * Inventory dumper context.
@@ -44,6 +46,8 @@ public final class InventoryDumperContext {
     
     private List<PipelineColumnMetaData> uniqueKeyColumns;
     
+    private List<ShardingSphereIdentifier> targetUniqueKeysNames;
+    
     private List<String> insertColumnNames;
     
     private String querySQL;
@@ -65,6 +69,18 @@ public final class InventoryDumperContext {
                 commonContext.getDataSourceName(), commonContext.getDataSourceConfig(), commonContext.getTableNameMapper(), commonContext.getTableAndSchemaNameMapper());
     }
     
+    /**
+     * Set unique key columns.
+     *
+     * @param uniqueKeyColumns unique key columns
+     */
+    public void setUniqueKeyColumns(final List<PipelineColumnMetaData> uniqueKeyColumns) {
+        this.uniqueKeyColumns = uniqueKeyColumns;
+        targetUniqueKeysNames = hasUniqueKey()
+                ? uniqueKeyColumns.stream().map(each -> new ShardingSphereIdentifier(each.getName())).collect(Collectors.toList())
+                : Collections.emptyList();
+    }
+    
     /**
      * Has unique key or not.
      *
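For context, a minimal standalone sketch of the idea behind this change (simplified, hypothetical types rather than the actual ShardingSphere classes): the unique-key column names are captured once, as case-insensitive identifiers, when the unique key columns are set on the dumper context, so loadDataRecord only needs a membership check per column instead of a PipelineTableMetaData lookup per row.

import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical simplified stand-in for InventoryDumperContext; lower-cased strings
// approximate the case-insensitive behavior of ShardingSphereIdentifier.
final class DumperContextSketch {
    
    private Set<String> targetUniqueKeysNames = Set.of();
    
    // Computed once when the unique key columns are configured.
    void setUniqueKeyColumns(final List<String> uniqueKeyColumnNames) {
        targetUniqueKeysNames = uniqueKeyColumnNames.stream().map(each -> each.toLowerCase(Locale.ROOT)).collect(Collectors.toSet());
    }
    
    // Called per column while dumping rows; no metadata loader involved.
    boolean isUniqueKey(final String columnName) {
        return targetUniqueKeysNames.contains(columnName.toLowerCase(Locale.ROOT));
    }
    
    public static void main(final String[] args) {
        DumperContextSketch context = new DumperContextSketch();
        context.setUniqueKeyColumns(List.of("order_id"));
        System.out.println(context.isUniqueKey("ORDER_ID")); // prints: true
    }
}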
