This is an automated email from the ASF dual-hosted git repository.

zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new ffb2c9b4611 Refactor InventoryDumper (#32702)
ffb2c9b4611 is described below

commit ffb2c9b4611bc458e651691a577934ef4a41c5d0
Author: Liang Zhang <[email protected]>
AuthorDate: Tue Aug 27 14:29:15 2024 +0800

    Refactor InventoryDumper (#32702)
---
 .../ingest/dumper/inventory/InventoryDumper.java   | 35 ++++++++++------------
 1 file changed, 16 insertions(+), 19 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumper.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumper.java
index 4f7872ff274..058bb96e8d9 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumper.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumper.java
@@ -86,8 +86,6 @@ public final class InventoryDumper extends 
AbstractPipelineLifecycleRunnable imp
     
     private final AtomicReference<Statement> runningStatement = new 
AtomicReference<>();
     
-    private PipelineTableMetaData tableMetaData;
-    
     public InventoryDumper(final InventoryDumperContext dumperContext, final 
PipelineChannel channel, final DataSource dataSource,
                            final PipelineTableMetaDataLoader metaDataLoader, 
final InventoryDataRecordPositionCreator positionCreator) {
         this.dumperContext = dumperContext;
@@ -107,12 +105,12 @@ public final class InventoryDumper extends 
AbstractPipelineLifecycleRunnable imp
             log.info("Ignored because of already finished.");
             return;
         }
-        init();
+        PipelineTableMetaData tableMetaData = getPipelineTableMetaData();
         try (Connection connection = dataSource.getConnection()) {
             if (Strings.isNullOrEmpty(dumperContext.getQuerySQL()) && 
dumperContext.hasUniqueKey() && !isPrimaryKeyWithoutRange(position)) {
-                dumpPageByPage(connection);
+                dumpPageByPage(connection, tableMetaData);
             } else {
-                dumpWithStreamingQuery(connection);
+                dumpWithStreamingQuery(connection, tableMetaData);
             }
             // CHECKSTYLE:OFF
         } catch (final SQLException | RuntimeException ex) {
@@ -122,12 +120,10 @@ public final class InventoryDumper extends 
AbstractPipelineLifecycleRunnable imp
         }
     }
     
-    private void init() {
-        if (null == tableMetaData) {
-            String schemaName = 
dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName());
-            String tableName = dumperContext.getActualTableName();
-            tableMetaData = metaDataLoader.getTableMetaData(schemaName, 
tableName);
-        }
+    private PipelineTableMetaData getPipelineTableMetaData() {
+        String schemaName = 
dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName());
+        String tableName = dumperContext.getActualTableName();
+        return metaDataLoader.getTableMetaData(schemaName, tableName);
     }
     
     private boolean isPrimaryKeyWithoutRange(final IngestPosition position) {
@@ -135,7 +131,7 @@ public final class InventoryDumper extends 
AbstractPipelineLifecycleRunnable imp
     }
     
     @SuppressWarnings("MagicConstant")
-    private void dumpPageByPage(final Connection connection) throws 
SQLException {
+    private void dumpPageByPage(final Connection connection, final 
PipelineTableMetaData tableMetaData) throws SQLException {
         if (null != dumperContext.getTransactionIsolation()) {
             
connection.setTransactionIsolation(dumperContext.getTransactionIsolation());
         }
@@ -145,10 +141,10 @@ public final class InventoryDumper extends 
AbstractPipelineLifecycleRunnable imp
         while (true) {
             QueryRange queryRange = new 
QueryRange(((PrimaryKeyIngestPosition<?>) position).getBeginValue(), 
firstQuery, ((PrimaryKeyIngestPosition<?>) position).getEndValue());
             InventoryQueryParameter queryParam = 
InventoryQueryParameter.buildForRangeQuery(queryRange);
-            List<Record> dataRecords = dumpPageByPage(connection, queryParam, 
rowCount);
+            List<Record> dataRecords = dumpPageByPage(connection, queryParam, 
rowCount, tableMetaData);
             if (dataRecords.size() > 1 && 
Objects.deepEquals(getFirstUniqueKeyValue(dataRecords, 0), 
getFirstUniqueKeyValue(dataRecords, dataRecords.size() - 1))) {
                 queryParam = 
InventoryQueryParameter.buildForPointQuery(getFirstUniqueKeyValue(dataRecords, 
0));
-                dataRecords = dumpPageByPage(connection, queryParam, rowCount);
+                dataRecords = dumpPageByPage(connection, queryParam, rowCount, 
tableMetaData);
             }
             firstQuery = false;
             if (dataRecords.isEmpty()) {
@@ -166,7 +162,8 @@ public final class InventoryDumper extends 
AbstractPipelineLifecycleRunnable imp
         }
     }
     
-    private List<Record> dumpPageByPage(final Connection connection, final 
InventoryQueryParameter queryParam, final AtomicLong rowCount) throws 
SQLException {
+    private List<Record> dumpPageByPage(final Connection connection,
+                                        final InventoryQueryParameter 
queryParam, final AtomicLong rowCount, final PipelineTableMetaData 
tableMetaData) throws SQLException {
         DatabaseType databaseType = 
dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType();
         int batchSize = dumperContext.getBatchSize();
         try (PreparedStatement preparedStatement = 
JDBCStreamQueryBuilder.build(databaseType, connection, 
buildDumpPageByPageSQL(queryParam), batchSize)) {
@@ -183,7 +180,7 @@ public final class InventoryDumper extends 
AbstractPipelineLifecycleRunnable imp
                         }
                         result = new LinkedList<>();
                     }
-                    result.add(loadDataRecord(resultSet, resultSetMetaData));
+                    result.add(loadDataRecord(resultSet, resultSetMetaData, 
tableMetaData));
                     rowCount.incrementAndGet();
                     if (!isRunning()) {
                         log.info("Broke because of inventory dump is not 
running.");
@@ -230,7 +227,7 @@ public final class InventoryDumper extends 
AbstractPipelineLifecycleRunnable imp
         }
     }
     
-    private DataRecord loadDataRecord(final ResultSet resultSet, final 
ResultSetMetaData resultSetMetaData) throws SQLException {
+    private DataRecord loadDataRecord(final ResultSet resultSet, final 
ResultSetMetaData resultSetMetaData, final PipelineTableMetaData tableMetaData) 
throws SQLException {
         int columnCount = resultSetMetaData.getColumnCount();
         String tableName = dumperContext.getLogicTableName();
         DataRecord result = new DataRecord(PipelineSQLOperationType.INSERT, 
tableName, positionCreator.create(dumperContext, resultSet), columnCount);
@@ -269,7 +266,7 @@ public final class InventoryDumper extends 
AbstractPipelineLifecycleRunnable imp
     }
     
     @SuppressWarnings("MagicConstant")
-    private void dumpWithStreamingQuery(final Connection connection) throws 
SQLException {
+    private void dumpWithStreamingQuery(final Connection connection, final 
PipelineTableMetaData tableMetaData) throws SQLException {
         int batchSize = dumperContext.getBatchSize();
         DatabaseType databaseType = 
dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType();
         if (null != dumperContext.getTransactionIsolation()) {
@@ -290,7 +287,7 @@ public final class InventoryDumper extends 
AbstractPipelineLifecycleRunnable imp
                         channel.push(dataRecords);
                         dataRecords = new LinkedList<>();
                     }
-                    dataRecords.add(loadDataRecord(resultSet, 
resultSetMetaData));
+                    dataRecords.add(loadDataRecord(resultSet, 
resultSetMetaData, tableMetaData));
                     ++rowCount;
                     if (!isRunning()) {
                         log.info("Broke because of inventory dump is not 
running.");

Reply via email to