This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 39a96e5592b Fix drop streaming failed and improve CDC inventory task
thread wait intervals (#29453)
39a96e5592b is described below
commit 39a96e5592bc21e778b1f6cd54f0fcabbaa86372
Author: Xinze Guo <[email protected]>
AuthorDate: Tue Dec 19 21:16:12 2023 +0800
Fix drop streaming failed and improve CDC inventory task thread wait
intervals (#29453)
* Adjust CDC thread wait intervals
* Fix drop streaming failure
* Fix needSorting flag that may not match the job's decodeWithTX setting
* Use requestBody
---
.../core/ingest/dumper/InventoryDumper.java | 18 +++++------
.../inventory/InventoryRecordsCountCalculator.java | 2 +-
.../pipeline/cdc/core/importer/CDCImporter.java | 37 ++++++++++++----------
.../cdc/core/importer/sink/CDCSocketSink.java | 2 +-
.../pipeline/cdc/core/prepare/CDCJobPreparer.java | 9 +++---
.../frontend/netty/CDCChannelInboundHandler.java | 4 +--
6 files changed, 38 insertions(+), 34 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
index 3092c75f2f9..ac0b876db8d 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumper.java
@@ -21,10 +21,17 @@ import com.google.common.base.Strings;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.core.exception.IngestException;
+import
org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
import
org.apache.shardingsphere.data.pipeline.core.execute.AbstractPipelineLifecycleRunnable;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
import
org.apache.shardingsphere.data.pipeline.core.ingest.channel.PipelineChannel;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.InventoryDumperContext;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.position.FinishedPosition;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.position.PlaceholderPosition;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.position.pk.PrimaryKeyPosition;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.position.pk.PrimaryKeyPositionFactory;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
import
org.apache.shardingsphere.data.pipeline.core.ingest.record.FinishedRecord;
@@ -33,17 +40,10 @@ import
org.apache.shardingsphere.data.pipeline.core.job.JobOperationType;
import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
import
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.position.FinishedPosition;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.position.PlaceholderPosition;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.position.pk.PrimaryKeyPosition;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.position.pk.PrimaryKeyPositionFactory;
import
org.apache.shardingsphere.data.pipeline.core.query.JDBCStreamQueryBuilder;
+import
org.apache.shardingsphere.data.pipeline.core.spi.algorithm.JobRateLimitAlgorithm;
import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineInventoryDumpSQLBuilder;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineJdbcUtils;
-import org.apache.shardingsphere.data.pipeline.core.exception.IngestException;
-import
org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
-import
org.apache.shardingsphere.data.pipeline.core.spi.algorithm.JobRateLimitAlgorithm;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.database.mysql.type.MySQLDatabaseType;
import
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
@@ -146,7 +146,7 @@ public final class InventoryDumper extends
AbstractPipelineLifecycleRunnable imp
dataRecords.add(new FinishedRecord(new FinishedPosition()));
channel.pushRecords(dataRecords);
dumpStatement.set(null);
- log.info("Inventory dump done, rowCount={}", rowCount);
+ log.info("Inventory dump done, rowCount={}, dataSource={},
actualTable={}", rowCount,
dumperContext.getCommonContext().getDataSourceName(),
dumperContext.getActualTableName());
}
}
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/InventoryRecordsCountCalculator.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/InventoryRecordsCountCalculator.java
index 894d5d55f91..786a5f8ff62 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/InventoryRecordsCountCalculator.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/InventoryRecordsCountCalculator.java
@@ -93,7 +93,7 @@ public final class InventoryRecordsCountCalculator {
result = resultSet.getLong(1);
}
}
- log.info("getCount cost {} ms, sql: {}", System.currentTimeMillis() -
startTimeMillis, countSQL);
+ log.info("getCount cost {} ms, sql: {}, count: {}",
System.currentTimeMillis() - startTimeMillis, countSQL, result);
return result;
}
}
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
index 64d27ace25d..b325b546fc2 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/CDCImporter.java
@@ -25,18 +25,18 @@ import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckId;
+import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckPosition;
import
org.apache.shardingsphere.data.pipeline.core.execute.AbstractPipelineLifecycleRunnable;
+import org.apache.shardingsphere.data.pipeline.core.importer.Importer;
+import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
import
org.apache.shardingsphere.data.pipeline.core.ingest.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
import
org.apache.shardingsphere.data.pipeline.core.ingest.record.FinishedRecord;
import
org.apache.shardingsphere.data.pipeline.core.ingest.record.PlaceholderRecord;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.core.job.JobOperationType;
-import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckId;
-import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckPosition;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressUpdatedParameter;
-import org.apache.shardingsphere.data.pipeline.core.importer.Importer;
-import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
import
org.apache.shardingsphere.data.pipeline.core.spi.algorithm.JobRateLimitAlgorithm;
import java.util.ArrayList;
@@ -78,11 +78,14 @@ public final class CDCImporter extends
AbstractPipelineLifecycleRunnable impleme
@Override
protected void runBlocking() {
CDCImporterManager.putImporter(this);
+ for (CDCChannelProgressPair each : originalChannelProgressPairs) {
+ each.getJobProgressListener().onProgressUpdated(new
PipelineJobProgressUpdatedParameter(0));
+ }
while (isRunning()) {
if (needSorting) {
- doWithSorting(originalChannelProgressPairs);
+ doWithSorting();
} else {
- doWithoutSorting(originalChannelProgressPairs);
+ doWithoutSorting();
}
if (originalChannelProgressPairs.isEmpty()) {
break;
@@ -90,36 +93,38 @@ public final class CDCImporter extends
AbstractPipelineLifecycleRunnable impleme
}
}
- private void doWithoutSorting(final List<CDCChannelProgressPair>
channelProgressPairs) {
- for (final CDCChannelProgressPair channelProgressPair :
channelProgressPairs) {
+ private void doWithoutSorting() {
+ for (final CDCChannelProgressPair channelProgressPair :
originalChannelProgressPairs) {
PipelineChannel channel = channelProgressPair.getChannel();
List<Record> records = channel.fetchRecords(batchSize, timeout,
timeUnit).stream().filter(each -> !(each instanceof
PlaceholderRecord)).collect(Collectors.toList());
if (records.isEmpty()) {
continue;
}
+ Record lastRecord = records.get(records.size() - 1);
+ if (lastRecord instanceof FinishedRecord &&
records.stream().noneMatch(DataRecord.class::isInstance)) {
+ channel.ack(records);
+
channelProgressPair.getJobProgressListener().onProgressUpdated(new
PipelineJobProgressUpdatedParameter(0));
+ originalChannelProgressPairs.remove(channelProgressPair);
+ continue;
+ }
if (null != rateLimitAlgorithm) {
rateLimitAlgorithm.intercept(JobOperationType.INSERT, 1);
}
String ackId = CDCAckId.build(importerId).marshal();
ackCache.put(ackId,
Collections.singletonList(Pair.of(channelProgressPair, new
CDCAckPosition(records.get(records.size() - 1),
getDataRecordsCount(records)))));
sink.write(ackId, records);
- Record lastRecord = records.get(records.size() - 1);
- if (lastRecord instanceof FinishedRecord &&
records.stream().noneMatch(DataRecord.class::isInstance)) {
- channel.ack(records);
-
channelProgressPair.getJobProgressListener().onProgressUpdated(new
PipelineJobProgressUpdatedParameter(0));
- }
}
}
@SneakyThrows(InterruptedException.class)
- private void doWithSorting(final List<CDCChannelProgressPair>
channelProgressPairs) {
+ private void doWithSorting() {
if (null != rateLimitAlgorithm) {
rateLimitAlgorithm.intercept(JobOperationType.INSERT, 1);
}
CSNRecords firstCsnRecords = null;
List<CSNRecords> csnRecordsList = new LinkedList<>();
- for (int i = 0, count = channelProgressPairs.size(); i < count; i++) {
- prepareTransactionRecords(channelProgressPairs);
+ for (int i = 0, count = originalChannelProgressPairs.size(); i <
count; i++) {
+ prepareTransactionRecords(originalChannelProgressPairs);
CSNRecords csnRecords = csnRecordsQueue.peek();
if (null == csnRecords) {
continue;
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/CDCSocketSink.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/CDCSocketSink.java
index 45aff5d8222..6a3dff0ce45 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/CDCSocketSink.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/CDCSocketSink.java
@@ -49,7 +49,7 @@ import java.util.concurrent.locks.ReentrantLock;
@Slf4j
public final class CDCSocketSink implements PipelineSink {
- private static final long DEFAULT_TIMEOUT_MILLISECONDS = 200L;
+ private static final long DEFAULT_TIMEOUT_MILLISECONDS = 100L;
private final Lock lock = new ReentrantLock();
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
index 9b991c3e8e1..f22418276c4 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
@@ -41,8 +41,8 @@ import
org.apache.shardingsphere.data.pipeline.core.job.PipelineJobRegistry;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.JobItemIncrementalTasksProgress;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
-import
org.apache.shardingsphere.data.pipeline.core.preparer.inventory.InventoryTaskSplitter;
import
org.apache.shardingsphere.data.pipeline.core.preparer.PipelineJobPreparer;
+import
org.apache.shardingsphere.data.pipeline.core.preparer.inventory.InventoryTaskSplitter;
import
org.apache.shardingsphere.data.pipeline.core.spi.ingest.dumper.IncrementalDumperCreator;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTaskUtils;
@@ -53,7 +53,6 @@ import
org.apache.shardingsphere.infra.database.opengauss.type.OpenGaussDatabase
import java.sql.SQLException;
import java.util.Collection;
-import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -79,7 +78,7 @@ public final class CDCJobPreparer {
AtomicBoolean inventoryImporterUsed = new AtomicBoolean();
List<CDCChannelProgressPair> inventoryChannelProgressPairs = new
CopyOnWriteArrayList<>();
AtomicBoolean incrementalImporterUsed = new AtomicBoolean();
- List<CDCChannelProgressPair> incrementalChannelProgressPairs = new
LinkedList<>();
+ List<CDCChannelProgressPair> incrementalChannelProgressPairs = new
CopyOnWriteArrayList<>();
for (CDCJobItemContext each : jobItemContexts) {
initTasks0(each, inventoryImporterUsed,
inventoryChannelProgressPairs, incrementalImporterUsed,
incrementalChannelProgressPairs);
}
@@ -127,7 +126,7 @@ public final class CDCJobPreparer {
}
Dumper dumper = new InventoryDumper(each, channel,
jobItemContext.getSourceDataSource(), jobItemContext.getSourceMetaDataLoader());
Importer importer = importerUsed.get() ? null
- : new CDCImporter(channelProgressPairs,
importerConfig.getBatchSize(), 3, TimeUnit.SECONDS, jobItemContext.getSink(),
+ : new CDCImporter(channelProgressPairs,
importerConfig.getBatchSize(), 100, TimeUnit.MILLISECONDS,
jobItemContext.getSink(),
needSorting(ImporterType.INVENTORY,
hasGlobalCSN(taskConfig.getDumperContext().getCommonContext().getDataSourceConfig().getDatabaseType())),
importerConfig.getRateLimitAlgorithm());
jobItemContext.getInventoryTasks().add(new
CDCInventoryTask(PipelineTaskUtils.generateInventoryTaskId(each),
processContext.getInventoryDumperExecuteEngine(),
@@ -156,7 +155,7 @@ public final class CDCJobPreparer {
channelProgressPairs.add(new CDCChannelProgressPair(channel,
jobItemContext));
Dumper dumper =
DatabaseTypedSPILoader.getService(IncrementalDumperCreator.class,
dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType())
.createIncrementalDumper(dumperContext,
dumperContext.getCommonContext().getPosition(), channel,
jobItemContext.getSourceMetaDataLoader());
- boolean needSorting = needSorting(ImporterType.INCREMENTAL,
hasGlobalCSN(importerConfig.getDataSourceConfig().getDatabaseType()));
+ boolean needSorting = jobItemContext.getJobConfig().isDecodeWithTX();
Importer importer = importerUsed.get() ? null
: new CDCImporter(channelProgressPairs,
importerConfig.getBatchSize(), 300, TimeUnit.MILLISECONDS,
jobItemContext.getSink(), needSorting,
importerConfig.getRateLimitAlgorithm());
diff --git
a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
index 88ded2b79ed..acbd2fdf009 100644
---
a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
+++
b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
@@ -210,7 +210,7 @@ public final class CDCChannelInboundHandler extends
ChannelInboundHandlerAdapter
StopStreamingRequestBody requestBody =
request.getStopStreamingRequestBody();
String database =
backendHandler.getDatabaseNameByJobId(requestBody.getStreamingId());
checkPrivileges(request.getRequestId(),
connectionContext.getCurrentUser().getGrantee(), database);
- backendHandler.stopStreaming(connectionContext.getJobId(),
ctx.channel().id());
+ backendHandler.stopStreaming(requestBody.getStreamingId(),
ctx.channel().id());
connectionContext.setJobId(null);
ctx.writeAndFlush(CDCResponseUtils.succeed(request.getRequestId()));
}
@@ -218,7 +218,7 @@ public final class CDCChannelInboundHandler extends
ChannelInboundHandlerAdapter
private void processDropStreamingRequest(final ChannelHandlerContext ctx,
final CDCRequest request, final CDCConnectionContext connectionContext) {
DropStreamingRequestBody requestBody =
request.getDropStreamingRequestBody();
checkPrivileges(request.getRequestId(),
connectionContext.getCurrentUser().getGrantee(),
backendHandler.getDatabaseNameByJobId(requestBody.getStreamingId()));
- backendHandler.dropStreaming(connectionContext.getJobId());
+ backendHandler.dropStreaming(requestBody.getStreamingId());
connectionContext.setJobId(null);
ctx.writeAndFlush(CDCResponseUtils.succeed(request.getRequestId()));
}