This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new d251517fec2 Extract PipelineDatabaseResources from CalculationContext
(#36756)
d251517fec2 is described below
commit d251517fec2f8b420916de6461406a5caf1edad9
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Mon Sep 29 20:12:42 2025 +0800
Extract PipelineDatabaseResources from CalculationContext (#36756)
---
...stractRecordSingleTableInventoryCalculator.java | 24 +++---
.../table/calculator/CalculationContext.java | 95 +---------------------
.../query/PipelineDatabaseResources.java} | 44 +++-------
3 files changed, 30 insertions(+), 133 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/AbstractRecordSingleTableInventoryCalculator.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/AbstractRecordSingleTableInventoryCalculator.java
index e369662f172..35f2a6ae456 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/AbstractRecordSingleTableInventoryCalculator.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/AbstractRecordSingleTableInventoryCalculator.java
@@ -23,6 +23,7 @@ import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.DataConsist
import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCancelingException;
import
org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.column.InventoryColumnValueReaderEngine;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.PipelineDatabaseResources;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.QueryType;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.range.QueryRange;
import
org.apache.shardingsphere.data.pipeline.core.query.JDBCStreamQueryBuilder;
@@ -105,7 +106,7 @@ public abstract class
AbstractRecordSingleTableInventoryCalculator<S, C> extends
List<C> result = new LinkedList<>();
CalculationContext<C> calculationContext =
prepareCalculationContext(param);
prepareDatabaseResources(calculationContext, param);
- ResultSet resultSet = calculationContext.getResultSet();
+ ResultSet resultSet =
calculationContext.getDatabaseResources().getResultSet();
ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
while (resultSet.next()) {
ShardingSpherePreconditions.checkState(!isCanceling(), () -> new
PipelineJobCancelingException("Calculate chunk canceled, qualified table: %s",
param.getTable()));
@@ -119,7 +120,7 @@ public abstract class
AbstractRecordSingleTableInventoryCalculator<S, C> extends
List<C> result = new LinkedList<>();
CalculationContext<C> calculationContext =
prepareCalculationContext(param);
prepareDatabaseResources(calculationContext, param);
- ResultSet resultSet = calculationContext.getResultSet();
+ ResultSet resultSet =
calculationContext.getDatabaseResources().getResultSet();
ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
while (resultSet.next()) {
ShardingSpherePreconditions.checkState(!isCanceling(), () -> new
PipelineJobCancelingException("Calculate chunk canceled, qualified table: %s",
param.getTable()));
@@ -139,7 +140,7 @@ public abstract class
AbstractRecordSingleTableInventoryCalculator<S, C> extends
List<C> result = new LinkedList<>();
CalculationContext<C> calculationContext =
prepareCalculationContext(param);
prepareDatabaseResources(calculationContext, param);
- ResultSet resultSet = calculationContext.getResultSet();
+ ResultSet resultSet =
calculationContext.getDatabaseResources().getResultSet();
ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
while (resultSet.next()) {
ShardingSpherePreconditions.checkState(!isCanceling(), () -> new
PipelineJobCancelingException("Calculate chunk canceled, qualified table: %s",
param.getTable()));
@@ -148,7 +149,7 @@ public abstract class
AbstractRecordSingleTableInventoryCalculator<S, C> extends
return result;
}
}
- calculationContext.resetDatabaseResources();
+ calculationContext.getDatabaseResources().reset();
if (result.isEmpty() && 1 == round) {
return rangeQueryWithSingleColumUniqueKey(param,
columnValueReaderEngine, round + 1);
}
@@ -168,7 +169,7 @@ public abstract class
AbstractRecordSingleTableInventoryCalculator<S, C> extends
private void doRangeQueryWithMultiColumUniqueKeys(final
SingleTableInventoryCalculateParameter param, final CalculationContext<C>
calculationContext,
final
InventoryColumnValueReaderEngine columnValueReaderEngine) throws SQLException {
prepareDatabaseResources(calculationContext, param);
- ResultSet resultSet = calculationContext.getResultSet();
+ ResultSet resultSet =
calculationContext.getDatabaseResources().getResultSet();
ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
C previousRecord = calculationContext.getRecordDeque().pollLast();
List<C> duplicateRecords = new LinkedList<>();
@@ -197,7 +198,7 @@ public abstract class
AbstractRecordSingleTableInventoryCalculator<S, C> extends
}
duplicateRecords.add(record);
}
- calculationContext.resetDatabaseResources();
+ calculationContext.getDatabaseResources().reset();
if (!duplicateRecords.isEmpty()) {
calculationContext.getRecordDeque().addAll(pointRangeQuery(param,
duplicateRecords.get(0), columnValueReaderEngine));
}
@@ -241,19 +242,20 @@ public abstract class
AbstractRecordSingleTableInventoryCalculator<S, C> extends
}
private void prepareDatabaseResources(final CalculationContext<C>
calculationContext, final SingleTableInventoryCalculateParameter param) throws
SQLException {
- if (calculationContext.isDatabaseResourcesReady()) {
+ if (calculationContext.getDatabaseResources().isReady()) {
return;
}
+ PipelineDatabaseResources databaseResources =
calculationContext.getDatabaseResources();
Connection connection = param.getDataSource().getConnection();
- calculationContext.setConnection(connection);
+ databaseResources.setConnection(connection);
String sql = getQuerySQL(param);
PreparedStatement preparedStatement =
JDBCStreamQueryBuilder.build(param.getDatabaseType(), connection, sql,
chunkSize);
setCurrentStatement(preparedStatement);
- calculationContext.setPreparedStatement(preparedStatement);
+ databaseResources.setPreparedStatement(preparedStatement);
setParameters(preparedStatement, param);
ResultSet resultSet = preparedStatement.executeQuery();
- calculationContext.setResultSet(resultSet);
- calculationContext.setDatabaseResourcesReady(true);
+ databaseResources.setResultSet(resultSet);
+ databaseResources.setReady(true);
}
private String getQuerySQL(final SingleTableInventoryCalculateParameter
param) {
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CalculationContext.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CalculationContext.java
index 3d87c18f67f..096f58f821a 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CalculationContext.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CalculationContext.java
@@ -18,113 +18,26 @@
package
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator;
import lombok.Getter;
-import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.PipelineDatabaseResources;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
import java.util.Deque;
import java.util.LinkedList;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
/**
* Calculation context.
*
* @param <C> the type of record
*/
+@Getter
public final class CalculationContext<C> implements AutoCloseable {
- private final AtomicReference<Connection> connection = new
AtomicReference<>();
+ private final PipelineDatabaseResources databaseResources = new
PipelineDatabaseResources();
- private final AtomicReference<PreparedStatement> preparedStatement = new
AtomicReference<>();
-
- private final AtomicReference<ResultSet> resultSet = new
AtomicReference<>();
-
- private final AtomicBoolean databaseResourcesReady = new
AtomicBoolean(false);
-
- @Getter
private final Deque<C> recordDeque = new LinkedList<>();
- /**
- * Get connection.
- *
- * @return connection
- */
- public Connection getConnection() {
- return connection.get();
- }
-
- /**
- * Set connection.
- *
- * @param connection connection
- */
- public void setConnection(final Connection connection) {
- this.connection.set(connection);
- }
-
- /**
- * Set prepared statement.
- *
- * @param preparedStatement prepared statement
- */
- public void setPreparedStatement(final PreparedStatement
preparedStatement) {
- this.preparedStatement.set(preparedStatement);
- }
-
- /**
- * Get result set.
- *
- * @return result set
- */
- public ResultSet getResultSet() {
- return resultSet.get();
- }
-
- /**
- * Set result set.
- *
- * @param resultSet result set
- */
- public void setResultSet(final ResultSet resultSet) {
- this.resultSet.set(resultSet);
- }
-
- /**
- * Check if database resources are ready.
- *
- * @return true if database resources are ready, false otherwise
- */
- public boolean isDatabaseResourcesReady() {
- return databaseResourcesReady.get();
- }
-
- /**
- * Set database resources ready.
- *
- * @param databaseResourcesReady true if database resources are ready,
false otherwise
- */
- public void setDatabaseResourcesReady(final boolean
databaseResourcesReady) {
- this.databaseResourcesReady.set(databaseResourcesReady);
- }
-
@Override
public void close() {
- resetDatabaseResources();
+ databaseResources.close();
recordDeque.clear();
}
-
- /**
- * Reset database resources.
- */
- public void resetDatabaseResources() {
- setDatabaseResourcesReady(false);
- QuietlyCloser.close(resultSet.get());
- QuietlyCloser.close(preparedStatement.get());
- QuietlyCloser.close(connection.get());
- resultSet.set(null);
- preparedStatement.set(null);
- connection.set(null);
- }
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CalculationContext.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/PipelineDatabaseResources.java
similarity index 70%
copy from
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CalculationContext.java
copy to
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/PipelineDatabaseResources.java
index 3d87c18f67f..c782b646fea 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CalculationContext.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/query/PipelineDatabaseResources.java
@@ -15,25 +15,20 @@
* limitations under the License.
*/
-package
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator;
+package
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query;
-import lombok.Getter;
import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
-import java.util.Deque;
-import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
/**
- * Calculation context.
- *
- * @param <C> the type of record
+ * Pipeline database resources.
*/
-public final class CalculationContext<C> implements AutoCloseable {
+public final class PipelineDatabaseResources implements AutoCloseable {
private final AtomicReference<Connection> connection = new
AtomicReference<>();
@@ -41,19 +36,7 @@ public final class CalculationContext<C> implements
AutoCloseable {
private final AtomicReference<ResultSet> resultSet = new
AtomicReference<>();
- private final AtomicBoolean databaseResourcesReady = new
AtomicBoolean(false);
-
- @Getter
- private final Deque<C> recordDeque = new LinkedList<>();
-
- /**
- * Get connection.
- *
- * @return connection
- */
- public Connection getConnection() {
- return connection.get();
- }
+ private final AtomicBoolean ready = new AtomicBoolean(false);
/**
* Set connection.
@@ -92,34 +75,33 @@ public final class CalculationContext<C> implements
AutoCloseable {
}
/**
- * Check if database resources are ready.
+ * Whether database resources are ready.
*
* @return true if database resources are ready, false otherwise
*/
- public boolean isDatabaseResourcesReady() {
- return databaseResourcesReady.get();
+ public boolean isReady() {
+ return ready.get();
}
/**
* Set database resources ready.
*
- * @param databaseResourcesReady true if database resources are ready,
false otherwise
+ * @param ready true if database resources are ready, false otherwise
*/
- public void setDatabaseResourcesReady(final boolean
databaseResourcesReady) {
- this.databaseResourcesReady.set(databaseResourcesReady);
+ public void setReady(final boolean ready) {
+ this.ready.set(ready);
}
@Override
public void close() {
- resetDatabaseResources();
- recordDeque.clear();
+ reset();
}
/**
* Reset database resources.
*/
- public void resetDatabaseResources() {
- setDatabaseResourcesReady(false);
+ public void reset() {
+ setReady(false);
QuietlyCloser.close(resultSet.get());
QuietlyCloser.close(preparedStatement.get());
QuietlyCloser.close(connection.get());