This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new c0d6365fcb6 Add replication slot cleanup when drop CDC tasks (#28890)
c0d6365fcb6 is described below

commit c0d6365fcb6c7b9a1a341f1924132ab41cbab529
Author: Xinze Guo <[email protected]>
AuthorDate: Mon Oct 30 11:42:07 2023 +0800

    Add replication slot cleanup when drop CDC tasks (#28890)
    
    * Add replication slot cleanup
    
    * Add drop streaming at CDCClient
---
 .../data/pipeline/cdc/client/CDCClient.java           | 19 +++++++++++++++++++
 .../data/pipeline/cdc/api/impl/CDCJobAPI.java         | 11 +++++++++++
 2 files changed, 30 insertions(+)

diff --git a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java
index ada8bf52152..82c3adc2eb0 100644
--- a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java
+++ b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java
@@ -41,6 +41,7 @@ import org.apache.shardingsphere.data.pipeline.cdc.client.util.RequestIdUtils;
 import org.apache.shardingsphere.data.pipeline.cdc.client.util.ResponseFuture;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest;
 import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest.Type;
+import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.DropStreamingRequestBody;
 import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.LoginRequestBody;
 import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.LoginRequestBody.BasicBody;
 import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.LoginRequestBody.LoginType;
@@ -206,6 +207,24 @@ public final class CDCClient implements AutoCloseable {
         log.info("Stop streaming success, streaming id: {}", streamingId);
     }
     
+    /**
+     * Drop streaming: send a DROP_STREAMING request for the streaming id, wait for the result, then remove the id from the connection context.
+     *
+     * @param streamingId streaming id to drop
+     */
+    public void dropStreaming(final String streamingId) {
+        String requestId = RequestIdUtils.generateRequestId();
+        DropStreamingRequestBody body = 
DropStreamingRequestBody.newBuilder().setStreamingId(streamingId).build();
+        CDCRequest request = 
CDCRequest.newBuilder().setRequestId(requestId).setType(Type.DROP_STREAMING).setDropStreamingRequestBody(body).build();
+        ResponseFuture responseFuture = new ResponseFuture(requestId, 
Type.DROP_STREAMING);
+        ClientConnectionContext connectionContext = 
channel.attr(ClientConnectionContext.CONTEXT_KEY).get();
+        connectionContext.getResponseFutureMap().put(requestId, 
responseFuture); // the future is registered under requestId before the request is written; presumably so the response handler can look it up - confirm
+        channel.writeAndFlush(request);
+        responseFuture.waitResponseResult(config.getTimeoutMills(), 
connectionContext); // waits with the client's configured timeout (config.getTimeoutMills())
+        connectionContext.getStreamingIds().remove(streamingId); // the id is forgotten locally only after the wait above has returned
+        log.info("Drop streaming success, streaming id: {}", streamingId);
+    }
+    
     @Override
     public void close() {
         if (null != channel) {
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index 203271cf0c0..ac884312417 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -359,6 +359,17 @@ public final class CDCJobAPI extends AbstractInventoryIncrementalJobAPIImpl {
             stop(jobId);
         }
         dropJob(jobId);
+        cleanup(jobConfig);
+    }
+    
+    private void cleanup(final CDCJobConfiguration jobConfig) { // calls destroyPosition once per data source listed in the job's root data source config
+        for (Entry<String, Map<String, Object>> entry : 
jobConfig.getDataSourceConfig().getRootConfig().getDataSources().entrySet()) {
+            try {
+                PipelineJobPreparerUtils.destroyPosition(jobConfig.getJobId(), 
new StandardPipelineDataSourceConfiguration(entry.getValue())); // NOTE(review): per the commit title this presumably drops the replication slot - confirm in destroyPosition
+            } catch (final SQLException ex) { // a failure for one data source is logged as a warning and the loop continues with the rest
+                log.warn("job destroying failed, jobId={}, dataSourceName={}", 
jobConfig.getJobId(), entry.getKey(), ex);
+            }
+        }
     }
     }
     
     @Override

Reply via email to