This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new dc41851cf35 Fix CDC job id inconsistency when restarting ShardingSphere-Proxy (#29418)
dc41851cf35 is described below

commit dc41851cf350e678c959bf1a0e05261794c4b225
Author: Xinze Guo <[email protected]>
AuthorDate: Fri Dec 15 21:24:12 2023 +0800

    Fix CDC job id inconsistency when restarting ShardingSphere-Proxy (#29418)
    
    * Fix inconsistent CDC job id
    
    * Improve CDC inventoryTasks init to avoid unnecessary task waiting
    
    * Fix sorting and inventory loss
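    
    In short: the job id is marshalled from the schema/table name list (see
    CDCJobAPI below), so any run-to-run difference in table enumeration order
    yielded a different id for the same stream after a Proxy restart. The fix
    sorts the list before marshalling; independently, inventory tasks restored
    at a finished position no longer register channels or keep the importer
    waiting. Illustrative sketches follow each hunk below.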
---
 .../data/pipeline/core/job/type/PipelineJobType.java   | 12 +++++++-----
 .../shardingsphere/data/pipeline/cdc/CDCJob.java       |  8 +++++---
 .../data/pipeline/cdc/api/CDCJobAPI.java               |  7 +++++--
 .../data/pipeline/cdc/core/prepare/CDCJobPreparer.java | 18 ++++++++++--------
 .../data/pipeline/cdc/core/task/CDCInventoryTask.java  | 11 ++++++++---
 .../data/pipeline/cdc/handler/CDCBackendHandler.java   |  4 ++--
 6 files changed, 37 insertions(+), 23 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/type/PipelineJobType.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/type/PipelineJobType.java
index 28e57410998..ec6a6c3afeb 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/type/PipelineJobType.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/type/PipelineJobType.java
@@ -17,16 +17,17 @@
 
 package org.apache.shardingsphere.data.pipeline.core.job.type;
 
-import org.apache.shardingsphere.data.pipeline.core.job.config.PipelineJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
-import org.apache.shardingsphere.data.pipeline.core.job.PipelineJob;
-import org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobItemProgress;
-import org.apache.shardingsphere.data.pipeline.core.pojo.PipelineJobInfo;
+import com.fasterxml.jackson.annotation.JsonIgnoreType;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
+import org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJob;
+import org.apache.shardingsphere.data.pipeline.core.job.config.PipelineJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.job.config.yaml.swapper.YamlPipelineJobConfigurationSwapper;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.config.YamlPipelineJobItemProgressConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.swapper.YamlPipelineJobItemProgressSwapper;
+import org.apache.shardingsphere.data.pipeline.core.pojo.PipelineJobInfo;
 import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPI;
 import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
@@ -37,6 +38,7 @@ import java.util.Optional;
  * Pipeline job type.
  */
 @SingletonSPI
+@JsonIgnoreType
 public interface PipelineJobType extends TypedSPI {
     
     /**
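
Aside from import reordering, the functional change above is the new
@JsonIgnoreType on the SPI interface. A minimal, self-contained sketch of what
that Jackson annotation does (the types and values here are invented for
illustration): any field whose declared type carries the annotation is silently
dropped from serialized output, so objects holding a PipelineJobType reference
can be serialized without dragging the SPI implementation along.

    import com.fasterxml.jackson.annotation.JsonIgnoreType;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public final class JsonIgnoreTypeDemo {

        @JsonIgnoreType
        interface JobType {
            String getTypeName();
        }

        public static class JobRecord {
            public String jobId = "0101abc";
            public JobType jobType = () -> "CDC"; // dropped by Jackson
        }

        public static void main(String[] args) throws Exception {
            // Prints {"jobId":"0101abc"}; the jobType field is ignored entirely.
            System.out.println(new ObjectMapper().writeValueAsString(new JobRecord()));
        }
    }
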
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
index 84476595190..629443abbaf 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
@@ -51,6 +51,7 @@ import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfigurationUtils;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineWriteConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
 import org.apache.shardingsphere.data.pipeline.core.metadata.CaseInsensitiveIdentifier;
 import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService;
@@ -118,9 +119,10 @@ public final class CDCJob extends AbstractInseparablePipelineJob<CDCJobItemConte
                 jobConfig.getDataSourceConfig().getType(), jobConfig.getDataSourceConfig().getParameter());
         Map<CaseInsensitiveIdentifier, Set<String>> shardingColumnsMap = new ShardingColumnsExtractor()
                 .getShardingColumnsMap(jobConfig.getDataSourceConfig().getRootConfig().getRules(), schemaTableNames.stream().map(CaseInsensitiveIdentifier::new).collect(Collectors.toSet()));
-        int batchSize = pipelineProcessConfig.getWrite().getBatchSize();
-        JobRateLimitAlgorithm writeRateLimitAlgorithm = new TransmissionProcessContext(jobConfig.getJobId(), pipelineProcessConfig).getWriteRateLimitAlgorithm();
-        return new ImporterConfiguration(dataSourceConfig, shardingColumnsMap, mapper, batchSize, writeRateLimitAlgorithm, 0, 1);
+        PipelineWriteConfiguration write = pipelineProcessConfig.getWrite();
+        JobRateLimitAlgorithm writeRateLimitAlgorithm = null == write.getRateLimiter() ? null
+                : TypedSPILoader.getService(JobRateLimitAlgorithm.class, write.getRateLimiter().getType(), write.getRateLimiter().getProps());
+        return new ImporterConfiguration(dataSourceConfig, shardingColumnsMap, mapper, write.getBatchSize(), writeRateLimitAlgorithm, 0, 1);
     }
     
     @Override
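
Note the simplification above: the write rate limit algorithm was previously
obtained by constructing a whole TransmissionProcessContext; it is now resolved
directly from the write configuration via TypedSPILoader, and stays null when
no rate limiter is configured. A rough, self-contained model of that
null-as-unlimited lookup pattern (all names below are illustrative, not the
project's real API):

    import java.util.Properties;

    public final class OptionalAlgorithmDemo {

        record AlgorithmConfig(String type, Properties props) { }

        interface RateLimitAlgorithm {
            void intercept();
        }

        // Stand-in for TypedSPILoader.getService(...): resolve an implementation by type name.
        static RateLimitAlgorithm loadByType(String type, Properties props) {
            return () -> { /* e.g. sleep according to a qps property */ };
        }

        // A null config means "no throttling"; callers skip intercept() entirely.
        static RateLimitAlgorithm resolve(AlgorithmConfig rateLimiter) {
            return null == rateLimiter ? null : loadByType(rateLimiter.type(), rateLimiter.props());
        }

        public static void main(String[] args) {
            System.out.println(null == resolve(null)); // true: unlimited
            System.out.println(null != resolve(new AlgorithmConfig("QPS", new Properties()))); // true: throttled
        }
    }
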
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
index 7567053f893..8474fcfe2e9 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
@@ -69,6 +69,7 @@ import org.apache.shardingsphere.infra.yaml.config.swapper.rule.YamlRuleConfigur
 
 import java.sql.SQLException;
 import java.time.LocalDateTime;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -133,9 +134,11 @@ public final class CDCJobAPI implements TransmissionJobAPI {
     
     private YamlCDCJobConfiguration getYamlCDCJobConfiguration(final StreamDataParameter param, final CDCSinkType sinkType, final Properties sinkProps, final PipelineContextKey contextKey) {
         YamlCDCJobConfiguration result = new YamlCDCJobConfiguration();
-        result.setJobId(PipelineJobIdUtils.marshal(new CDCJobId(contextKey, param.getSchemaTableNames(), param.isFull())));
+        List<String> schemaTableNames = param.getSchemaTableNames();
+        Collections.sort(schemaTableNames);
+        result.setJobId(PipelineJobIdUtils.marshal(new CDCJobId(contextKey, schemaTableNames, param.isFull())));
         result.setDatabaseName(param.getDatabaseName());
-        result.setSchemaTableNames(param.getSchemaTableNames());
+        result.setSchemaTableNames(schemaTableNames);
         result.setFull(param.isFull());
         result.setDecodeWithTX(param.isDecodeWithTX());
         YamlSinkConfiguration sinkConfig = new YamlSinkConfiguration();
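
This hunk is the heart of the fix: the job id is marshalled from the table name
list, so the list must be in canonical order before marshalling. A minimal
sketch of the idea; digestOf is a made-up stand-in for
PipelineJobIdUtils.marshal, which in reality also encodes the context key and
the full/incremental flag:

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    public final class JobIdDemo {

        static String digestOf(List<String> tables) {
            return Integer.toHexString(String.join(",", tables).hashCode());
        }

        public static void main(String[] args) {
            List<String> firstBoot = Arrays.asList("ds.t_order", "ds.t_order_item");
            List<String> secondBoot = Arrays.asList("ds.t_order_item", "ds.t_order");
            // Without the sort, the two boots derive different ids for the same stream.
            Collections.sort(firstBoot);
            Collections.sort(secondBoot);
            System.out.println(digestOf(firstBoot).equals(digestOf(secondBoot))); // true
        }
    }
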
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
index 59fe991d7ce..82e1330c395 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
@@ -25,28 +25,28 @@ import org.apache.shardingsphere.data.pipeline.cdc.core.importer.CDCChannelProgr
 import org.apache.shardingsphere.data.pipeline.cdc.core.importer.CDCImporter;
 import org.apache.shardingsphere.data.pipeline.cdc.core.task.CDCIncrementalTask;
 import org.apache.shardingsphere.data.pipeline.cdc.core.task.CDCInventoryTask;
-import org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
-import org.apache.shardingsphere.data.pipeline.core.ingest.position.FinishedPosition;
-import org.apache.shardingsphere.data.pipeline.core.job.progress.JobItemIncrementalTasksProgress;
-import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
-import org.apache.shardingsphere.data.pipeline.core.spi.ingest.dumper.IncrementalDumperCreator;
-import org.apache.shardingsphere.data.pipeline.core.task.progress.IncrementalTaskProgress;
 import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithGetBinlogPositionException;
 import org.apache.shardingsphere.data.pipeline.core.importer.Importer;
+import org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.importer.ImporterType;
 import org.apache.shardingsphere.data.pipeline.core.ingest.channel.PipelineChannel;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.InventoryDumper;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.InventoryDumperContext;
+import org.apache.shardingsphere.data.pipeline.core.ingest.position.FinishedPosition;
 import org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobRegistry;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.JobItemIncrementalTasksProgress;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
 import org.apache.shardingsphere.data.pipeline.core.preparer.InventoryTaskSplitter;
 import org.apache.shardingsphere.data.pipeline.core.preparer.PipelineJobPreparerUtils;
+import org.apache.shardingsphere.data.pipeline.core.spi.ingest.dumper.IncrementalDumperCreator;
 import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
 import org.apache.shardingsphere.data.pipeline.core.task.PipelineTaskUtils;
+import org.apache.shardingsphere.data.pipeline.core.task.progress.IncrementalTaskProgress;
 import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 import org.apache.shardingsphere.infra.database.opengauss.type.OpenGaussDatabaseType;
@@ -122,7 +122,9 @@ public final class CDCJobPreparer {
                 .splitInventoryDumperContext(jobItemContext)) {
             AtomicReference<IngestPosition> position = new AtomicReference<>(each.getCommonContext().getPosition());
             PipelineChannel channel = PipelineTaskUtils.createInventoryChannel(processContext.getPipelineChannelCreator(), importerConfig.getBatchSize(), position);
-            channelProgressPairs.add(new CDCChannelProgressPair(channel, jobItemContext));
+            if (!(position.get() instanceof FinishedPosition)) {
+                channelProgressPairs.add(new CDCChannelProgressPair(channel, jobItemContext));
+            }
             Dumper dumper = new InventoryDumper(each, channel, jobItemContext.getSourceDataSource(), jobItemContext.getSourceMetaDataLoader());
             Importer importer = importerUsed.get() ? null
                     : new CDCImporter(channelProgressPairs, importerConfig.getBatchSize(), 3, TimeUnit.SECONDS, jobItemContext.getSink(),
@@ -130,7 +132,7 @@ public final class CDCJobPreparer {
                             importerConfig.getRateLimitAlgorithm());
             jobItemContext.getInventoryTasks().add(new CDCInventoryTask(PipelineTaskUtils.generateInventoryTaskId(each), processContext.getInventoryDumperExecuteEngine(),
                     processContext.getInventoryImporterExecuteEngine(), dumper, importer, position));
-            if (!(each.getCommonContext().getPosition() instanceof FinishedPosition)) {
+            if (!(position.get() instanceof FinishedPosition)) {
                 importerUsed.set(true);
             }
         }
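
Two related corrections here: a channel progress pair is registered only when
the split's restored position is unfinished, and importerUsed is keyed on the
restored position rather than the original dumper-context position. The effect,
modeled loosely below (Split is an invented stand-in for the real
InventoryDumperContext), is that a restarted job no longer wires
already-finished splits into the importer, so the importer never waits on
channels that will produce no data:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public final class SkipFinishedSplitsDemo {

        record Split(String id, boolean finished) { }

        public static void main(String[] args) {
            List<Split> splits = Arrays.asList(new Split("t_order#0", true), new Split("t_order#1", false));
            List<String> registeredChannels = new ArrayList<>();
            boolean importerUsed = false;
            for (Split each : splits) {
                if (!each.finished()) {
                    registeredChannels.add(each.id());   // only live splits get a channel
                    importerUsed = true;                 // mirrors importerUsed.set(true)
                }
            }
            System.out.println(registeredChannels); // [t_order#1]
            System.out.println(importerUsed);       // true
        }
    }
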
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/task/CDCInventoryTask.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/task/CDCInventoryTask.java
index ae463513138..9db450c9930 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/task/CDCInventoryTask.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/task/CDCInventoryTask.java
@@ -21,16 +21,18 @@ import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper;
-import org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
-import org.apache.shardingsphere.data.pipeline.core.task.progress.InventoryTaskProgress;
 import org.apache.shardingsphere.data.pipeline.core.importer.Importer;
+import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper;
+import org.apache.shardingsphere.data.pipeline.core.ingest.position.FinishedPosition;
+import org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
 import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
 import org.apache.shardingsphere.data.pipeline.core.task.TaskExecuteCallback;
+import org.apache.shardingsphere.data.pipeline.core.task.progress.InventoryTaskProgress;
 
 import javax.annotation.Nullable;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.LinkedList;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicReference;
@@ -59,6 +61,9 @@ public final class CDCInventoryTask implements PipelineTask {
     
     @Override
     public Collection<CompletableFuture<?>> start() {
+        if (position.get() instanceof FinishedPosition) {
+            return Collections.emptyList();
+        }
         Collection<CompletableFuture<?>> result = new LinkedList<>();
         result.add(inventoryDumperExecuteEngine.submit(dumper, new TaskExecuteCallback(this)));
         if (null != importer) {
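
The early return added to start() complements the preparer change: a task
restored at FinishedPosition submits nothing and reports no futures. Callers
that join on the returned futures therefore wait zero time, since allOf() over
an empty collection is already complete, as this small sketch shows:

    import java.util.Collection;
    import java.util.Collections;
    import java.util.concurrent.CompletableFuture;

    public final class EmptyStartDemo {

        public static void main(String[] args) {
            Collection<CompletableFuture<?>> futures = Collections.emptyList();
            CompletableFuture<Void> all = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
            System.out.println(all.isDone()); // true: nothing to wait for
        }
    }
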
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
index cfad071beaf..a397ee6272a 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
@@ -58,9 +58,9 @@ import org.apache.shardingsphere.infra.exception.core.ShardingSpherePrecondition
 import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -117,7 +117,7 @@ public final class CDCBackendHandler {
         Map<String, List<DataNode>> actualDataNodesMap = CDCDataNodeUtils.buildDataNodesMap(database, tableNames);
         ShardingSpherePreconditions.checkState(!actualDataNodesMap.isEmpty(), () -> new PipelineInvalidParameterException(String.format("Not find table %s", tableNames)));
         boolean decodeWithTx = database.getProtocolType() instanceof OpenGaussDatabaseType;
-        StreamDataParameter parameter = new StreamDataParameter(requestBody.getDatabase(), new LinkedList<>(schemaTableNames), requestBody.getFull(), actualDataNodesMap, decodeWithTx);
+        StreamDataParameter parameter = new StreamDataParameter(requestBody.getDatabase(), new ArrayList<>(schemaTableNames), requestBody.getFull(), actualDataNodesMap, decodeWithTx);
         String jobId = jobAPI.create(parameter, CDCSinkType.SOCKET, new Properties());
         connectionContext.setJobId(jobId);
         startStreaming(jobId, connectionContext, channel);
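
The LinkedList-to-ArrayList swap is a small supporting change: the list handed
to StreamDataParameter is the one CDCJobAPI now sorts in place, and an
ArrayList sorts on its backing array directly, whereas sorting a LinkedList
copies elements out to an array and back. Behavior is otherwise unchanged.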
