This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new d251517fec2 Extract PipelineDatabaseResources from CalculationContext (#36756)
d251517fec2 is described below

commit d251517fec2f8b420916de6461406a5caf1edad9
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Mon Sep 29 20:12:42 2025 +0800

    Extract PipelineDatabaseResources from CalculationContext (#36756)
---
 ...stractRecordSingleTableInventoryCalculator.java | 24 +++---
 .../table/calculator/CalculationContext.java       | 95 +---------------------
 .../query/PipelineDatabaseResources.java}          | 44 +++-------
 3 files changed, 30 insertions(+), 133 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/AbstractRecordSingleTableInventoryCalculator.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/AbstractRecordSingleTableInventoryCalculator.java
index e369662f172..35f2a6ae456 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/AbstractRecordSingleTableInventoryCalculator.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/AbstractRecordSingleTableInventoryCalculator.java
@@ -23,6 +23,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.DataConsist
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCancelingException;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.column.InventoryColumnValueReaderEngine;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.PipelineDatabaseResources;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.QueryType;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.range.QueryRange;
 import 
org.apache.shardingsphere.data.pipeline.core.query.JDBCStreamQueryBuilder;
@@ -105,7 +106,7 @@ public abstract class 
AbstractRecordSingleTableInventoryCalculator<S, C> extends
         List<C> result = new LinkedList<>();
         CalculationContext<C> calculationContext = 
prepareCalculationContext(param);
         prepareDatabaseResources(calculationContext, param);
-        ResultSet resultSet = calculationContext.getResultSet();
+        ResultSet resultSet = 
calculationContext.getDatabaseResources().getResultSet();
         ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
         while (resultSet.next()) {
             ShardingSpherePreconditions.checkState(!isCanceling(), () -> new 
PipelineJobCancelingException("Calculate chunk canceled, qualified table: %s", 
param.getTable()));
@@ -119,7 +120,7 @@ public abstract class 
AbstractRecordSingleTableInventoryCalculator<S, C> extends
         List<C> result = new LinkedList<>();
         CalculationContext<C> calculationContext = 
prepareCalculationContext(param);
         prepareDatabaseResources(calculationContext, param);
-        ResultSet resultSet = calculationContext.getResultSet();
+        ResultSet resultSet = 
calculationContext.getDatabaseResources().getResultSet();
         ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
         while (resultSet.next()) {
             ShardingSpherePreconditions.checkState(!isCanceling(), () -> new 
PipelineJobCancelingException("Calculate chunk canceled, qualified table: %s", 
param.getTable()));
@@ -139,7 +140,7 @@ public abstract class 
AbstractRecordSingleTableInventoryCalculator<S, C> extends
         List<C> result = new LinkedList<>();
         CalculationContext<C> calculationContext = 
prepareCalculationContext(param);
         prepareDatabaseResources(calculationContext, param);
-        ResultSet resultSet = calculationContext.getResultSet();
+        ResultSet resultSet = 
calculationContext.getDatabaseResources().getResultSet();
         ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
         while (resultSet.next()) {
             ShardingSpherePreconditions.checkState(!isCanceling(), () -> new 
PipelineJobCancelingException("Calculate chunk canceled, qualified table: %s", 
param.getTable()));
@@ -148,7 +149,7 @@ public abstract class 
AbstractRecordSingleTableInventoryCalculator<S, C> extends
                 return result;
             }
         }
-        calculationContext.resetDatabaseResources();
+        calculationContext.getDatabaseResources().reset();
         if (result.isEmpty() && 1 == round) {
             return rangeQueryWithSingleColumUniqueKey(param, 
columnValueReaderEngine, round + 1);
         }
@@ -168,7 +169,7 @@ public abstract class 
AbstractRecordSingleTableInventoryCalculator<S, C> extends
     private void doRangeQueryWithMultiColumUniqueKeys(final 
SingleTableInventoryCalculateParameter param, final CalculationContext<C> 
calculationContext,
                                                       final 
InventoryColumnValueReaderEngine columnValueReaderEngine) throws SQLException {
         prepareDatabaseResources(calculationContext, param);
-        ResultSet resultSet = calculationContext.getResultSet();
+        ResultSet resultSet = 
calculationContext.getDatabaseResources().getResultSet();
         ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
         C previousRecord = calculationContext.getRecordDeque().pollLast();
         List<C> duplicateRecords = new LinkedList<>();
@@ -197,7 +198,7 @@ public abstract class 
AbstractRecordSingleTableInventoryCalculator<S, C> extends
             }
             duplicateRecords.add(record);
         }
-        calculationContext.resetDatabaseResources();
+        calculationContext.getDatabaseResources().reset();
         if (!duplicateRecords.isEmpty()) {
             calculationContext.getRecordDeque().addAll(pointRangeQuery(param, 
duplicateRecords.get(0), columnValueReaderEngine));
         }
@@ -241,19 +242,20 @@ public abstract class 
AbstractRecordSingleTableInventoryCalculator<S, C> extends
     }
     
     private void prepareDatabaseResources(final CalculationContext<C> 
calculationContext, final SingleTableInventoryCalculateParameter param) throws 
SQLException {
-        if (calculationContext.isDatabaseResourcesReady()) {
+        if (calculationContext.getDatabaseResources().isReady()) {
             return;
         }
+        PipelineDatabaseResources databaseResources = 
calculationContext.getDatabaseResources();
         Connection connection = param.getDataSource().getConnection();
-        calculationContext.setConnection(connection);
+        databaseResources.setConnection(connection);
         String sql = getQuerySQL(param);
         PreparedStatement preparedStatement = 
JDBCStreamQueryBuilder.build(param.getDatabaseType(), connection, sql, 
chunkSize);
         setCurrentStatement(preparedStatement);
-        calculationContext.setPreparedStatement(preparedStatement);
+        databaseResources.setPreparedStatement(preparedStatement);
         setParameters(preparedStatement, param);
         ResultSet resultSet = preparedStatement.executeQuery();
-        calculationContext.setResultSet(resultSet);
-        calculationContext.setDatabaseResourcesReady(true);
+        databaseResources.setResultSet(resultSet);
+        databaseResources.setReady(true);
     }
     
     private String getQuerySQL(final SingleTableInventoryCalculateParameter 
param) {
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CalculationContext.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CalculationContext.java
index 3d87c18f67f..096f58f821a 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CalculationContext.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CalculationContext.java
@@ -18,113 +18,26 @@
 package 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator;
 
 import lombok.Getter;
-import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.PipelineDatabaseResources;
 
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
 import java.util.Deque;
 import java.util.LinkedList;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * Calculation context.
  *
  * @param <C> the type of record
  */
+@Getter
 public final class CalculationContext<C> implements AutoCloseable {
     
-    private final AtomicReference<Connection> connection = new 
AtomicReference<>();
+    private final PipelineDatabaseResources databaseResources = new 
PipelineDatabaseResources();
     
-    private final AtomicReference<PreparedStatement> preparedStatement = new 
AtomicReference<>();
-    
-    private final AtomicReference<ResultSet> resultSet = new 
AtomicReference<>();
-    
-    private final AtomicBoolean databaseResourcesReady = new 
AtomicBoolean(false);
-    
-    @Getter
     private final Deque<C> recordDeque = new LinkedList<>();
     
-    /**
-     * Get connection.
-     *
-     * @return connection
-     */
-    public Connection getConnection() {
-        return connection.get();
-    }
-    
-    /**
-     * Set connection.
-     *
-     * @param connection connection
-     */
-    public void setConnection(final Connection connection) {
-        this.connection.set(connection);
-    }
-    
-    /**
-     * Set prepared statement.
-     *
-     * @param preparedStatement prepared statement
-     */
-    public void setPreparedStatement(final PreparedStatement 
preparedStatement) {
-        this.preparedStatement.set(preparedStatement);
-    }
-    
-    /**
-     * Get result set.
-     *
-     * @return result set
-     */
-    public ResultSet getResultSet() {
-        return resultSet.get();
-    }
-    
-    /**
-     * Set result set.
-     *
-     * @param resultSet result set
-     */
-    public void setResultSet(final ResultSet resultSet) {
-        this.resultSet.set(resultSet);
-    }
-    
-    /**
-     * Check if database resources are ready.
-     *
-     * @return true if database resources are ready, false otherwise
-     */
-    public boolean isDatabaseResourcesReady() {
-        return databaseResourcesReady.get();
-    }
-    
-    /**
-     * Set database resources ready.
-     *
-     * @param databaseResourcesReady true if database resources are ready, 
false otherwise
-     */
-    public void setDatabaseResourcesReady(final boolean 
databaseResourcesReady) {
-        this.databaseResourcesReady.set(databaseResourcesReady);
-    }
-    
     @Override
     public void close() {
-        resetDatabaseResources();
+        databaseResources.close();
         recordDeque.clear();
     }
-    
-    /**
-     * Reset database resources.
-     */
-    public void resetDatabaseResources() {
-        setDatabaseResourcesReady(false);
-        QuietlyCloser.close(resultSet.get());
-        QuietlyCloser.close(preparedStatement.get());
-        QuietlyCloser.close(connection.get());
-        resultSet.set(null);
-        preparedStatement.set(null);
-        connection.set(null);
-    }
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CalculationContext.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/PipelineDatabaseResources.java
similarity index 70%
copy from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CalculationContext.java
copy to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/PipelineDatabaseResources.java
index 3d87c18f67f..c782b646fea 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CalculationContext.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/PipelineDatabaseResources.java
@@ -15,25 +15,20 @@
  * limitations under the License.
  */
 
-package 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator;
+package 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query;
 
-import lombok.Getter;
 import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
 
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
-import java.util.Deque;
-import java.util.LinkedList;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
- * Calculation context.
- *
- * @param <C> the type of record
+ * Pipeline database resources.
  */
-public final class CalculationContext<C> implements AutoCloseable {
+public final class PipelineDatabaseResources implements AutoCloseable {
     
     private final AtomicReference<Connection> connection = new 
AtomicReference<>();
     
@@ -41,19 +36,7 @@ public final class CalculationContext<C> implements 
AutoCloseable {
     
     private final AtomicReference<ResultSet> resultSet = new 
AtomicReference<>();
     
-    private final AtomicBoolean databaseResourcesReady = new 
AtomicBoolean(false);
-    
-    @Getter
-    private final Deque<C> recordDeque = new LinkedList<>();
-    
-    /**
-     * Get connection.
-     *
-     * @return connection
-     */
-    public Connection getConnection() {
-        return connection.get();
-    }
+    private final AtomicBoolean ready = new AtomicBoolean(false);
     
     /**
      * Set connection.
@@ -92,34 +75,33 @@ public final class CalculationContext<C> implements 
AutoCloseable {
     }
     
     /**
-     * Check if database resources are ready.
+     * Whether database resources are ready.
      *
      * @return true if database resources are ready, false otherwise
      */
-    public boolean isDatabaseResourcesReady() {
-        return databaseResourcesReady.get();
+    public boolean isReady() {
+        return ready.get();
     }
     
     /**
      * Set database resources ready.
      *
-     * @param databaseResourcesReady true if database resources are ready, 
false otherwise
+     * @param ready true if database resources are ready, false otherwise
      */
-    public void setDatabaseResourcesReady(final boolean 
databaseResourcesReady) {
-        this.databaseResourcesReady.set(databaseResourcesReady);
+    public void setReady(final boolean ready) {
+        this.ready.set(ready);
     }
     
     @Override
     public void close() {
-        resetDatabaseResources();
-        recordDeque.clear();
+        reset();
     }
     
     /**
      * Reset database resources.
      */
-    public void resetDatabaseResources() {
-        setDatabaseResourcesReady(false);
+    public void reset() {
+        setReady(false);
         QuietlyCloser.close(resultSet.get());
         QuietlyCloser.close(preparedStatement.get());
         QuietlyCloser.close(connection.get());

Reply via email to