This is an automated email from the ASF dual-hosted git repository.

chengzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new dca3ca578b4 Refactor InventoryDumper. (#33930)
dca3ca578b4 is described below

commit dca3ca578b4f660aaec866f4c9c03a00aa6d31de
Author: Cong Hu <[email protected]>
AuthorDate: Fri Dec 6 16:35:09 2024 +0800

    Refactor InventoryDumper. (#33930)
---
 .../ingest/dumper/inventory/InventoryDumper.java   | 146 +++++++++++----------
 1 file changed, 80 insertions(+), 66 deletions(-)

diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumper.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumper.java
index daab070fcce..ff7a42fbb0c 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumper.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumper.java
@@ -17,8 +17,8 @@
 
 package org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory;
 
-import com.google.common.base.Strings;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
 import 
org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
 import org.apache.shardingsphere.data.pipeline.core.exception.IngestException;
@@ -111,10 +111,10 @@ public final class InventoryDumper extends 
AbstractPipelineLifecycleRunnable imp
         }
         PipelineTableMetaData tableMetaData = getPipelineTableMetaData();
         try (Connection connection = dataSource.getConnection()) {
-            if (Strings.isNullOrEmpty(dumperContext.getQuerySQL()) && 
dumperContext.hasUniqueKey() && !isPrimaryKeyWithoutRange(position)) {
-                dumpPageByPage(connection, tableMetaData);
-            } else {
+            if (StringUtils.isNotBlank(dumperContext.getQuerySQL()) || 
!dumperContext.hasUniqueKey() || isPrimaryKeyWithoutRange(position)) {
                 dumpWithStreamingQuery(connection, tableMetaData);
+            } else {
+                dumpByPage(connection, tableMetaData);
             }
             // CHECKSTYLE:OFF
         } catch (final SQLException | RuntimeException ex) {
@@ -135,20 +135,21 @@ public final class InventoryDumper extends 
AbstractPipelineLifecycleRunnable imp
     }
     
     @SuppressWarnings("MagicConstant")
-    private void dumpPageByPage(final Connection connection, final 
PipelineTableMetaData tableMetaData) throws SQLException {
+    private void dumpByPage(final Connection connection, final 
PipelineTableMetaData tableMetaData) throws SQLException {
+        log.info("Start to dump inventory data by page, dataSource={}, 
actualTable={}", dumperContext.getCommonContext().getDataSourceName(), 
dumperContext.getActualTableName());
         if (null != dumperContext.getTransactionIsolation()) {
             
connection.setTransactionIsolation(dumperContext.getTransactionIsolation());
         }
         boolean firstQuery = true;
         AtomicLong rowCount = new AtomicLong();
         IngestPosition position = 
dumperContext.getCommonContext().getPosition();
-        while (true) {
+        do {
             QueryRange queryRange = new 
QueryRange(((PrimaryKeyIngestPosition<?>) position).getBeginValue(), 
firstQuery, ((PrimaryKeyIngestPosition<?>) position).getEndValue());
             InventoryQueryParameter<?> queryParam = new 
InventoryRangeQueryParameter(queryRange);
-            List<Record> dataRecords = dumpPageByPage(connection, queryParam, 
rowCount, tableMetaData);
+            List<Record> dataRecords = dumpByPage(connection, queryParam, 
rowCount, tableMetaData);
             if (dataRecords.size() > 1 && 
Objects.deepEquals(getFirstUniqueKeyValue(dataRecords, 0), 
getFirstUniqueKeyValue(dataRecords, dataRecords.size() - 1))) {
                 queryParam = new 
InventoryPointQueryParameter(getFirstUniqueKeyValue(dataRecords, 0));
-                dataRecords = dumpPageByPage(connection, queryParam, rowCount, 
tableMetaData);
+                dataRecords = dumpByPage(connection, queryParam, rowCount, 
tableMetaData);
             }
             firstQuery = false;
             if (dataRecords.isEmpty()) {
@@ -160,19 +161,17 @@ public final class InventoryDumper extends 
AbstractPipelineLifecycleRunnable imp
             }
             channel.push(dataRecords);
             dumperContext.getCommonContext().setPosition(position);
-            if (position instanceof IngestFinishedPosition) {
-                break;
-            }
-        }
+        } while (!(position instanceof IngestFinishedPosition));
+        log.info("End to dump inventory data by page, dataSource={}, 
actualTable={}", dumperContext.getCommonContext().getDataSourceName(), 
dumperContext.getActualTableName());
     }
     
-    private List<Record> dumpPageByPage(final Connection connection,
-                                        final InventoryQueryParameter<?> 
queryParam, final AtomicLong rowCount, final PipelineTableMetaData 
tableMetaData) throws SQLException {
+    private List<Record> dumpByPage(final Connection connection,
+                                    final InventoryQueryParameter<?> 
queryParam, final AtomicLong rowCount, final PipelineTableMetaData 
tableMetaData) throws SQLException {
         DatabaseType databaseType = 
dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType();
         int batchSize = dumperContext.getBatchSize();
-        try (PreparedStatement preparedStatement = 
JDBCStreamQueryBuilder.build(databaseType, connection, 
buildDumpPageByPageSQL(queryParam), batchSize)) {
+        try (PreparedStatement preparedStatement = 
JDBCStreamQueryBuilder.build(databaseType, connection, 
buildDumpByPageSQL(queryParam), batchSize)) {
             runningStatement.set(preparedStatement);
-            setParameters(preparedStatement, queryParam, false);
+            setParameters(preparedStatement, queryParam);
             try (ResultSet resultSet = preparedStatement.executeQuery()) {
                 JobRateLimitAlgorithm rateLimitAlgorithm = 
dumperContext.getRateLimitAlgorithm();
                 ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
@@ -201,31 +200,19 @@ public final class InventoryDumper extends 
AbstractPipelineLifecycleRunnable imp
         }
     }
     
-    private void setParameters(final PreparedStatement preparedStatement, 
final InventoryQueryParameter<?> queryParam, final boolean streamingQuery) 
throws SQLException {
-        if (!Strings.isNullOrEmpty(dumperContext.getQuerySQL())) {
-            for (int i = 0; i < dumperContext.getQueryParams().size(); i++) {
-                preparedStatement.setObject(i + 1, 
dumperContext.getQueryParams().get(i));
-            }
-            return;
-        }
-        if (!dumperContext.hasUniqueKey()) {
-            return;
-        }
-        int parameterIndex = 1;
+    private void setParameters(final PreparedStatement preparedStatement, 
final InventoryQueryParameter<?> queryParam) throws SQLException {
         if (queryParam instanceof InventoryRangeQueryParameter) {
             Object lower = ((InventoryRangeQueryParameter) 
queryParam).getValue().getLower();
             if (null != lower) {
-                preparedStatement.setObject(parameterIndex++, lower);
+                preparedStatement.setObject(1, lower);
             }
             Object upper = ((InventoryRangeQueryParameter) 
queryParam).getValue().getUpper();
             if (null != upper) {
-                preparedStatement.setObject(parameterIndex++, upper);
-            }
-            if (!streamingQuery) {
-                preparedStatement.setInt(parameterIndex, 
dumperContext.getBatchSize());
+                preparedStatement.setObject(2, upper);
             }
+            preparedStatement.setInt(3, dumperContext.getBatchSize());
         } else if (queryParam instanceof InventoryPointQueryParameter) {
-            preparedStatement.setObject(parameterIndex, queryParam.getValue());
+            preparedStatement.setObject(1, queryParam.getValue());
         } else {
             throw new UnsupportedOperationException("Query type: " + 
queryParam.getValue());
         }
@@ -247,7 +234,7 @@ public final class InventoryDumper extends 
AbstractPipelineLifecycleRunnable imp
         return result;
     }
     
-    private String buildDumpPageByPageSQL(final InventoryQueryParameter<?> 
queryParam) {
+    private String buildDumpByPageSQL(final InventoryQueryParameter<?> 
queryParam) {
         String schemaName = 
dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName());
         PipelineColumnMetaData firstColumn = 
dumperContext.getUniqueKeyColumns().get(0);
         List<String> columnNames = dumperContext.getQueryColumnNames();
@@ -276,45 +263,72 @@ public final class InventoryDumper extends 
AbstractPipelineLifecycleRunnable imp
         if (null != dumperContext.getTransactionIsolation()) {
             
connection.setTransactionIsolation(dumperContext.getTransactionIsolation());
         }
-        try (PreparedStatement preparedStatement = 
JDBCStreamQueryBuilder.build(databaseType, connection, 
buildDumpSQLWithStreamingQuery(), batchSize)) {
-            runningStatement.set(preparedStatement);
-            PrimaryKeyIngestPosition<?> primaryPosition = 
(PrimaryKeyIngestPosition<?>) dumperContext.getCommonContext().getPosition();
-            InventoryRangeQueryParameter queryParam = new 
InventoryRangeQueryParameter(new QueryRange(primaryPosition.getBeginValue(), 
true, primaryPosition.getEndValue()));
-            setParameters(preparedStatement, queryParam, true);
-            try (ResultSet resultSet = preparedStatement.executeQuery()) {
-                int rowCount = 0;
-                JobRateLimitAlgorithm rateLimitAlgorithm = 
dumperContext.getRateLimitAlgorithm();
-                ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
-                List<Record> dataRecords = new LinkedList<>();
-                while (resultSet.next()) {
-                    if (dataRecords.size() >= batchSize) {
-                        channel.push(dataRecords);
-                        dataRecords = new LinkedList<>();
-                    }
-                    dataRecords.add(loadDataRecord(resultSet, 
resultSetMetaData, tableMetaData));
-                    ++rowCount;
-                    if (!isRunning()) {
-                        log.info("Broke because of inventory dump is not 
running.");
-                        break;
-                    }
-                    if (null != rateLimitAlgorithm && 0 == rowCount % 
batchSize) {
-                        
rateLimitAlgorithm.intercept(PipelineSQLOperationType.SELECT, 1);
-                    }
-                }
-                dataRecords.add(new FinishedRecord(new 
IngestFinishedPosition()));
-                channel.push(dataRecords);
-                log.info("Inventory dump with streaming query done, 
rowCount={}, dataSource={}, actualTable={}",
-                        rowCount, 
dumperContext.getCommonContext().getDataSourceName(), 
dumperContext.getActualTableName());
+        if (null == dumperContext.getQuerySQL()) {
+            fetchAllQuery(connection, tableMetaData, databaseType, batchSize);
+        } else {
+            designatedParametersQuery(connection, tableMetaData, databaseType, 
batchSize);
+        }
+    }
+    
+    private void fetchAllQuery(final Connection connection, final 
PipelineTableMetaData tableMetaData, final DatabaseType databaseType,
+                               final int batchSize) throws SQLException {
+        log.info("Start to fetch all inventory data with streaming query, 
dataSource={}, actualTable={}", 
dumperContext.getCommonContext().getDataSourceName(), 
dumperContext.getActualTableName());
+        try (PreparedStatement statement = 
JDBCStreamQueryBuilder.build(databaseType, connection, 
buildFetchAllSQLWithStreamingQuery(), batchSize)) {
+            runningStatement.set(statement);
+            try (ResultSet resultSet = statement.executeQuery()) {
+                consumeResultSetToChannel(tableMetaData, resultSet, batchSize);
+            } finally {
+                runningStatement.set(null);
+            }
+        }
+        log.info("End to fetch all inventory data with streaming query, 
dataSource={}, actualTable={}", 
dumperContext.getCommonContext().getDataSourceName(), 
dumperContext.getActualTableName());
+    }
+    
+    private void designatedParametersQuery(final Connection connection, final 
PipelineTableMetaData tableMetaData, final DatabaseType databaseType, final int 
batchSize) throws SQLException {
+        log.info("Start to dump inventory data with designated parameters 
query, dataSource={}, actualTable={}", 
dumperContext.getCommonContext().getDataSourceName(),
+                dumperContext.getActualTableName());
+        try (PreparedStatement statement = 
JDBCStreamQueryBuilder.build(databaseType, connection, 
dumperContext.getQuerySQL(), batchSize)) {
+            runningStatement.set(statement);
+            for (int i = 0; i < dumperContext.getQueryParams().size(); i++) {
+                statement.setObject(i + 1, 
dumperContext.getQueryParams().get(i));
+            }
+            try (ResultSet resultSet = statement.executeQuery()) {
+                consumeResultSetToChannel(tableMetaData, resultSet, batchSize);
             } finally {
                 runningStatement.set(null);
             }
         }
+        log.info("End to dump inventory data with designated parameters query, 
dataSource={}, actualTable={}", 
dumperContext.getCommonContext().getDataSourceName(),
+                dumperContext.getActualTableName());
     }
     
-    private String buildDumpSQLWithStreamingQuery() {
-        if (!Strings.isNullOrEmpty(dumperContext.getQuerySQL())) {
-            return dumperContext.getQuerySQL();
+    private void consumeResultSetToChannel(final PipelineTableMetaData 
tableMetaData, final ResultSet resultSet, final int batchSize) throws 
SQLException {
+        int rowCount = 0;
+        JobRateLimitAlgorithm rateLimitAlgorithm = 
dumperContext.getRateLimitAlgorithm();
+        ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+        List<Record> dataRecords = new LinkedList<>();
+        while (resultSet.next()) {
+            if (dataRecords.size() >= batchSize) {
+                channel.push(dataRecords);
+                dataRecords = new LinkedList<>();
+            }
+            dataRecords.add(loadDataRecord(resultSet, resultSetMetaData, 
tableMetaData));
+            ++rowCount;
+            if (!isRunning()) {
+                log.info("Broke because of inventory dump is not running.");
+                break;
+            }
+            if (null != rateLimitAlgorithm && 0 == rowCount % batchSize) {
+                rateLimitAlgorithm.intercept(PipelineSQLOperationType.SELECT, 
1);
+            }
         }
+        dataRecords.add(new FinishedRecord(new IngestFinishedPosition()));
+        channel.push(dataRecords);
+        log.info("Inventory dump with streaming query done, rowCount={}, 
dataSource={}, actualTable={}", rowCount, 
dumperContext.getCommonContext().getDataSourceName(),
+                dumperContext.getActualTableName());
+    }
+    
+    private String buildFetchAllSQLWithStreamingQuery() {
         String schemaName = 
dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName());
         List<String> columnNames = dumperContext.getQueryColumnNames();
         return sqlBuilder.buildFetchAllSQL(schemaName, 
dumperContext.getActualTableName(), columnNames);

Reply via email to