This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 320308ac2b8 Merge PipelineDataSourceSink.runningStatement (#29525)
320308ac2b8 is described below
commit 320308ac2b88db993480278613d870c855c3ffa3
Author: Liang Zhang <[email protected]>
AuthorDate: Sun Dec 24 13:09:37 2023 +0800
Merge PipelineDataSourceSink.runningStatement (#29525)
---
.../importer/sink/type/PipelineDataSourceSink.java | 26 ++++++++--------------
.../core/ingest/dumper/InventoryDumper.java | 15 +++++++------
.../data/pipeline/core/util/PipelineJdbcUtils.java | 5 ++---
3 files changed, 19 insertions(+), 27 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/type/PipelineDataSourceSink.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/type/PipelineDataSourceSink.java
index 0297d18edd8..f75363e4180 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/type/PipelineDataSourceSink.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/type/PipelineDataSourceSink.java
@@ -60,20 +60,14 @@ public final class PipelineDataSourceSink implements
PipelineSink {
private final DataRecordGroupEngine groupEngine;
- private final AtomicReference<PreparedStatement>
runningBatchInsertStatement;
-
- private final AtomicReference<PreparedStatement> runningUpdateStatement;
-
- private final AtomicReference<PreparedStatement>
runningBatchDeleteStatement;
+ private final AtomicReference<PreparedStatement> runningStatement;
public PipelineDataSourceSink(final ImporterConfiguration importerConfig,
final PipelineDataSourceManager dataSourceManager) {
this.importerConfig = importerConfig;
dataSource =
dataSourceManager.getDataSource(importerConfig.getDataSourceConfig());
importSQLBuilder = new
PipelineImportSQLBuilder(importerConfig.getDataSourceConfig().getDatabaseType());
groupEngine = new DataRecordGroupEngine();
- runningBatchInsertStatement = new AtomicReference<>();
- runningUpdateStatement = new AtomicReference<>();
- runningBatchDeleteStatement = new AtomicReference<>();
+ runningStatement = new AtomicReference<>();
}
@Override
@@ -154,7 +148,7 @@ public final class PipelineDataSourceSink implements
PipelineSink {
DataRecord dataRecord = dataRecords.iterator().next();
String sql =
importSQLBuilder.buildInsertSQL(importerConfig.findSchemaName(dataRecord.getTableName()).orElse(null),
dataRecord);
try (PreparedStatement preparedStatement =
connection.prepareStatement(sql)) {
- runningBatchInsertStatement.set(preparedStatement);
+ runningStatement.set(preparedStatement);
preparedStatement.setQueryTimeout(30);
for (DataRecord each : dataRecords) {
for (int i = 0; i < each.getColumnCount(); i++) {
@@ -164,7 +158,7 @@ public final class PipelineDataSourceSink implements
PipelineSink {
}
preparedStatement.executeBatch();
} finally {
- runningBatchInsertStatement.set(null);
+ runningStatement.set(null);
}
}
@@ -180,7 +174,7 @@ public final class PipelineDataSourceSink implements
PipelineSink {
List<Column> setColumns =
dataRecord.getColumns().stream().filter(Column::isUpdated).collect(Collectors.toList());
String sql =
importSQLBuilder.buildUpdateSQL(importerConfig.findSchemaName(dataRecord.getTableName()).orElse(null),
dataRecord, conditionColumns);
try (PreparedStatement preparedStatement =
connection.prepareStatement(sql)) {
- runningUpdateStatement.set(preparedStatement);
+ runningStatement.set(preparedStatement);
for (int i = 0; i < setColumns.size(); i++) {
preparedStatement.setObject(i + 1,
setColumns.get(i).getValue());
}
@@ -199,7 +193,7 @@ public final class PipelineDataSourceSink implements
PipelineSink {
log.warn("executeUpdate failed, updateCount={}, updateSql={},
updatedColumns={}, conditionColumns={}", updateCount, sql, setColumns,
conditionColumns);
}
} finally {
- runningUpdateStatement.set(null);
+ runningStatement.set(null);
}
}
@@ -208,7 +202,7 @@ public final class PipelineDataSourceSink implements
PipelineSink {
String sql =
importSQLBuilder.buildDeleteSQL(importerConfig.findSchemaName(dataRecord.getTableName()).orElse(null),
dataRecord,
RecordUtils.extractConditionColumns(dataRecord,
importerConfig.getShardingColumns(dataRecord.getTableName())));
try (PreparedStatement preparedStatement =
connection.prepareStatement(sql)) {
- runningBatchDeleteStatement.set(preparedStatement);
+ runningStatement.set(preparedStatement);
preparedStatement.setQueryTimeout(30);
for (DataRecord each : dataRecords) {
List<Column> conditionColumns =
RecordUtils.extractConditionColumns(each,
importerConfig.getShardingColumns(dataRecord.getTableName()));
@@ -223,14 +217,12 @@ public final class PipelineDataSourceSink implements
PipelineSink {
}
preparedStatement.executeBatch();
} finally {
- runningBatchDeleteStatement.set(null);
+ runningStatement.set(null);
}
}
@Override
public void close() {
- PipelineJdbcUtils.cancelStatement(runningBatchInsertStatement.get());
- PipelineJdbcUtils.cancelStatement(runningUpdateStatement.get());
- PipelineJdbcUtils.cancelStatement(runningBatchDeleteStatement.get());
+
Optional.ofNullable(runningStatement.get()).ifPresent(PipelineJdbcUtils::cancelStatement);
}
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
index 5b4e4b68ca5..6366244928e 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
@@ -74,22 +74,22 @@ public final class InventoryDumper extends
AbstractPipelineLifecycleRunnable imp
private final DataSource dataSource;
+ private final PipelineTableMetaDataLoader metaDataLoader;
+
private final PipelineInventoryDumpSQLBuilder inventoryDumpSQLBuilder;
private final ColumnValueReaderEngine columnValueReaderEngine;
- private final PipelineTableMetaDataLoader metaDataLoader;
-
- private final AtomicReference<Statement> dumpStatement = new
AtomicReference<>();
+ private final AtomicReference<Statement> runningStatement = new
AtomicReference<>();
public InventoryDumper(final InventoryDumperContext dumperContext, final
PipelineChannel channel, final DataSource dataSource, final
PipelineTableMetaDataLoader metaDataLoader) {
this.dumperContext = dumperContext;
this.channel = channel;
this.dataSource = dataSource;
+ this.metaDataLoader = metaDataLoader;
DatabaseType databaseType =
dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType();
inventoryDumpSQLBuilder = new
PipelineInventoryDumpSQLBuilder(databaseType);
columnValueReaderEngine = new ColumnValueReaderEngine(databaseType);
- this.metaDataLoader = metaDataLoader;
}
@Override
@@ -117,7 +117,7 @@ public final class InventoryDumper extends
AbstractPipelineLifecycleRunnable imp
connection.setTransactionIsolation(dumperContext.getTransactionIsolation());
}
try (PreparedStatement preparedStatement =
JDBCStreamQueryBuilder.build(databaseType, connection,
buildInventoryDumpSQL())) {
- dumpStatement.set(preparedStatement);
+ runningStatement.set(preparedStatement);
if (!(databaseType instanceof MySQLDatabaseType)) {
preparedStatement.setFetchSize(batchSize);
}
@@ -144,8 +144,9 @@ public final class InventoryDumper extends
AbstractPipelineLifecycleRunnable imp
}
dataRecords.add(new FinishedRecord(new
IngestFinishedPosition()));
channel.pushRecords(dataRecords);
- dumpStatement.set(null);
log.info("Inventory dump done, rowCount={}, dataSource={},
actualTable={}", rowCount,
dumperContext.getCommonContext().getDataSourceName(),
dumperContext.getActualTableName());
+ } finally {
+ runningStatement.set(null);
}
}
}
@@ -217,6 +218,6 @@ public final class InventoryDumper extends
AbstractPipelineLifecycleRunnable imp
@Override
protected void doStop() {
- PipelineJdbcUtils.cancelStatement(dumpStatement.get());
+
Optional.ofNullable(runningStatement.get()).ifPresent(PipelineJdbcUtils::cancelStatement);
}
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineJdbcUtils.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineJdbcUtils.java
index 43290b84894..4ffd6621cf0 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineJdbcUtils.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineJdbcUtils.java
@@ -87,10 +87,9 @@ public final class PipelineJdbcUtils {
*/
public static void cancelStatement(final Statement statement) throws
SQLWrapperException {
try {
- if (null == statement || statement.isClosed()) {
- return;
+ if (!statement.isClosed()) {
+ statement.cancel();
}
- statement.cancel();
} catch (final SQLException ex) {
throw new SQLWrapperException(ex);
}