This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 4eb5e3ae544 Review and improve pipeline code (#30351)
4eb5e3ae544 is described below
commit 4eb5e3ae5447b39b1824978345a06ff3ba8b9d9e
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Thu Feb 29 20:06:13 2024 +0800
Review and improve pipeline code (#30351)
* Format code
* Column name case insensitive in PipelineTableMetaData
* Improve pipeline job start event dispatch on proxy starting
* Refactor PipelineDataSourceSink.executeBatchDelete
---
.../core/checker/DataSourceCheckEngine.java | 4 +--
.../table/MatchingTableInventoryChecker.java | 2 +-
.../RecordSingleTableInventoryCalculator.java | 2 +-
.../importer/sink/type/PipelineDataSourceSink.java | 26 +++++++++++--------
.../PipelineContextManagerLifecycleListener.java | 30 ++++++++++++++++------
.../StandardPipelineTableMetaDataLoader.java | 3 ++-
.../core/metadata/model/PipelineTableMetaData.java | 7 ++---
....java => UnsupportedKeyIngestPositionTest.java} | 2 +-
.../metadata/model/PipelineTableMetaDataTest.java | 3 ++-
.../mysql/ingest/MySQLIncrementalDumperTest.java | 4 +--
.../ingest/wal/WALEventConverterTest.java | 4 +--
11 files changed, 54 insertions(+), 33 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/DataSourceCheckEngine.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/DataSourceCheckEngine.java
index 39c5858bdc8..29ed6c7be20 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/DataSourceCheckEngine.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/DataSourceCheckEngine.java
@@ -20,11 +20,11 @@ package
org.apache.shardingsphere.data.pipeline.core.checker;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithInvalidConnectionException;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithTargetTableNotEmptyException;
import
org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
-import
org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveQualifiedTable;
import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelinePrepareSQLBuilder;
import
org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
+import
org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveQualifiedTable;
import javax.sql.DataSource;
import java.sql.Connection;
@@ -65,7 +65,7 @@ public final class DataSourceCheckEngine {
/**
* Check source data source.
- *
+ *
* @param dataSources to be checked source data source
*/
public void checkSourceDataSources(final Collection<DataSource>
dataSources) {
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java
index 6e799b958da..abd1e1ecc04 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java
@@ -47,8 +47,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
/**
* Matching table inventory checker.
*/
-@Slf4j
@RequiredArgsConstructor
+@Slf4j
public abstract class MatchingTableInventoryChecker implements
TableInventoryChecker {
private final TableInventoryCheckParameter param;
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculator.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculator.java
index 9efea4fc839..a921d05b1e4 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculator.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculator.java
@@ -128,7 +128,7 @@ public final class RecordSingleTableInventoryCalculator
extends AbstractStreamin
private String getQuerySQL(final SingleTableInventoryCalculateParameter
param) {
if (null == param.getFirstUniqueKey()) {
- throw new UnsupportedOperationException("Data consistency of
DATA_MATCH type not support table without unique key and primary key now");
+ throw new UnsupportedOperationException("Record inventory
calculator does not support table without unique key and primary key now");
}
PipelineDataConsistencyCalculateSQLBuilder pipelineSQLBuilder = new
PipelineDataConsistencyCalculateSQLBuilder(param.getDatabaseType());
Collection<String> columnNames = param.getColumnNames().isEmpty() ?
Collections.singleton("*") : param.getColumnNames();
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/type/PipelineDataSourceSink.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/type/PipelineDataSourceSink.java
index 8a38bba4e5d..d2c2b5c58b5 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/type/PipelineDataSourceSink.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/type/PipelineDataSourceSink.java
@@ -217,20 +217,27 @@ public final class PipelineDataSourceSink implements
PipelineSink {
}
private void executeBatchDelete(final Collection<DataRecord> dataRecords)
throws SQLException {
- DataRecord dataRecord = dataRecords.iterator().next();
- String sql =
importSQLBuilder.buildDeleteSQL(importerConfig.findSchemaName(dataRecord.getTableName()).orElse(null),
dataRecord,
- RecordUtils.extractConditionColumns(dataRecord,
importerConfig.getShardingColumns(dataRecord.getTableName())));
- try (
- Connection connection = dataSource.getConnection();
- PreparedStatement preparedStatement =
connection.prepareStatement(sql)) {
- runningStatement.set(preparedStatement);
+ try (Connection connection = dataSource.getConnection()) {
boolean transactionEnabled = dataRecords.size() > 1;
if (transactionEnabled) {
connection.setAutoCommit(false);
}
+ executeBatchDelete(connection, dataRecords,
importerConfig.getShardingColumns(dataRecords.iterator().next().getTableName()));
+ if (transactionEnabled) {
+ connection.commit();
+ }
+ }
+ }
+
+ private void executeBatchDelete(final Connection connection, final
Collection<DataRecord> dataRecords, final Set<String> shardingColumns) throws
SQLException {
+ DataRecord dataRecord = dataRecords.iterator().next();
+ String deleteSQL =
importSQLBuilder.buildDeleteSQL(importerConfig.findSchemaName(dataRecord.getTableName()).orElse(null),
dataRecord,
+ RecordUtils.extractConditionColumns(dataRecord,
shardingColumns));
+ try (PreparedStatement preparedStatement =
connection.prepareStatement(deleteSQL)) {
+ runningStatement.set(preparedStatement);
preparedStatement.setQueryTimeout(30);
for (DataRecord each : dataRecords) {
- List<Column> conditionColumns =
RecordUtils.extractConditionColumns(each,
importerConfig.getShardingColumns(dataRecord.getTableName()));
+ List<Column> conditionColumns =
RecordUtils.extractConditionColumns(each,
importerConfig.getShardingColumns(each.getTableName()));
for (int i = 0; i < conditionColumns.size(); i++) {
Object oldValue = conditionColumns.get(i).getOldValue();
if (null == oldValue) {
@@ -241,9 +248,6 @@ public final class PipelineDataSourceSink implements
PipelineSink {
preparedStatement.addBatch();
}
preparedStatement.executeBatch();
- if (transactionEnabled) {
- connection.commit();
- }
} finally {
runningStatement.set(null);
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/PipelineContextManagerLifecycleListener.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/PipelineContextManagerLifecycleListener.java
index a7c389d880b..d23d3707fb2 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/PipelineContextManagerLifecycleListener.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/PipelineContextManagerLifecycleListener.java
@@ -21,11 +21,12 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
import
org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
+import
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
+import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
import
org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNodeWatcher;
-import org.apache.shardingsphere.data.pipeline.core.pojo.PipelineJobInfo;
import org.apache.shardingsphere.elasticjob.infra.listener.ElasticJobListener;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.elasticjob.infra.spi.ElasticJobServiceLoader;
@@ -74,17 +75,30 @@ public final class PipelineContextManagerLifecycleListener
implements ContextMan
JobConfigurationAPI jobConfigAPI =
PipelineAPIFactory.getJobConfigurationAPI(contextKey);
List<JobBriefInfo> allJobsBriefInfo =
PipelineAPIFactory.getJobStatisticsAPI(contextKey).getAllJobsBriefInfo()
.stream().filter(each ->
!each.getJobName().startsWith("_")).collect(Collectors.toList());
+ log.info("All job names: {}",
allJobsBriefInfo.stream().map(JobBriefInfo::getJobName).collect(Collectors.joining(",")));
for (JobBriefInfo each : allJobsBriefInfo) {
- PipelineJobType jobType =
PipelineJobIdUtils.parseJobType(each.getJobName());
- PipelineJobInfo jobInfo = jobType.getJobInfo(each.getJobName());
- if (null == jobInfo || null == jobInfo.getJobMetaData()) {
+ PipelineJobType jobType;
+ try {
+ jobType = PipelineJobIdUtils.parseJobType(each.getJobName());
+ } catch (final IllegalArgumentException ex) {
+ log.warn("Parse job type failed, job name: {}, error: {}",
each.getJobName(), ex.getMessage());
continue;
}
- if (!jobInfo.getJobMetaData().isActive()) {
- return;
+ if ("CONSISTENCY_CHECK".equals(jobType.getCode())) {
+ continue;
+ }
+ JobConfigurationPOJO jobConfig;
+ try {
+ jobConfig =
jobConfigAPI.getJobConfiguration(each.getJobName());
+ } catch (final PipelineJobNotFoundException ex) {
+ log.error("Get job configuration failed, job name: {}, error:
{}", each.getJobName(), ex.getMessage());
+ continue;
+ }
+ if (jobConfig.isDisabled()) {
+ continue;
}
- JobConfigurationPOJO jobConfig =
jobConfigAPI.getJobConfiguration(each.getJobName());
- jobConfigAPI.updateJobConfiguration(jobConfig);
+ new PipelineJobManager(jobType).resume(each.getJobName());
+ log.info("Dispatch enable pipeline job start event, job name: {}",
each.getJobName());
}
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/StandardPipelineTableMetaDataLoader.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/StandardPipelineTableMetaDataLoader.java
index 38c56562af3..11a21830329 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/StandardPipelineTableMetaDataLoader.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/StandardPipelineTableMetaDataLoader.java
@@ -112,7 +112,8 @@ public final class StandardPipelineTableMetaDataLoader
implements PipelineTableM
}
Collection<PipelineIndexMetaData> uniqueIndexMetaData =
uniqueKeys.entrySet().stream()
.map(entry -> new PipelineIndexMetaData(entry.getKey(),
entry.getValue().stream().map(columnMetaDataMap::get).collect(Collectors.toList()))).collect(Collectors.toList());
- result.put(new CaseInsensitiveIdentifier(each), new
PipelineTableMetaData(each, columnMetaDataMap, uniqueIndexMetaData));
+ result.put(new CaseInsensitiveIdentifier(each), new
PipelineTableMetaData(each,
+
columnMetaDataMap.entrySet().stream().collect(Collectors.toMap(entry -> new
CaseInsensitiveIdentifier(entry.getKey()), Entry::getValue)),
uniqueIndexMetaData));
}
return result;
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineTableMetaData.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineTableMetaData.java
index c87ab899efe..9f2a9810db5 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineTableMetaData.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineTableMetaData.java
@@ -24,6 +24,7 @@ import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
+import
org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveIdentifier;
import java.util.ArrayList;
import java.util.Collection;
@@ -44,7 +45,7 @@ public final class PipelineTableMetaData {
@NonNull
private final String name;
- private final Map<String, PipelineColumnMetaData> columnMetaDataMap;
+ private final Map<CaseInsensitiveIdentifier, PipelineColumnMetaData>
columnMetaDataMap;
@Getter
private final List<String> columnNames;
@@ -55,7 +56,7 @@ public final class PipelineTableMetaData {
@Getter
private final Collection<PipelineIndexMetaData> uniqueIndexes;
- public PipelineTableMetaData(final String name, final Map<String,
PipelineColumnMetaData> columnMetaDataMap, final
Collection<PipelineIndexMetaData> uniqueIndexes) {
+ public PipelineTableMetaData(final String name, final
Map<CaseInsensitiveIdentifier, PipelineColumnMetaData> columnMetaDataMap, final
Collection<PipelineIndexMetaData> uniqueIndexes) {
this.name = name;
this.columnMetaDataMap = columnMetaDataMap;
List<PipelineColumnMetaData> columnMetaDataList = new
ArrayList<>(columnMetaDataMap.values());
@@ -84,7 +85,7 @@ public final class PipelineTableMetaData {
* @return column meta data
*/
public PipelineColumnMetaData getColumnMetaData(final String columnName) {
- PipelineColumnMetaData result = columnMetaDataMap.get(columnName);
+ PipelineColumnMetaData result = columnMetaDataMap.get(new
CaseInsensitiveIdentifier(columnName));
if (null == result) {
log.warn("Can not get column meta data for column name '{}',
columnNames={}", columnName, columnNames);
}
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/type/UnsupportedKeyPositionTest.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/type/UnsupportedKeyIngestPositionTest.java
similarity index 97%
rename from
kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/type/UnsupportedKeyPositionTest.java
rename to
kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/type/UnsupportedKeyIngestPositionTest.java
index 7b5f0bad345..657a77c206a 100644
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/type/UnsupportedKeyPositionTest.java
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/type/UnsupportedKeyIngestPositionTest.java
@@ -24,7 +24,7 @@ import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertNull;
-class UnsupportedKeyPositionTest {
+class UnsupportedKeyIngestPositionTest {
@Test
void assertInit() {
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineTableMetaDataTest.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineTableMetaDataTest.java
index fc2ee4612cc..adff3e358aa 100644
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineTableMetaDataTest.java
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineTableMetaDataTest.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.data.pipeline.core.metadata.model;
+import
org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveIdentifier;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -36,7 +37,7 @@ class PipelineTableMetaDataTest {
@BeforeEach
void setUp() {
PipelineColumnMetaData column = new PipelineColumnMetaData(1, "test",
Types.INTEGER, "INTEGER", true, true, true);
- pipelineTableMetaData = new PipelineTableMetaData("test_data",
Collections.singletonMap("test", column), Collections.emptySet());
+ pipelineTableMetaData = new PipelineTableMetaData("test_data",
Collections.singletonMap(new CaseInsensitiveIdentifier("test"), column),
Collections.emptySet());
}
@Test
diff --git
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
index 3f9631fe717..5db258901d3 100644
---
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
+++
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
@@ -115,8 +115,8 @@ class MySQLIncrementalDumperTest {
}
}
- private Map<String, PipelineColumnMetaData> mockOrderColumnsMetaDataMap() {
- return
mockOrderColumnsMetaDataList().stream().collect(Collectors.toMap(PipelineColumnMetaData::getName,
Function.identity()));
+ private Map<CaseInsensitiveIdentifier, PipelineColumnMetaData>
mockOrderColumnsMetaDataMap() {
+ return
mockOrderColumnsMetaDataList().stream().collect(Collectors.toMap(metaData ->
new CaseInsensitiveIdentifier(metaData.getName()), Function.identity()));
}
private List<PipelineColumnMetaData> mockOrderColumnsMetaDataList() {
diff --git
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
index 964f5441baf..dab22d7bd3d 100644
---
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
+++
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
@@ -105,8 +105,8 @@ class WALEventConverterTest {
}
}
- private Map<String, PipelineColumnMetaData> mockOrderColumnsMetaDataMap() {
- return
mockOrderColumnsMetaDataList().stream().collect(Collectors.toMap(PipelineColumnMetaData::getName,
Function.identity()));
+ private Map<CaseInsensitiveIdentifier, PipelineColumnMetaData>
mockOrderColumnsMetaDataMap() {
+ return
mockOrderColumnsMetaDataList().stream().collect(Collectors.toMap(metaData ->
new CaseInsensitiveIdentifier(metaData.getName()), Function.identity()));
}
private List<PipelineColumnMetaData> mockOrderColumnsMetaDataList() {