This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new dc41851cf35 Fix CDC job id inconsistency when restarting ShardingSphere-Proxy (#29418)
dc41851cf35 is described below
commit dc41851cf350e678c959bf1a0e05261794c4b225
Author: Xinze Guo <[email protected]>
AuthorDate: Fri Dec 15 21:24:12 2023 +0800
Fix CDC job id inconsistency when restarting ShardingSphere-Proxy (#29418)
* Fix CDC job id inconsistency
* Improve CDC inventory task initialization to avoid unnecessary task waiting
* Fix inventory ordering and inventory loss
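For context on the first fix: the CDC job id is marshaled from the stream's schema/table names, so if those names arrive in a different order after a Proxy restart, the derived id changes. Sorting the names first makes the id order-independent. A minimal sketch of the idea, where marshal is a hypothetical stand-in for PipelineJobIdUtils.marshal rather than the real encoding:

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    public final class StableJobIdSketch {

        // Hypothetical stand-in for PipelineJobIdUtils.marshal: any encoding of the
        // table-name list is order-sensitive, so callers must normalize the order first.
        static String marshal(final List<String> schemaTableNames) {
            return Integer.toHexString(String.join(",", schemaTableNames).hashCode());
        }

        public static void main(String[] args) {
            List<String> firstStart = new ArrayList<>(List.of("t_order", "t_order_item"));
            List<String> afterRestart = new ArrayList<>(List.of("t_order_item", "t_order"));
            Collections.sort(firstStart);
            Collections.sort(afterRestart);
            // Same tables in any arrival order now yield the same job id.
            System.out.println(marshal(firstStart).equals(marshal(afterRestart))); // true
        }
    }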
---
.../data/pipeline/core/job/type/PipelineJobType.java | 12 +++++++-----
.../shardingsphere/data/pipeline/cdc/CDCJob.java | 8 +++++---
.../data/pipeline/cdc/api/CDCJobAPI.java | 7 +++++--
.../data/pipeline/cdc/core/prepare/CDCJobPreparer.java | 18 ++++++++++--------
.../data/pipeline/cdc/core/task/CDCInventoryTask.java | 11 ++++++++---
.../data/pipeline/cdc/handler/CDCBackendHandler.java | 4 ++--
6 files changed, 37 insertions(+), 23 deletions(-)
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/type/PipelineJobType.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/type/PipelineJobType.java
index 28e57410998..ec6a6c3afeb 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/type/PipelineJobType.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/type/PipelineJobType.java
@@ -17,16 +17,17 @@
 package org.apache.shardingsphere.data.pipeline.core.job.type;
 
-import org.apache.shardingsphere.data.pipeline.core.job.config.PipelineJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
-import org.apache.shardingsphere.data.pipeline.core.job.PipelineJob;
-import org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobItemProgress;
-import org.apache.shardingsphere.data.pipeline.core.pojo.PipelineJobInfo;
+import com.fasterxml.jackson.annotation.JsonIgnoreType;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
+import org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJob;
+import org.apache.shardingsphere.data.pipeline.core.job.config.PipelineJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.job.config.yaml.swapper.YamlPipelineJobConfigurationSwapper;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.config.YamlPipelineJobItemProgressConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.swapper.YamlPipelineJobItemProgressSwapper;
+import org.apache.shardingsphere.data.pipeline.core.pojo.PipelineJobInfo;
 import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPI;
 import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
@@ -37,6 +38,7 @@ import java.util.Optional;
  * Pipeline job type.
  */
 @SingletonSPI
+@JsonIgnoreType
 public interface PipelineJobType extends TypedSPI {
     
     /**
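A note on the @JsonIgnoreType addition above: annotating a type tells Jackson to skip any property whose declared type is that interface during (de)serialization, which keeps the SPI implementation out of serialized job payloads. A minimal, self-contained sketch of the annotation's effect, assuming jackson-databind is on the classpath (the POJO below is illustrative, not ShardingSphere code):

    import com.fasterxml.jackson.annotation.JsonIgnoreType;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public final class JsonIgnoreTypeSketch {

        @JsonIgnoreType
        interface JobType {
            String getType();
        }

        public static final class JobPojo {
            public String jobId = "demo-job-id";
            public JobType jobType = () -> "CDC"; // skipped: declared type carries @JsonIgnoreType
        }

        public static void main(String[] args) throws Exception {
            // Prints {"jobId":"demo-job-id"} — the jobType property is not serialized.
            System.out.println(new ObjectMapper().writeValueAsString(new JobPojo()));
        }
    }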
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
index 84476595190..629443abbaf 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
@@ -51,6 +51,7 @@ import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfigurationUtils;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineWriteConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
 import org.apache.shardingsphere.data.pipeline.core.metadata.CaseInsensitiveIdentifier;
 import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService;
@@ -118,9 +119,10 @@ public final class CDCJob extends AbstractInseparablePipelineJob<CDCJobItemConte
                 jobConfig.getDataSourceConfig().getType(), jobConfig.getDataSourceConfig().getParameter());
         Map<CaseInsensitiveIdentifier, Set<String>> shardingColumnsMap = new ShardingColumnsExtractor()
                 .getShardingColumnsMap(jobConfig.getDataSourceConfig().getRootConfig().getRules(), schemaTableNames.stream().map(CaseInsensitiveIdentifier::new).collect(Collectors.toSet()));
-        int batchSize = pipelineProcessConfig.getWrite().getBatchSize();
-        JobRateLimitAlgorithm writeRateLimitAlgorithm = new TransmissionProcessContext(jobConfig.getJobId(), pipelineProcessConfig).getWriteRateLimitAlgorithm();
-        return new ImporterConfiguration(dataSourceConfig, shardingColumnsMap, mapper, batchSize, writeRateLimitAlgorithm, 0, 1);
+        PipelineWriteConfiguration write = pipelineProcessConfig.getWrite();
+        JobRateLimitAlgorithm writeRateLimitAlgorithm = null == write.getRateLimiter() ? null
+                : TypedSPILoader.getService(JobRateLimitAlgorithm.class, write.getRateLimiter().getType(), write.getRateLimiter().getProps());
+        return new ImporterConfiguration(dataSourceConfig, shardingColumnsMap, mapper, write.getBatchSize(), writeRateLimitAlgorithm, 0, 1);
     }
     
     @Override
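The change above replaces building a full TransmissionProcessContext (which required the job id) with a direct, null-guarded SPI lookup of the write rate limiter. A minimal sketch of the same null-guarded lookup pattern, with a plain map standing in for the real TypedSPILoader (all names below are illustrative):

    import java.util.Map;
    import java.util.Properties;

    public final class NullSafeAlgorithmLookupSketch {

        // Stand-ins for the SPI types; a map replaces TypedSPILoader here.
        interface RateLimiter { }

        static final class QpsRateLimiter implements RateLimiter { }

        static final Map<String, RateLimiter> REGISTRY = Map.of("QPS", new QpsRateLimiter());

        static RateLimiter load(final String type, final Properties props) {
            return REGISTRY.get(type);
        }

        public static void main(String[] args) {
            String configuredType = null; // no rate limiter configured for the write phase
            // Mirrors the patched CDCJob logic: with no limiter configured, pass
            // null through instead of constructing a whole process context.
            RateLimiter limiter = null == configuredType ? null : load(configuredType, new Properties());
            System.out.println(limiter); // null -> importer runs unthrottled
        }
    }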
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
index 7567053f893..8474fcfe2e9 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
@@ -69,6 +69,7 @@ import org.apache.shardingsphere.infra.yaml.config.swapper.rule.YamlRuleConfigur
 
 import java.sql.SQLException;
 import java.time.LocalDateTime;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -133,9 +134,11 @@ public final class CDCJobAPI implements TransmissionJobAPI {
     
     private YamlCDCJobConfiguration getYamlCDCJobConfiguration(final StreamDataParameter param, final CDCSinkType sinkType, final Properties sinkProps, final PipelineContextKey contextKey) {
         YamlCDCJobConfiguration result = new YamlCDCJobConfiguration();
-        result.setJobId(PipelineJobIdUtils.marshal(new CDCJobId(contextKey, param.getSchemaTableNames(), param.isFull())));
+        List<String> schemaTableNames = param.getSchemaTableNames();
+        Collections.sort(schemaTableNames);
+        result.setJobId(PipelineJobIdUtils.marshal(new CDCJobId(contextKey, schemaTableNames, param.isFull())));
         result.setDatabaseName(param.getDatabaseName());
-        result.setSchemaTableNames(param.getSchemaTableNames());
+        result.setSchemaTableNames(schemaTableNames);
         result.setFull(param.isFull());
         result.setDecodeWithTX(param.isDecodeWithTX());
         YamlSinkConfiguration sinkConfig = new YamlSinkConfiguration();
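Note that Collections.sort here reorders the parameter's list in place, so the sorted order flows to both the marshaled job id and setSchemaTableNames; this assumes getSchemaTableNames returns a live reference rather than a defensive copy. A small sketch of that aliasing:

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    public final class SortAliasingSketch {

        public static void main(String[] args) {
            // Assumption: param.getSchemaTableNames() returns a live reference, so
            // one in-place sort is visible to every later use of the list.
            List<String> schemaTableNames = new ArrayList<>(List.of("test.t_b", "test.t_a"));
            Collections.sort(schemaTableNames);
            String marshaledIntoJobId = String.join("|", schemaTableNames);
            List<String> storedOnYamlConfig = schemaTableNames;
            System.out.println(marshaledIntoJobId); // test.t_a|test.t_b
            System.out.println(storedOnYamlConfig); // [test.t_a, test.t_b] — same order in both places
        }
    }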
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
index 59fe991d7ce..82e1330c395 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
@@ -25,28 +25,28 @@ import org.apache.shardingsphere.data.pipeline.cdc.core.importer.CDCChannelProgr
 import org.apache.shardingsphere.data.pipeline.cdc.core.importer.CDCImporter;
 import org.apache.shardingsphere.data.pipeline.cdc.core.task.CDCIncrementalTask;
 import org.apache.shardingsphere.data.pipeline.cdc.core.task.CDCInventoryTask;
-import org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
-import org.apache.shardingsphere.data.pipeline.core.ingest.position.FinishedPosition;
-import org.apache.shardingsphere.data.pipeline.core.job.progress.JobItemIncrementalTasksProgress;
-import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
-import org.apache.shardingsphere.data.pipeline.core.spi.ingest.dumper.IncrementalDumperCreator;
-import org.apache.shardingsphere.data.pipeline.core.task.progress.IncrementalTaskProgress;
 import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithGetBinlogPositionException;
 import org.apache.shardingsphere.data.pipeline.core.importer.Importer;
+import org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.importer.ImporterType;
 import org.apache.shardingsphere.data.pipeline.core.ingest.channel.PipelineChannel;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.InventoryDumper;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.InventoryDumperContext;
+import org.apache.shardingsphere.data.pipeline.core.ingest.position.FinishedPosition;
 import org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobRegistry;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.JobItemIncrementalTasksProgress;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
 import org.apache.shardingsphere.data.pipeline.core.preparer.InventoryTaskSplitter;
 import org.apache.shardingsphere.data.pipeline.core.preparer.PipelineJobPreparerUtils;
+import org.apache.shardingsphere.data.pipeline.core.spi.ingest.dumper.IncrementalDumperCreator;
 import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
 import org.apache.shardingsphere.data.pipeline.core.task.PipelineTaskUtils;
+import org.apache.shardingsphere.data.pipeline.core.task.progress.IncrementalTaskProgress;
 import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 import org.apache.shardingsphere.infra.database.opengauss.type.OpenGaussDatabaseType;
@@ -122,7 +122,9 @@ public final class CDCJobPreparer {
                 .splitInventoryDumperContext(jobItemContext)) {
             AtomicReference<IngestPosition> position = new AtomicReference<>(each.getCommonContext().getPosition());
             PipelineChannel channel = PipelineTaskUtils.createInventoryChannel(processContext.getPipelineChannelCreator(), importerConfig.getBatchSize(), position);
-            channelProgressPairs.add(new CDCChannelProgressPair(channel, jobItemContext));
+            if (!(position.get() instanceof FinishedPosition)) {
+                channelProgressPairs.add(new CDCChannelProgressPair(channel, jobItemContext));
+            }
             Dumper dumper = new InventoryDumper(each, channel, jobItemContext.getSourceDataSource(), jobItemContext.getSourceMetaDataLoader());
             Importer importer = importerUsed.get() ? null
                     : new CDCImporter(channelProgressPairs, importerConfig.getBatchSize(), 3, TimeUnit.SECONDS, jobItemContext.getSink(),
@@ -130,7 +132,7 @@ public final class CDCJobPreparer {
                             importerConfig.getRateLimitAlgorithm());
             jobItemContext.getInventoryTasks().add(new CDCInventoryTask(PipelineTaskUtils.generateInventoryTaskId(each), processContext.getInventoryDumperExecuteEngine(),
                     processContext.getInventoryImporterExecuteEngine(), dumper, importer, position));
-            if (!(each.getCommonContext().getPosition() instanceof FinishedPosition)) {
+            if (!(position.get() instanceof FinishedPosition)) {
                 importerUsed.set(true);
             }
         }
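The key behavioral change in this hunk: a channel/progress pair is only registered with the shared CDC importer when the split's restored position is not already FinishedPosition, so the importer no longer waits on splits that have nothing left to dump. A simplified, self-contained sketch of the gating, using stand-in types rather than the pipeline API:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public final class FinishedSplitGatingSketch {

        // Stand-ins for IngestPosition / FinishedPosition (illustrative only).
        interface IngestPosition { }

        static final class FinishedPosition implements IngestPosition { }

        static final class OffsetPosition implements IngestPosition { }

        public static void main(String[] args) {
            List<IngestPosition> splitPositions = Arrays.asList(new FinishedPosition(), new OffsetPosition(), new FinishedPosition());
            List<IngestPosition> registeredChannels = new ArrayList<>();
            for (IngestPosition each : splitPositions) {
                // Mirrors the preparer's new check: finished splits get no channel,
                // so the shared importer never blocks waiting for their data.
                if (!(each instanceof FinishedPosition)) {
                    registeredChannels.add(each);
                }
            }
            System.out.println(registeredChannels.size()); // 1 of 3 splits still needs the importer
        }
    }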
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/task/CDCInventoryTask.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/task/CDCInventoryTask.java
index ae463513138..9db450c9930 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/task/CDCInventoryTask.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/task/CDCInventoryTask.java
@@ -21,16 +21,18 @@ import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper;
-import org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
-import org.apache.shardingsphere.data.pipeline.core.task.progress.InventoryTaskProgress;
 import org.apache.shardingsphere.data.pipeline.core.importer.Importer;
+import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper;
+import org.apache.shardingsphere.data.pipeline.core.ingest.position.FinishedPosition;
+import org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
 import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
 import org.apache.shardingsphere.data.pipeline.core.task.TaskExecuteCallback;
+import org.apache.shardingsphere.data.pipeline.core.task.progress.InventoryTaskProgress;
 
 import javax.annotation.Nullable;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.LinkedList;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicReference;
@@ -59,6 +61,9 @@ public final class CDCInventoryTask implements PipelineTask {
     
     @Override
     public Collection<CompletableFuture<?>> start() {
+        if (position.get() instanceof FinishedPosition) {
+            return Collections.emptyList();
+        }
         Collection<CompletableFuture<?>> result = new LinkedList<>();
         result.add(inventoryDumperExecuteEngine.submit(dumper, new TaskExecuteCallback(this)));
         if (null != importer) {
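With the guard above, a CDCInventoryTask whose restored position is already FinishedPosition schedules no dumper or importer futures at all. A simplified mirror of that early return, using plain CompletableFuture for illustration only:

    import java.util.Collection;
    import java.util.Collections;
    import java.util.LinkedList;
    import java.util.concurrent.CompletableFuture;

    public final class EarlyReturnStartSketch {

        // Simplified mirror of CDCInventoryTask.start(): when the restored position
        // is already finished, schedule nothing, so callers joining on the returned
        // futures never wait on a no-op task.
        static Collection<CompletableFuture<?>> start(final boolean alreadyFinished) {
            if (alreadyFinished) {
                return Collections.emptyList();
            }
            Collection<CompletableFuture<?>> result = new LinkedList<>();
            result.add(CompletableFuture.runAsync(() -> { }));
            return result;
        }

        public static void main(String[] args) {
            System.out.println(start(true).size());  // 0 — finished task submits no futures
            System.out.println(start(false).size()); // 1 — active task submits the dumper
        }
    }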
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
index cfad071beaf..a397ee6272a 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
@@ -58,9 +58,9 @@ import org.apache.shardingsphere.infra.exception.core.ShardingSpherePrecondition
 import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -117,7 +117,7 @@ public final class CDCBackendHandler {
         Map<String, List<DataNode>> actualDataNodesMap = CDCDataNodeUtils.buildDataNodesMap(database, tableNames);
         ShardingSpherePreconditions.checkState(!actualDataNodesMap.isEmpty(), () -> new PipelineInvalidParameterException(String.format("Not find table %s", tableNames)));
         boolean decodeWithTx = database.getProtocolType() instanceof OpenGaussDatabaseType;
-        StreamDataParameter parameter = new StreamDataParameter(requestBody.getDatabase(), new LinkedList<>(schemaTableNames), requestBody.getFull(), actualDataNodesMap, decodeWithTx);
+        StreamDataParameter parameter = new StreamDataParameter(requestBody.getDatabase(), new ArrayList<>(schemaTableNames), requestBody.getFull(), actualDataNodesMap, decodeWithTx);
         String jobId = jobAPI.create(parameter, CDCSinkType.SOCKET, new Properties());
         connectionContext.setJobId(jobId);
         startStreaming(jobId, connectionContext, channel);
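On the LinkedList-to-ArrayList swap: both list types can be sorted in place by the downstream Collections.sort in CDCJobAPI, so the change is presumably about cheaper random access and a consistent list type rather than correctness; what would break the new sort is an unmodifiable list. A small sketch of that constraint (table names made up):

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.Set;
    import java.util.TreeSet;

    public final class MutableCopySketch {

        public static void main(String[] args) {
            Set<String> schemaTableNames = new TreeSet<>(List.of("test.t_order", "test.t_address"));
            // new ArrayList<>(...) hands downstream code a mutable, random-access
            // copy that Collections.sort can reorder in place.
            List<String> sortable = new ArrayList<>(schemaTableNames);
            Collections.sort(sortable);
            System.out.println(sortable);
            try {
                Collections.sort(List.copyOf(schemaTableNames)); // unmodifiable view
            } catch (UnsupportedOperationException ex) {
                System.out.println("immutable lists cannot be sorted in place");
            }
        }
    }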