This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new c0d6365fcb6 Add replication slot cleanup when drop CDC tasks (#28890)
c0d6365fcb6 is described below
commit c0d6365fcb6c7b9a1a341f1924132ab41cbab529
Author: Xinze Guo <[email protected]>
AuthorDate: Mon Oct 30 11:42:07 2023 +0800
Add replication slot cleanup when drop CDC tasks (#28890)
* Add replication slot cleanup
* Add drop streaming at CDCClient
---
.../data/pipeline/cdc/client/CDCClient.java | 19 +++++++++++++++++++
.../data/pipeline/cdc/api/impl/CDCJobAPI.java | 11 +++++++++++
2 files changed, 30 insertions(+)
diff --git
a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java
b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java
index ada8bf52152..82c3adc2eb0 100644
---
a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java
+++
b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java
@@ -41,6 +41,7 @@ import
org.apache.shardingsphere.data.pipeline.cdc.client.util.RequestIdUtils;
import org.apache.shardingsphere.data.pipeline.cdc.client.util.ResponseFuture;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest;
import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest.Type;
+import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.DropStreamingRequestBody;
import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.LoginRequestBody;
import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.LoginRequestBody.BasicBody;
import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.LoginRequestBody.LoginType;
@@ -206,6 +207,24 @@ public final class CDCClient implements AutoCloseable {
log.info("Stop streaming success, streaming id: {}", streamingId);
}
/**
 * Drop streaming.
 *
 * <p>Sends a {@code DROP_STREAMING} request for the given streaming id and blocks until the server
 * responds (or the configured timeout elapses). On success the streaming id is removed from the
 * client connection context.
 *
 * @param streamingId streaming id
 */
public void dropStreaming(final String streamingId) {
    String requestId = RequestIdUtils.generateRequestId();
    DropStreamingRequestBody body = DropStreamingRequestBody.newBuilder().setStreamingId(streamingId).build();
    CDCRequest request = CDCRequest.newBuilder().setRequestId(requestId).setType(Type.DROP_STREAMING).setDropStreamingRequestBody(body).build();
    ResponseFuture responseFuture = new ResponseFuture(requestId, Type.DROP_STREAMING);
    ClientConnectionContext connectionContext = channel.attr(ClientConnectionContext.CONTEXT_KEY).get();
    // Register the future BEFORE flushing the request so the response handler can always find it,
    // even if the server answers immediately.
    connectionContext.getResponseFutureMap().put(requestId, responseFuture);
    channel.writeAndFlush(request);
    // Blocks until the matching response arrives or the timeout expires.
    // NOTE(review): presumably waitResponseResult raises/fails on timeout so the lines below only
    // run on success — confirm against ResponseFuture's implementation.
    responseFuture.waitResponseResult(config.getTimeoutMills(), connectionContext);
    connectionContext.getStreamingIds().remove(streamingId);
    log.info("Drop streaming success, streaming id: {}", streamingId);
}
+
@Override
public void close() {
if (null != channel) {
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index 203271cf0c0..ac884312417 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -359,6 +359,17 @@ public final class CDCJobAPI extends
AbstractInventoryIncrementalJobAPIImpl {
stop(jobId);
}
dropJob(jobId);
+ cleanup(jobConfig);
+ }
+
/**
 * Best-effort cleanup of per-data-source incremental position state for a dropped CDC job
 * (per the commit subject, this covers e.g. PostgreSQL replication slots — confirm in
 * PipelineJobPreparerUtils.destroyPosition).
 *
 * @param jobConfig CDC job configuration whose data sources are to be cleaned up
 */
private void cleanup(final CDCJobConfiguration jobConfig) {
    for (Entry<String, Map<String, Object>> entry : jobConfig.getDataSourceConfig().getRootConfig().getDataSources().entrySet()) {
        try {
            PipelineJobPreparerUtils.destroyPosition(jobConfig.getJobId(), new StandardPipelineDataSourceConfiguration(entry.getValue()));
        } catch (final SQLException ex) {
            // Deliberately swallow per-data-source failures: one broken data source must not
            // abort cleanup of the remaining ones. The trailing Throwable argument is handled
            // specially by SLF4J, so two placeholders with three arguments is correct here.
            log.warn("job destroying failed, jobId={}, dataSourceName={}", jobConfig.getJobId(), entry.getKey(), ex);
        }
    }
}
@Override