This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 4eb5e3ae544 Review and improve pipeline code (#30351)
4eb5e3ae544 is described below

commit 4eb5e3ae5447b39b1824978345a06ff3ba8b9d9e
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Thu Feb 29 20:06:13 2024 +0800

    Review and improve pipeline code (#30351)
    
    * Format code
    
    * Column name case insensitive in PipelineTableMetaData
    
    * Improve pipeline job start event dispatch on proxy starting
    
    * Refactor PipelineDataSourceSink.executeBatchDelete
---
 .../core/checker/DataSourceCheckEngine.java        |  4 +--
 .../table/MatchingTableInventoryChecker.java       |  2 +-
 .../RecordSingleTableInventoryCalculator.java      |  2 +-
 .../importer/sink/type/PipelineDataSourceSink.java | 26 +++++++++++--------
 .../PipelineContextManagerLifecycleListener.java   | 30 ++++++++++++++++------
 .../StandardPipelineTableMetaDataLoader.java       |  3 ++-
 .../core/metadata/model/PipelineTableMetaData.java |  7 ++---
 ....java => UnsupportedKeyIngestPositionTest.java} |  2 +-
 .../metadata/model/PipelineTableMetaDataTest.java  |  3 ++-
 .../mysql/ingest/MySQLIncrementalDumperTest.java   |  4 +--
 .../ingest/wal/WALEventConverterTest.java          |  4 +--
 11 files changed, 54 insertions(+), 33 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/DataSourceCheckEngine.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/DataSourceCheckEngine.java
index 39c5858bdc8..29ed6c7be20 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/DataSourceCheckEngine.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/DataSourceCheckEngine.java
@@ -20,11 +20,11 @@ package 
org.apache.shardingsphere.data.pipeline.core.checker;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithInvalidConnectionException;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithTargetTableNotEmptyException;
 import 
org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
-import 
org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveQualifiedTable;
 import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelinePrepareSQLBuilder;
 import 
org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 import 
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
+import 
org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveQualifiedTable;
 
 import javax.sql.DataSource;
 import java.sql.Connection;
@@ -65,7 +65,7 @@ public final class DataSourceCheckEngine {
     
     /**
      * Check source data source.
-     * 
+     *
      * @param dataSources to be checked source data source
      */
     public void checkSourceDataSources(final Collection<DataSource> 
dataSources) {
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java
index 6e799b958da..abd1e1ecc04 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java
@@ -47,8 +47,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
 /**
  * Matching table inventory checker.
  */
-@Slf4j
 @RequiredArgsConstructor
+@Slf4j
 public abstract class MatchingTableInventoryChecker implements 
TableInventoryChecker {
     
     private final TableInventoryCheckParameter param;
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculator.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculator.java
index 9efea4fc839..a921d05b1e4 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculator.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculator.java
@@ -128,7 +128,7 @@ public final class RecordSingleTableInventoryCalculator 
extends AbstractStreamin
     
     private String getQuerySQL(final SingleTableInventoryCalculateParameter 
param) {
         if (null == param.getFirstUniqueKey()) {
-            throw new UnsupportedOperationException("Data consistency of 
DATA_MATCH type not support table without unique key and primary key now");
+            throw new UnsupportedOperationException("Record inventory 
calculator does not support table without unique key and primary key now");
         }
         PipelineDataConsistencyCalculateSQLBuilder pipelineSQLBuilder = new 
PipelineDataConsistencyCalculateSQLBuilder(param.getDatabaseType());
         Collection<String> columnNames = param.getColumnNames().isEmpty() ? 
Collections.singleton("*") : param.getColumnNames();
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/type/PipelineDataSourceSink.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/type/PipelineDataSourceSink.java
index 8a38bba4e5d..d2c2b5c58b5 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/type/PipelineDataSourceSink.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/type/PipelineDataSourceSink.java
@@ -217,20 +217,27 @@ public final class PipelineDataSourceSink implements 
PipelineSink {
     }
     
     private void executeBatchDelete(final Collection<DataRecord> dataRecords) 
throws SQLException {
-        DataRecord dataRecord = dataRecords.iterator().next();
-        String sql = 
importSQLBuilder.buildDeleteSQL(importerConfig.findSchemaName(dataRecord.getTableName()).orElse(null),
 dataRecord,
-                RecordUtils.extractConditionColumns(dataRecord, 
importerConfig.getShardingColumns(dataRecord.getTableName())));
-        try (
-                Connection connection = dataSource.getConnection();
-                PreparedStatement preparedStatement = 
connection.prepareStatement(sql)) {
-            runningStatement.set(preparedStatement);
+        try (Connection connection = dataSource.getConnection()) {
             boolean transactionEnabled = dataRecords.size() > 1;
             if (transactionEnabled) {
                 connection.setAutoCommit(false);
             }
+            executeBatchDelete(connection, dataRecords, 
importerConfig.getShardingColumns(dataRecords.iterator().next().getTableName()));
+            if (transactionEnabled) {
+                connection.commit();
+            }
+        }
+    }
+    
+    private void executeBatchDelete(final Connection connection, final 
Collection<DataRecord> dataRecords, final Set<String> shardingColumns) throws 
SQLException {
+        DataRecord dataRecord = dataRecords.iterator().next();
+        String deleteSQL = 
importSQLBuilder.buildDeleteSQL(importerConfig.findSchemaName(dataRecord.getTableName()).orElse(null),
 dataRecord,
+                RecordUtils.extractConditionColumns(dataRecord, 
shardingColumns));
+        try (PreparedStatement preparedStatement = 
connection.prepareStatement(deleteSQL)) {
+            runningStatement.set(preparedStatement);
             preparedStatement.setQueryTimeout(30);
             for (DataRecord each : dataRecords) {
-                List<Column> conditionColumns = 
RecordUtils.extractConditionColumns(each, 
importerConfig.getShardingColumns(dataRecord.getTableName()));
+                List<Column> conditionColumns = 
RecordUtils.extractConditionColumns(each, 
importerConfig.getShardingColumns(each.getTableName()));
                 for (int i = 0; i < conditionColumns.size(); i++) {
                     Object oldValue = conditionColumns.get(i).getOldValue();
                     if (null == oldValue) {
@@ -241,9 +248,6 @@ public final class PipelineDataSourceSink implements 
PipelineSink {
                 preparedStatement.addBatch();
             }
             preparedStatement.executeBatch();
-            if (transactionEnabled) {
-                connection.commit();
-            }
         } finally {
             runningStatement.set(null);
         }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/PipelineContextManagerLifecycleListener.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/PipelineContextManagerLifecycleListener.java
index a7c389d880b..d23d3707fb2 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/PipelineContextManagerLifecycleListener.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/listener/PipelineContextManagerLifecycleListener.java
@@ -21,11 +21,12 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
 import 
org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
+import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
 import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
+import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
 import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNodeWatcher;
-import org.apache.shardingsphere.data.pipeline.core.pojo.PipelineJobInfo;
 import org.apache.shardingsphere.elasticjob.infra.listener.ElasticJobListener;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import org.apache.shardingsphere.elasticjob.infra.spi.ElasticJobServiceLoader;
@@ -74,17 +75,30 @@ public final class PipelineContextManagerLifecycleListener 
implements ContextMan
         JobConfigurationAPI jobConfigAPI = 
PipelineAPIFactory.getJobConfigurationAPI(contextKey);
         List<JobBriefInfo> allJobsBriefInfo = 
PipelineAPIFactory.getJobStatisticsAPI(contextKey).getAllJobsBriefInfo()
                 .stream().filter(each -> 
!each.getJobName().startsWith("_")).collect(Collectors.toList());
+        log.info("All job names: {}", 
allJobsBriefInfo.stream().map(JobBriefInfo::getJobName).collect(Collectors.joining(",")));
         for (JobBriefInfo each : allJobsBriefInfo) {
-            PipelineJobType jobType = 
PipelineJobIdUtils.parseJobType(each.getJobName());
-            PipelineJobInfo jobInfo = jobType.getJobInfo(each.getJobName());
-            if (null == jobInfo || null == jobInfo.getJobMetaData()) {
+            PipelineJobType jobType;
+            try {
+                jobType = PipelineJobIdUtils.parseJobType(each.getJobName());
+            } catch (final IllegalArgumentException ex) {
+                log.warn("Parse job type failed, job name: {}, error: {}", 
each.getJobName(), ex.getMessage());
                 continue;
             }
-            if (!jobInfo.getJobMetaData().isActive()) {
-                return;
+            if ("CONSISTENCY_CHECK".equals(jobType.getCode())) {
+                continue;
+            }
+            JobConfigurationPOJO jobConfig;
+            try {
+                jobConfig = 
jobConfigAPI.getJobConfiguration(each.getJobName());
+            } catch (final PipelineJobNotFoundException ex) {
+                log.error("Get job configuration failed, job name: {}, error: 
{}", each.getJobName(), ex.getMessage());
+                continue;
+            }
+            if (jobConfig.isDisabled()) {
+                continue;
             }
-            JobConfigurationPOJO jobConfig = 
jobConfigAPI.getJobConfiguration(each.getJobName());
-            jobConfigAPI.updateJobConfiguration(jobConfig);
+            new PipelineJobManager(jobType).resume(each.getJobName());
+            log.info("Dispatch enable pipeline job start event, job name: {}", 
each.getJobName());
         }
     }
     
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/StandardPipelineTableMetaDataLoader.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/StandardPipelineTableMetaDataLoader.java
index 38c56562af3..11a21830329 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/StandardPipelineTableMetaDataLoader.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/StandardPipelineTableMetaDataLoader.java
@@ -112,7 +112,8 @@ public final class StandardPipelineTableMetaDataLoader 
implements PipelineTableM
             }
             Collection<PipelineIndexMetaData> uniqueIndexMetaData = 
uniqueKeys.entrySet().stream()
                     .map(entry -> new PipelineIndexMetaData(entry.getKey(), 
entry.getValue().stream().map(columnMetaDataMap::get).collect(Collectors.toList()))).collect(Collectors.toList());
-            result.put(new CaseInsensitiveIdentifier(each), new 
PipelineTableMetaData(each, columnMetaDataMap, uniqueIndexMetaData));
+            result.put(new CaseInsensitiveIdentifier(each), new 
PipelineTableMetaData(each,
+                    
columnMetaDataMap.entrySet().stream().collect(Collectors.toMap(entry -> new 
CaseInsensitiveIdentifier(entry.getKey()), Entry::getValue)), 
uniqueIndexMetaData));
         }
         return result;
     }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineTableMetaData.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineTableMetaData.java
index c87ab899efe..9f2a9810db5 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineTableMetaData.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineTableMetaData.java
@@ -24,6 +24,7 @@ import lombok.NonNull;
 import lombok.RequiredArgsConstructor;
 import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
+import 
org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveIdentifier;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -44,7 +45,7 @@ public final class PipelineTableMetaData {
     @NonNull
     private final String name;
     
-    private final Map<String, PipelineColumnMetaData> columnMetaDataMap;
+    private final Map<CaseInsensitiveIdentifier, PipelineColumnMetaData> 
columnMetaDataMap;
     
     @Getter
     private final List<String> columnNames;
@@ -55,7 +56,7 @@ public final class PipelineTableMetaData {
     @Getter
     private final Collection<PipelineIndexMetaData> uniqueIndexes;
     
-    public PipelineTableMetaData(final String name, final Map<String, 
PipelineColumnMetaData> columnMetaDataMap, final 
Collection<PipelineIndexMetaData> uniqueIndexes) {
+    public PipelineTableMetaData(final String name, final 
Map<CaseInsensitiveIdentifier, PipelineColumnMetaData> columnMetaDataMap, final 
Collection<PipelineIndexMetaData> uniqueIndexes) {
         this.name = name;
         this.columnMetaDataMap = columnMetaDataMap;
         List<PipelineColumnMetaData> columnMetaDataList = new 
ArrayList<>(columnMetaDataMap.values());
@@ -84,7 +85,7 @@ public final class PipelineTableMetaData {
      * @return column meta data
      */
     public PipelineColumnMetaData getColumnMetaData(final String columnName) {
-        PipelineColumnMetaData result = columnMetaDataMap.get(columnName);
+        PipelineColumnMetaData result = columnMetaDataMap.get(new 
CaseInsensitiveIdentifier(columnName));
         if (null == result) {
             log.warn("Can not get column meta data for column name '{}', 
columnNames={}", columnName, columnNames);
         }
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/type/UnsupportedKeyPositionTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/type/UnsupportedKeyIngestPositionTest.java
similarity index 97%
rename from kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/type/UnsupportedKeyPositionTest.java
rename to kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/type/UnsupportedKeyIngestPositionTest.java
index 7b5f0bad345..657a77c206a 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/type/UnsupportedKeyPositionTest.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/type/pk/type/UnsupportedKeyIngestPositionTest.java
@@ -24,7 +24,7 @@ import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertNull;
 
-class UnsupportedKeyPositionTest {
+class UnsupportedKeyIngestPositionTest {
     
     @Test
     void assertInit() {
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineTableMetaDataTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineTableMetaDataTest.java
index fc2ee4612cc..adff3e358aa 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineTableMetaDataTest.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineTableMetaDataTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.data.pipeline.core.metadata.model;
 
+import 
org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveIdentifier;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
@@ -36,7 +37,7 @@ class PipelineTableMetaDataTest {
     @BeforeEach
     void setUp() {
         PipelineColumnMetaData column = new PipelineColumnMetaData(1, "test", 
Types.INTEGER, "INTEGER", true, true, true);
-        pipelineTableMetaData = new PipelineTableMetaData("test_data", 
Collections.singletonMap("test", column), Collections.emptySet());
+        pipelineTableMetaData = new PipelineTableMetaData("test_data", 
Collections.singletonMap(new CaseInsensitiveIdentifier("test"), column), 
Collections.emptySet());
     }
     
     @Test
diff --git a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
index 3f9631fe717..5db258901d3 100644
--- a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
+++ b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
@@ -115,8 +115,8 @@ class MySQLIncrementalDumperTest {
         }
     }
     
-    private Map<String, PipelineColumnMetaData> mockOrderColumnsMetaDataMap() {
-        return 
mockOrderColumnsMetaDataList().stream().collect(Collectors.toMap(PipelineColumnMetaData::getName,
 Function.identity()));
+    private Map<CaseInsensitiveIdentifier, PipelineColumnMetaData> 
mockOrderColumnsMetaDataMap() {
+        return 
mockOrderColumnsMetaDataList().stream().collect(Collectors.toMap(metaData -> 
new CaseInsensitiveIdentifier(metaData.getName()), Function.identity()));
     }
     
     private List<PipelineColumnMetaData> mockOrderColumnsMetaDataList() {
diff --git a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
index 964f5441baf..dab22d7bd3d 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
@@ -105,8 +105,8 @@ class WALEventConverterTest {
         }
     }
     
-    private Map<String, PipelineColumnMetaData> mockOrderColumnsMetaDataMap() {
-        return 
mockOrderColumnsMetaDataList().stream().collect(Collectors.toMap(PipelineColumnMetaData::getName,
 Function.identity()));
+    private Map<CaseInsensitiveIdentifier, PipelineColumnMetaData> 
mockOrderColumnsMetaDataMap() {
+        return 
mockOrderColumnsMetaDataList().stream().collect(Collectors.toMap(metaData -> 
new CaseInsensitiveIdentifier(metaData.getName()), Function.identity()));
     }
     
     private List<PipelineColumnMetaData> mockOrderColumnsMetaDataList() {

Reply via email to