This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 7055cfd7f46 Refactor CDCClient, add more methods for use (#27542)
7055cfd7f46 is described below

commit 7055cfd7f463bf50198f8913f9a7af3b15f8b5f8
Author: Xinze Guo <[email protected]>
AuthorDate: Wed Aug 2 14:48:02 2023 +0800

    Refactor CDCClient, add more methods for use (#27542)
    
    * Add more methods of CDCClient
    
    * Remove unnecessary method
    
    * Refactor CDCClient
    
    * Add exception handler of CDC
    
    * Add more response type
    
    * Cache request type
---
 .../data/pipeline/cdc/client/CDCClient.java        | 202 ++++++++++++++++-----
 .../CDCClientConfiguration.java}                   |  34 ++--
 .../client/constant/ClientConnectionStatus.java    |   6 +-
 .../client/context/ClientConnectionContext.java    |  13 +-
 .../GetResultTimeoutException.java}                |  16 +-
 .../ServerResultException.java}                    |  16 +-
 .../cdc/client/handler/CDCRequestHandler.java      |  73 +++++---
 .../ExceptionHandler.java}                         |  26 ++-
 .../client/handler/LoggerExceptionHandler.java}    |  24 ++-
 .../cdc/client/handler/LoginRequestHandler.java    | 105 -----------
 .../CDCLoginParameter.java}                        |  15 +-
 ...Parameter.java => StartStreamingParameter.java} |  24 +--
 .../pipeline/cdc/client/util/ResponseFuture.java   |  86 +++++++++
 .../cdc/client/util/ServerErrorResult.java}        |  14 +-
 .../pipeline/cdc/client/example/Bootstrap.java     |  26 +--
 .../pipeline/cdc/context/CDCJobItemContext.java    |   4 -
 .../cdc/core/importer/sink/CDCSocketSink.java      |   1 -
 .../cdc/exception/CDCExceptionWrapper.java         |   8 +-
 .../pipeline/cdc/generator/CDCResponseUtils.java   |   3 +-
 .../frontend/netty/CDCChannelInboundHandler.java   |  11 +-
 .../netty/CDCChannelInboundHandlerTest.java        |   2 +-
 .../test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java |  34 ++--
 22 files changed, 435 insertions(+), 308 deletions(-)

diff --git a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java
index 83e1aebd33d..970a99d59fa 100644
--- a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java
+++ b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java
@@ -17,9 +17,10 @@
 
 package org.apache.shardingsphere.data.pipeline.cdc.client;
 
+import com.google.common.hash.Hashing;
 import io.netty.bootstrap.Bootstrap;
 import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.channel.ChannelFuture;
+import io.netty.channel.Channel;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.nio.NioEventLoopGroup;
@@ -30,57 +31,42 @@ import 
io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
 import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
+import 
org.apache.shardingsphere.data.pipeline.cdc.client.config.CDCClientConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.cdc.client.constant.ClientConnectionStatus;
+import 
org.apache.shardingsphere.data.pipeline.cdc.client.context.ClientConnectionContext;
 import 
org.apache.shardingsphere.data.pipeline.cdc.client.handler.CDCRequestHandler;
-import 
org.apache.shardingsphere.data.pipeline.cdc.client.handler.LoginRequestHandler;
-import 
org.apache.shardingsphere.data.pipeline.cdc.client.parameter.StartCDCClientParameter;
+import 
org.apache.shardingsphere.data.pipeline.cdc.client.parameter.CDCLoginParameter;
+import 
org.apache.shardingsphere.data.pipeline.cdc.client.parameter.StartStreamingParameter;
+import org.apache.shardingsphere.data.pipeline.cdc.client.util.RequestIdUtils;
+import org.apache.shardingsphere.data.pipeline.cdc.client.util.ResponseFuture;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest;
+import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest.Type;
+import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.LoginRequestBody;
+import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.LoginRequestBody.BasicBody;
+import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.LoginRequestBody.LoginType;
+import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StartStreamingRequestBody;
+import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StopStreamingRequestBody;
+import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody;
 import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse;
-import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record;
-
-import java.util.List;
-import java.util.function.Consumer;
 
 /**
  * CDC client.
  */
 @Slf4j
-public final class CDCClient {
-    
-    private final StartCDCClientParameter parameter;
+public final class CDCClient implements AutoCloseable {
     
-    private final Consumer<List<Record>> consumer;
-    
-    public CDCClient(final StartCDCClientParameter parameter, final 
Consumer<List<Record>> consumer) {
-        validateParameter(parameter);
-        this.parameter = parameter;
-        this.consumer = consumer;
-    }
+    private final CDCClientConfiguration config;
     
-    private void validateParameter(final StartCDCClientParameter parameter) {
-        if (null == parameter.getDatabase() || 
parameter.getDatabase().isEmpty()) {
-            throw new IllegalArgumentException("The database parameter can't 
be null");
-        }
-        if (null == parameter.getUsername() || 
parameter.getUsername().isEmpty()) {
-            throw new IllegalArgumentException("The username parameter can't 
be null");
-        }
-        if (null == parameter.getAddress() || 
parameter.getAddress().isEmpty()) {
-            throw new IllegalArgumentException("The address parameter can't be 
null");
-        }
-        if (null == parameter.getSchemaTables() || 
parameter.getSchemaTables().isEmpty()) {
-            throw new IllegalArgumentException("The schema tables parameter 
can't be null");
-        }
-    }
+    private final NioEventLoopGroup group;
     
-    /**
-     * Start ShardingSphere CDC client.
-     */
-    public void start() {
-        startInternal(parameter.getAddress(), parameter.getPort());
-    }
+    private final Channel channel;
     
     @SneakyThrows(InterruptedException.class)
-    private void startInternal(final String address, final int port) {
+    public CDCClient(final CDCClientConfiguration config) {
+        validateParameter(config);
+        this.config = config;
         Bootstrap bootstrap = new Bootstrap();
-        NioEventLoopGroup group = new NioEventLoopGroup();
+        group = new NioEventLoopGroup();
         bootstrap.channel(NioSocketChannel.class)
                 .group(group)
                 .option(ChannelOption.ALLOCATOR, 
PooledByteBufAllocator.DEFAULT)
@@ -93,14 +79,140 @@ public final class CDCClient {
                         channel.pipeline().addLast(new 
ProtobufDecoder(CDCResponse.getDefaultInstance()));
                         channel.pipeline().addLast(new 
ProtobufVarint32LengthFieldPrepender());
                         channel.pipeline().addLast(new ProtobufEncoder());
-                        channel.pipeline().addLast(new 
LoginRequestHandler(parameter.getUsername(), parameter.getPassword()));
-                        channel.pipeline().addLast(new 
CDCRequestHandler(parameter, consumer));
+                        channel.pipeline().addLast(new 
CDCRequestHandler(config.getDataConsumer(), config.getExceptionHandler()));
                     }
                 });
-        try {
-            ChannelFuture future = bootstrap.connect(address, port).sync();
-            future.channel().closeFuture().sync();
-        } finally {
+        channel = bootstrap.connect(config.getAddress(), 
config.getPort()).sync().channel();
+    }
+    
+    private void validateParameter(final CDCClientConfiguration parameter) {
+        if (null == parameter.getAddress() || 
parameter.getAddress().isEmpty()) {
+            throw new IllegalArgumentException("The address parameter can't be 
null");
+        }
+        if (parameter.getPort() <= 0) {
+            throw new IllegalArgumentException("The port must be greater than 
0");
+        }
+    }
+    
+    /**
+     * Check channel is active.
+     *
+     * @return true if channel is active
+     */
+    public boolean isActive() {
+        return channel.isActive();
+    }
+    
+    /**
+     * Await channel close.
+     *
+     * @throws InterruptedException interrupted exception
+     */
+    public void await() throws InterruptedException {
+        channel.closeFuture().sync();
+    }
+    
+    /**
+     * Login.
+     *
+     * @param parameter parameter
+     * @throws IllegalStateException the channel is not active
+     */
+    public synchronized void login(final CDCLoginParameter parameter) {
+        if (null == channel || !channel.isActive()) {
+            throw new IllegalStateException("The channel is not active");
+        }
+        ClientConnectionContext connectionContext = 
channel.attr(ClientConnectionContext.CONTEXT_KEY).get();
+        if (ClientConnectionStatus.LOGGED_IN == 
connectionContext.getStatus().get()) {
+            throw new IllegalStateException("The client is already logged in");
+        }
+        LoginRequestBody loginRequestBody = 
LoginRequestBody.newBuilder().setType(LoginType.BASIC).setBasicBody(BasicBody.newBuilder().setUsername(parameter.getUsername())
+                
.setPassword(Hashing.sha256().hashBytes(parameter.getPassword().getBytes()).toString().toUpperCase()).build()).build();
+        String requestId = RequestIdUtils.generateRequestId();
+        CDCRequest data = 
CDCRequest.newBuilder().setType(Type.LOGIN).setVersion(1).setRequestId(requestId).setLoginRequestBody(loginRequestBody).build();
+        ResponseFuture responseFuture = new ResponseFuture(requestId, 
Type.LOGIN);
+        connectionContext.getResponseFutureMap().put(requestId, 
responseFuture);
+        channel.writeAndFlush(data);
+        responseFuture.waitResponseResult(config.getTimeoutMills(), 
connectionContext);
+        log.info("Login success, username: {}", parameter.getUsername());
+    }
+    
+    /**
+     * Start streaming.
+     *
+     * @param parameter parameter
+     * @return streaming id
+     */
+    public String startStreaming(final StartStreamingParameter parameter) {
+        StreamDataRequestBody streamDataRequestBody = 
StreamDataRequestBody.newBuilder().setDatabase(parameter.getDatabase()).setFull(parameter.isFull())
+                .addAllSourceSchemaTable(parameter.getSchemaTables()).build();
+        String requestId = RequestIdUtils.generateRequestId();
+        CDCRequest request = 
CDCRequest.newBuilder().setRequestId(requestId).setType(Type.STREAM_DATA).setStreamDataRequestBody(streamDataRequestBody).build();
+        ClientConnectionContext connectionContext = 
channel.attr(ClientConnectionContext.CONTEXT_KEY).get();
+        ResponseFuture responseFuture = new ResponseFuture(requestId, 
Type.STREAM_DATA);
+        connectionContext.getResponseFutureMap().put(requestId, 
responseFuture);
+        channel.writeAndFlush(request);
+        String result = 
responseFuture.waitResponseResult(config.getTimeoutMills(), 
connectionContext).toString();
+        log.info("Start streaming success, streaming id: {}", result);
+        return result;
+    }
+    
+    /**
+     * Restart streaming.
+     *
+     * @param streamingId streaming id
+     */
+    public void restartStreaming(final String streamingId) {
+        if (checkStreamingIdExist(streamingId)) {
+            stopStreaming(streamingId);
+        }
+        String requestId = RequestIdUtils.generateRequestId();
+        StartStreamingRequestBody body = 
StartStreamingRequestBody.newBuilder().setStreamingId(streamingId).build();
+        CDCRequest request = 
CDCRequest.newBuilder().setRequestId(requestId).setType(Type.START_STREAMING).setStartStreamingRequestBody(body).build();
+        ResponseFuture responseFuture = new ResponseFuture(requestId, 
Type.START_STREAMING);
+        ClientConnectionContext connectionContext = 
channel.attr(ClientConnectionContext.CONTEXT_KEY).get();
+        connectionContext.getResponseFutureMap().put(requestId, 
responseFuture);
+        channel.writeAndFlush(request);
+        responseFuture.waitResponseResult(config.getTimeoutMills(), 
connectionContext);
+        log.info("Restart streaming success, streaming id: {}", streamingId);
+    }
+    
+    private boolean checkStreamingIdExist(final String streamingId) {
+        if (null == channel || !channel.isActive()) {
+            throw new IllegalStateException("The channel is not active, please 
start the client first");
+        }
+        ClientConnectionContext connectionContext = 
channel.attr(ClientConnectionContext.CONTEXT_KEY).get();
+        if (null == connectionContext) {
+            log.warn("The connection context not exist");
+            return true;
+        }
+        return connectionContext.getStreamingIds().contains(streamingId);
+    }
+    
+    /**
+     * Stop streaming.
+     *
+     * @param streamingId streaming id
+     */
+    public void stopStreaming(final String streamingId) {
+        String requestId = RequestIdUtils.generateRequestId();
+        StopStreamingRequestBody body = 
StopStreamingRequestBody.newBuilder().setStreamingId(streamingId).build();
+        CDCRequest request = 
CDCRequest.newBuilder().setRequestId(requestId).setType(Type.STOP_STREAMING).setStopStreamingRequestBody(body).build();
+        ResponseFuture responseFuture = new ResponseFuture(requestId, 
Type.STOP_STREAMING);
+        ClientConnectionContext connectionContext = 
channel.attr(ClientConnectionContext.CONTEXT_KEY).get();
+        connectionContext.getResponseFutureMap().put(requestId, 
responseFuture);
+        channel.writeAndFlush(request);
+        responseFuture.waitResponseResult(config.getTimeoutMills(), 
connectionContext);
+        connectionContext.getStreamingIds().remove(streamingId);
+        log.info("Stop streaming success, streaming id: {}", streamingId);
+    }
+    
+    @Override
+    public void close() {
+        if (null != channel) {
+            channel.close().awaitUninterruptibly();
+        }
+        if (null != group) {
             group.shutdownGracefully();
         }
     }
diff --git a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/parameter/StartCDCClientParameter.java b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/config/CDCClientConfiguration.java
similarity index 51%
copy from kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/parameter/StartCDCClientParameter.java
copy to kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/config/CDCClientConfiguration.java
index f7148d4880b..99c7de69420 100644
--- a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/parameter/StartCDCClientParameter.java
+++ b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/config/CDCClientConfiguration.java
@@ -15,34 +15,38 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.cdc.client.parameter;
+package org.apache.shardingsphere.data.pipeline.cdc.client.config;
 
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
-import lombok.Setter;
-import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody.SchemaTable;
+import 
org.apache.shardingsphere.data.pipeline.cdc.client.handler.ExceptionHandler;
+import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record;
 
 import java.util.List;
+import java.util.function.Consumer;
 
 /**
- * Start CDC client parameter.
+ * CDC client configuration.
  */
-@Getter
-@Setter
 @RequiredArgsConstructor
-public final class StartCDCClientParameter {
-    
-    private String address;
+@Getter
+public final class CDCClientConfiguration {
     
-    private int port;
+    private final String address;
     
-    private String username;
+    private final int port;
     
-    private String password;
+    private final Consumer<List<Record>> dataConsumer;
     
-    private String database;
+    private final ExceptionHandler exceptionHandler;
     
-    private List<SchemaTable> schemaTables;
+    private final int timeoutMills;
     
-    private boolean full;
+    public CDCClientConfiguration(final String address, final int port, final 
Consumer<List<Record>> dataConsumer, final ExceptionHandler exceptionHandler) {
+        this.address = address;
+        this.port = port;
+        this.dataConsumer = dataConsumer;
+        this.exceptionHandler = exceptionHandler;
+        this.timeoutMills = 5000;
+    }
 }
diff --git a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/constant/ClientConnectionStatus.java b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/constant/ClientConnectionStatus.java
index f644e5f59ff..f0c4834dff7 100644
--- a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/constant/ClientConnectionStatus.java
+++ b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/constant/ClientConnectionStatus.java
@@ -22,11 +22,7 @@ package 
org.apache.shardingsphere.data.pipeline.cdc.client.constant;
  */
 public enum ClientConnectionStatus {
     
-    CONNECTED,
-    
     NOT_LOGGED_IN,
     
-    LOGGING_IN,
-    
-    STREAMING
+    LOGGED_IN
 }
diff --git a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/context/ClientConnectionContext.java b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/context/ClientConnectionContext.java
index fcbf086979b..d3d85aa9992 100644
--- a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/context/ClientConnectionContext.java
+++ b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/context/ClientConnectionContext.java
@@ -21,6 +21,13 @@ import io.netty.util.AttributeKey;
 import lombok.Getter;
 import lombok.Setter;
 import 
org.apache.shardingsphere.data.pipeline.cdc.client.constant.ClientConnectionStatus;
+import org.apache.shardingsphere.data.pipeline.cdc.client.util.ResponseFuture;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * Client connection context.
@@ -31,7 +38,9 @@ public final class ClientConnectionContext {
     
     public static final AttributeKey<ClientConnectionContext> CONTEXT_KEY = 
AttributeKey.valueOf("client.context");
     
-    private volatile ClientConnectionStatus status;
+    private final AtomicReference<ClientConnectionStatus> status = new 
AtomicReference<>();
+    
+    private final Set<String> streamingIds = new CopyOnWriteArraySet<>();
     
-    private volatile String streamingId;
+    private final Map<String, ResponseFuture> responseFutureMap = new 
ConcurrentHashMap<>();
 }
diff --git a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/constant/ClientConnectionStatus.java b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/exception/GetResultTimeoutException.java
similarity index 70%
copy from kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/constant/ClientConnectionStatus.java
copy to kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/exception/GetResultTimeoutException.java
index f644e5f59ff..e0fc012a594 100644
--- a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/constant/ClientConnectionStatus.java
+++ b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/exception/GetResultTimeoutException.java
@@ -15,18 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.cdc.client.constant;
+package org.apache.shardingsphere.data.pipeline.cdc.client.exception;
 
 /**
- * Client connection status.
+ * Get result timeout exception.
  */
-public enum ClientConnectionStatus {
+public final class GetResultTimeoutException extends RuntimeException {
     
-    CONNECTED,
+    private static final long serialVersionUID = 8473381735246753765L;
     
-    NOT_LOGGED_IN,
-    
-    LOGGING_IN,
-    
-    STREAMING
+    public GetResultTimeoutException(final String message) {
+        super(message);
+    }
 }
diff --git a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/constant/ClientConnectionStatus.java b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/exception/ServerResultException.java
similarity index 71%
copy from kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/constant/ClientConnectionStatus.java
copy to kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/exception/ServerResultException.java
index f644e5f59ff..7a438a3b3f3 100644
--- a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/constant/ClientConnectionStatus.java
+++ b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/exception/ServerResultException.java
@@ -15,18 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.cdc.client.constant;
+package org.apache.shardingsphere.data.pipeline.cdc.client.exception;
 
 /**
- * Client connection status.
+ * Server result exception.
  */
-public enum ClientConnectionStatus {
+public final class ServerResultException extends RuntimeException {
     
-    CONNECTED,
+    private static final long serialVersionUID = 6661736945772674919L;
     
-    NOT_LOGGED_IN,
-    
-    LOGGING_IN,
-    
-    STREAMING
+    public ServerResultException(final String message) {
+        super(message);
+    }
 }
diff --git a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/CDCRequestHandler.java b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/CDCRequestHandler.java
index 3483ed14f6a..724001e8d5d 100644
--- a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/CDCRequestHandler.java
+++ b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/CDCRequestHandler.java
@@ -23,20 +23,20 @@ import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.cdc.client.constant.ClientConnectionStatus;
 import 
org.apache.shardingsphere.data.pipeline.cdc.client.context.ClientConnectionContext;
-import 
org.apache.shardingsphere.data.pipeline.cdc.client.event.StreamDataEvent;
-import 
org.apache.shardingsphere.data.pipeline.cdc.client.parameter.StartCDCClientParameter;
-import org.apache.shardingsphere.data.pipeline.cdc.client.util.RequestIdUtils;
+import org.apache.shardingsphere.data.pipeline.cdc.client.util.ResponseFuture;
+import 
org.apache.shardingsphere.data.pipeline.cdc.client.util.ServerErrorResult;
 import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.AckStreamingRequestBody;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest;
 import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest.Type;
-import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody;
 import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse;
 import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse.Status;
 import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult;
 import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record;
+import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.ServerGreetingResult;
 import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.StreamDataResult;
 
 import java.util.List;
+import java.util.Optional;
 import java.util.function.Consumer;
 
 /**
@@ -46,34 +46,64 @@ import java.util.function.Consumer;
 @Slf4j
 public final class CDCRequestHandler extends ChannelInboundHandlerAdapter {
     
-    private final StartCDCClientParameter parameter;
-    
     private final Consumer<List<Record>> consumer;
     
+    private final ExceptionHandler exceptionHandler;
+    
     @Override
-    public void userEventTriggered(final ChannelHandlerContext ctx, final 
Object evt) {
-        if (evt instanceof StreamDataEvent) {
-            StreamDataRequestBody streamDataRequestBody = 
StreamDataRequestBody.newBuilder().setDatabase(parameter.getDatabase()).setFull(parameter.isFull())
-                    
.addAllSourceSchemaTable(parameter.getSchemaTables()).build();
-            CDCRequest request = 
CDCRequest.newBuilder().setRequestId(RequestIdUtils.generateRequestId()).setType(Type.STREAM_DATA).setStreamDataRequestBody(streamDataRequestBody).build();
-            ctx.writeAndFlush(request);
-        }
+    public void channelActive(final ChannelHandlerContext ctx) {
+        ClientConnectionContext context = new ClientConnectionContext();
+        context.getStatus().set(ClientConnectionStatus.NOT_LOGGED_IN);
+        
ctx.channel().attr(ClientConnectionContext.CONTEXT_KEY).setIfAbsent(context);
+    }
+    
+    @Override
+    public void channelInactive(final ChannelHandlerContext ctx) {
+        
ctx.channel().attr(ClientConnectionContext.CONTEXT_KEY).setIfAbsent(null);
+        log.info("Channel inactive, stop CDC client");
+        ctx.fireChannelInactive();
     }
     
     @Override
     public void channelRead(final ChannelHandlerContext ctx, final Object msg) 
{
         CDCResponse response = (CDCResponse) msg;
+        ClientConnectionContext connectionContext = 
ctx.channel().attr(ClientConnectionContext.CONTEXT_KEY).get();
+        Optional<ResponseFuture> responseFuture = 
Optional.ofNullable(connectionContext.getResponseFutureMap().get(response.getRequestId()));
         if (response.getStatus() != Status.SUCCEED) {
-            log.error("received error response {}", msg);
+            Type requestType = Type.UNKNOWN;
+            if (responseFuture.isPresent()) {
+                final ResponseFuture future = responseFuture.get();
+                future.setErrorCode(response.getErrorCode());
+                future.setErrorMessage(response.getErrorMessage());
+                future.countDown();
+                requestType = future.getRequestType();
+            }
+            exceptionHandler.handleServerException(new 
ServerErrorResult(response.getErrorCode(), response.getErrorMessage(), 
requestType));
+            responseFuture.ifPresent(future -> {
+                future.setErrorCode(response.getErrorCode());
+                future.setErrorMessage(response.getErrorMessage());
+                future.countDown();
+            });
+            return;
+        }
+        if (response.hasServerGreetingResult()) {
+            ServerGreetingResult serverGreetingResult = 
response.getServerGreetingResult();
+            log.info("Received server greeting result, serverVersion={}, 
protocolVersion={}", serverGreetingResult.getServerVersion(), 
serverGreetingResult.getProtocolVersion());
+            return;
+        }
+        if (ClientConnectionStatus.NOT_LOGGED_IN == 
connectionContext.getStatus().get() && responseFuture.isPresent() && Type.LOGIN 
== responseFuture.get().getRequestType()) {
+            responseFuture.ifPresent(ResponseFuture::countDown);
+            
connectionContext.getStatus().set(ClientConnectionStatus.LOGGED_IN);
+            return;
         }
-        ClientConnectionContext connectionContext = 
ctx.channel().attr(ClientConnectionContext.CONTEXT_KEY).get();
         if (response.hasStreamDataResult()) {
             StreamDataResult streamDataResult = response.getStreamDataResult();
-            
connectionContext.setStreamingId(streamDataResult.getStreamingId());
-            connectionContext.setStatus(ClientConnectionStatus.STREAMING);
+            responseFuture.ifPresent(future -> 
future.setResult(response.getStreamDataResult().getStreamingId()));
+            
connectionContext.getStreamingIds().add(streamDataResult.getStreamingId());
         } else if (response.hasDataRecordResult()) {
             processDataRecords(ctx, response.getDataRecordResult());
         }
+        responseFuture.ifPresent(ResponseFuture::countDown);
     }
     
     private void processDataRecords(final ChannelHandlerContext ctx, final 
DataRecordResult result) {
@@ -81,15 +111,8 @@ public final class CDCRequestHandler extends 
ChannelInboundHandlerAdapter {
         
ctx.channel().writeAndFlush(CDCRequest.newBuilder().setType(Type.ACK_STREAMING).setAckStreamingRequestBody(AckStreamingRequestBody.newBuilder().setAckId(result.getAckId()).build()).build());
     }
     
-    @Override
-    public void channelInactive(final ChannelHandlerContext ctx) {
-        log.info("Request handler channel inactive");
-        ctx.fireChannelInactive();
-    }
-    
     @Override
     public void exceptionCaught(final ChannelHandlerContext ctx, final 
Throwable cause) {
-        log.error("handler data streaming error", cause);
-        // TODO passing error messages to the caller
+        exceptionHandler.handleSocketException(cause);
     }
 }
diff --git a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/constant/ClientConnectionStatus.java b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/ExceptionHandler.java
similarity index 62%
copy from kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/constant/ClientConnectionStatus.java
copy to kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/ExceptionHandler.java
index f644e5f59ff..9664809f472 100644
--- a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/constant/ClientConnectionStatus.java
+++ b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/ExceptionHandler.java
@@ -15,18 +15,26 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.cdc.client.constant;
+package org.apache.shardingsphere.data.pipeline.cdc.client.handler;
+
+import 
org.apache.shardingsphere.data.pipeline.cdc.client.util.ServerErrorResult;
 
 /**
- * Client connection status.
+ * Exception handler.
  */
-public enum ClientConnectionStatus {
-    
-    CONNECTED,
-    
-    NOT_LOGGED_IN,
+public interface ExceptionHandler {
     
-    LOGGING_IN,
+    /**
+     * Handle server exception.
+     *
+     * @param result error result
+     */
+    void handleServerException(ServerErrorResult result);
     
-    STREAMING
+    /**
+     * Handle socket exception.
+     *
+     * @param throwable throwable
+     */
+    void handleSocketException(Throwable throwable);
 }
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/exception/CDCExceptionWrapper.java
 
b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/LoggerExceptionHandler.java
similarity index 58%
copy from 
kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/exception/CDCExceptionWrapper.java
copy to 
kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/LoggerExceptionHandler.java
index d2a0341e9c1..cf881ff26d8 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/exception/CDCExceptionWrapper.java
+++ 
b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/LoggerExceptionHandler.java
@@ -15,22 +15,26 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.cdc.exception;
+package org.apache.shardingsphere.data.pipeline.cdc.client.handler;
 
-import lombok.Getter;
 import lombok.RequiredArgsConstructor;
-import 
org.apache.shardingsphere.infra.exception.core.external.sql.ShardingSphereSQLException;
+import lombok.extern.slf4j.Slf4j;
+import 
org.apache.shardingsphere.data.pipeline.cdc.client.util.ServerErrorResult;
 
 /**
- * CDC exception wrapper.
+ * Logger exception handler that only logs errors.
  */
 @RequiredArgsConstructor
-@Getter
-public final class CDCExceptionWrapper extends RuntimeException {
+@Slf4j
+public class LoggerExceptionHandler implements ExceptionHandler {
     
-    private static final long serialVersionUID = -929604775277797727L;
+    @Override
+    public void handleServerException(final ServerErrorResult result) {
+        log.error("Server error, code: {}, message: {}", 
result.getErrorCode(), result.getErrorMessage());
+    }
     
-    private final String requestId;
-    
-    private final ShardingSphereSQLException exception;
+    @Override
+    public void handleSocketException(final Throwable throwable) {
+        log.error("Socket error: ", throwable);
+    }
 }
diff --git 
a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/LoginRequestHandler.java
 
b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/LoginRequestHandler.java
deleted file mode 100644
index 24e4fd8fffe..00000000000
--- 
a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/LoginRequestHandler.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.cdc.client.handler;
-
-import com.google.common.hash.Hashing;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import 
org.apache.shardingsphere.data.pipeline.cdc.client.constant.ClientConnectionStatus;
-import 
org.apache.shardingsphere.data.pipeline.cdc.client.context.ClientConnectionContext;
-import 
org.apache.shardingsphere.data.pipeline.cdc.client.event.StreamDataEvent;
-import org.apache.shardingsphere.data.pipeline.cdc.client.util.RequestIdUtils;
-import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest;
-import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest.Type;
-import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.LoginRequestBody;
-import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.LoginRequestBody.BasicBody;
-import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.LoginRequestBody.LoginType;
-import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse;
-import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse.Status;
-import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.ServerGreetingResult;
-
-/**
- * Login request handler.
- */
-@Slf4j
-@RequiredArgsConstructor
-public final class LoginRequestHandler extends ChannelInboundHandlerAdapter {
-    
-    private final String username;
-    
-    private final String password;
-    
-    @Override
-    public void channelActive(final ChannelHandlerContext ctx) {
-        ClientConnectionContext context = new ClientConnectionContext();
-        context.setStatus(ClientConnectionStatus.CONNECTED);
-        
ctx.channel().attr(ClientConnectionContext.CONTEXT_KEY).setIfAbsent(context);
-    }
-    
-    @Override
-    public void channelInactive(final ChannelHandlerContext ctx) {
-        
ctx.channel().attr(ClientConnectionContext.CONTEXT_KEY).setIfAbsent(null);
-        ctx.fireChannelInactive();
-    }
-    
-    @Override
-    public void channelRead(final ChannelHandlerContext ctx, final Object msg) 
{
-        CDCResponse response = (CDCResponse) msg;
-        ClientConnectionContext connectionContext = 
ctx.channel().attr(ClientConnectionContext.CONTEXT_KEY).get();
-        switch (connectionContext.getStatus()) {
-            case CONNECTED:
-                setLoginRequest(ctx, response, connectionContext);
-                break;
-            case NOT_LOGGED_IN:
-                sendStreamDataEvent(ctx, response, connectionContext);
-                break;
-            default:
-                ctx.fireChannelRead(msg);
-                break;
-        }
-    }
-    
-    private void setLoginRequest(final ChannelHandlerContext ctx, final 
CDCResponse response, final ClientConnectionContext connectionContext) {
-        ServerGreetingResult serverGreetingResult = 
response.getServerGreetingResult();
-        log.info("Server greeting result, server version: {}, protocol 
version: {}", serverGreetingResult.getServerVersion(), 
serverGreetingResult.getProtocolVersion());
-        String encryptPassword = 
Hashing.sha256().hashBytes(password.getBytes()).toString().toUpperCase();
-        LoginRequestBody loginRequestBody = 
LoginRequestBody.newBuilder().setType(LoginType.BASIC).setBasicBody(BasicBody.newBuilder().setUsername(username).setPassword(encryptPassword).build())
-                .build();
-        String loginRequestId = RequestIdUtils.generateRequestId();
-        CDCRequest data = 
CDCRequest.newBuilder().setType(Type.LOGIN).setVersion(1).setRequestId(loginRequestId).setLoginRequestBody(loginRequestBody).build();
-        ctx.writeAndFlush(data);
-        connectionContext.setStatus(ClientConnectionStatus.NOT_LOGGED_IN);
-    }
-    
-    private void sendStreamDataEvent(final ChannelHandlerContext ctx, final 
CDCResponse response, final ClientConnectionContext connectionContext) {
-        if (response.getStatus() == Status.SUCCEED) {
-            log.info("Login success, username {}", username);
-            connectionContext.setStatus(ClientConnectionStatus.LOGGING_IN);
-            ctx.fireUserEventTriggered(new StreamDataEvent());
-        } else {
-            log.error("Login failed, username: {}, error message: {}", 
username, response.getErrorMessage());
-        }
-    }
-    
-    @Override
-    public void exceptionCaught(final ChannelHandlerContext ctx, final 
Throwable cause) {
-        log.error("login handler error", cause);
-    }
-}
diff --git 
a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/event/StreamDataEvent.java
 
b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/parameter/CDCLoginParameter.java
similarity index 72%
rename from 
kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/event/StreamDataEvent.java
rename to 
kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/parameter/CDCLoginParameter.java
index ff0a4198da8..c6ae9368840 100644
--- 
a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/event/StreamDataEvent.java
+++ 
b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/parameter/CDCLoginParameter.java
@@ -15,10 +15,19 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.cdc.client.event;
+package org.apache.shardingsphere.data.pipeline.cdc.client.parameter;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
 
 /**
- * Stream data event.
+ * CDC login parameter.
  */
-public final class StreamDataEvent {
+@RequiredArgsConstructor
+@Getter
+public final class CDCLoginParameter {
+    
+    private final String username;
+    
+    private final String password;
 }
diff --git 
a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/parameter/StartCDCClientParameter.java
 
b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/parameter/StartStreamingParameter.java
similarity index 75%
rename from 
kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/parameter/StartCDCClientParameter.java
rename to 
kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/parameter/StartStreamingParameter.java
index f7148d4880b..1a7abd78a3f 100644
--- 
a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/parameter/StartCDCClientParameter.java
+++ 
b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/parameter/StartStreamingParameter.java
@@ -19,30 +19,20 @@ package 
org.apache.shardingsphere.data.pipeline.cdc.client.parameter;
 
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
-import lombok.Setter;
 import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody.SchemaTable;
 
-import java.util.List;
+import java.util.Set;
 
 /**
- * Start CDC client parameter.
+ * Start streaming parameter.
  */
-@Getter
-@Setter
 @RequiredArgsConstructor
-public final class StartCDCClientParameter {
-    
-    private String address;
-    
-    private int port;
-    
-    private String username;
-    
-    private String password;
+@Getter
+public final class StartStreamingParameter {
     
-    private String database;
+    private final String database;
     
-    private List<SchemaTable> schemaTables;
+    private final Set<SchemaTable> schemaTables;
     
-    private boolean full;
+    private final boolean full;
 }
diff --git 
a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/util/ResponseFuture.java
 
b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/util/ResponseFuture.java
new file mode 100644
index 00000000000..32e0afcc6bd
--- /dev/null
+++ 
b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/util/ResponseFuture.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.client.util;
+
+import com.google.common.base.Strings;
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.Setter;
+import lombok.SneakyThrows;
+import 
org.apache.shardingsphere.data.pipeline.cdc.client.context.ClientConnectionContext;
+import 
org.apache.shardingsphere.data.pipeline.cdc.client.exception.GetResultTimeoutException;
+import 
org.apache.shardingsphere.data.pipeline.cdc.client.exception.ServerResultException;
+import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest.Type;
+import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse.Status;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Response future.
+ */
+@RequiredArgsConstructor
+@Getter
+@Setter
+public final class ResponseFuture {
+    
+    @Getter(AccessLevel.PRIVATE)
+    private final CountDownLatch countDownLatch = new CountDownLatch(1);
+    
+    private final String requestId;
+    
+    private final Type requestType;
+    
+    private Status status;
+    
+    private String errorCode;
+    
+    private String errorMessage;
+    
+    private Object result;
+    
+    /**
+     * Wait for the response result.
+     *
+     * @param timeoutMillis timeout in milliseconds
+     * @param connectionContext connection context
+     * @return response result
+     * @throws GetResultTimeoutException if the wait times out before the latch is counted down
+     * @throws ServerResultException if an error message has been set on this future
+     */
+    @SneakyThrows(InterruptedException.class)
+    public Object waitResponseResult(final long timeoutMillis, final 
ClientConnectionContext connectionContext) {
+        boolean received = countDownLatch.await(timeoutMillis, 
TimeUnit.MILLISECONDS);
+        connectionContext.getResponseFutureMap().remove(requestId);
+        if (!Strings.isNullOrEmpty(errorMessage)) {
+            throw new ServerResultException(String.format("Get %s response 
failed, code:%s, reason: %s", requestType.name(), errorCode, errorMessage));
+        }
+        if (!received) {
+            throw new GetResultTimeoutException("Get result timeout");
+        }
+        return result;
+    }
+    
+    /**
+     * Count down.
+     */
+    public void countDown() {
+        countDownLatch.countDown();
+    }
+}
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/exception/CDCExceptionWrapper.java
 
b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/util/ServerErrorResult.java
similarity index 69%
copy from 
kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/exception/CDCExceptionWrapper.java
copy to 
kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/util/ServerErrorResult.java
index d2a0341e9c1..d4f952ef004 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/exception/CDCExceptionWrapper.java
+++ 
b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/util/ServerErrorResult.java
@@ -15,22 +15,22 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.cdc.exception;
+package org.apache.shardingsphere.data.pipeline.cdc.client.util;
 
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
-import 
org.apache.shardingsphere.infra.exception.core.external.sql.ShardingSphereSQLException;
+import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest.Type;
 
 /**
- * CDC exception wrapper.
+ * Server error result.
  */
 @RequiredArgsConstructor
 @Getter
-public final class CDCExceptionWrapper extends RuntimeException {
+public final class ServerErrorResult {
     
-    private static final long serialVersionUID = -929604775277797727L;
+    private final String errorCode;
     
-    private final String requestId;
+    private final String errorMessage;
     
-    private final ShardingSphereSQLException exception;
+    private final Type requestType;
 }
diff --git 
a/kernel/data-pipeline/scenario/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/example/Bootstrap.java
 
b/kernel/data-pipeline/scenario/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/example/Bootstrap.java
index 3e371a5aa4b..ed67592a432 100644
--- 
a/kernel/data-pipeline/scenario/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/example/Bootstrap.java
+++ 
b/kernel/data-pipeline/scenario/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/example/Bootstrap.java
@@ -17,9 +17,13 @@
 
 package org.apache.shardingsphere.data.pipeline.cdc.client.example;
 
+import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.cdc.client.CDCClient;
-import 
org.apache.shardingsphere.data.pipeline.cdc.client.parameter.StartCDCClientParameter;
+import 
org.apache.shardingsphere.data.pipeline.cdc.client.config.CDCClientConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.cdc.client.handler.LoggerExceptionHandler;
+import 
org.apache.shardingsphere.data.pipeline.cdc.client.parameter.CDCLoginParameter;
+import 
org.apache.shardingsphere.data.pipeline.cdc.client.parameter.StartStreamingParameter;
 import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody.SchemaTable;
 
 import java.util.Collections;
@@ -32,19 +36,17 @@ public final class Bootstrap {
      *
      * @param args args
      */
+    @SneakyThrows(InterruptedException.class)
     public static void main(final String[] args) {
         // Pay attention to the time zone, to avoid the problem of incorrect 
time zone, it is best to ensure that the time zone of the program is consistent 
with the time zone of the database server
-        // and mysql-connector-java 5.x version will ignore serverTimezone 
jdbc parameter and use the default time zone in the program
         // TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
-        StartCDCClientParameter parameter = new StartCDCClientParameter();
-        parameter.setAddress("127.0.0.1");
-        parameter.setPort(33071);
-        parameter.setUsername("root");
-        parameter.setPassword("root");
-        parameter.setDatabase("sharding_db");
-        parameter.setFull(true);
-        
parameter.setSchemaTables(Collections.singletonList(SchemaTable.newBuilder().setTable("t_order").build()));
-        CDCClient cdcClient = new CDCClient(parameter, records -> 
log.info("records: {}", records));
-        cdcClient.start();
+        String address = "127.0.0.1";
+        CDCClientConfiguration clientConfig = new 
CDCClientConfiguration(address, 33071, records -> log.info("records: {}", 
records), new LoggerExceptionHandler());
+        try (CDCClient cdcClient = new CDCClient(clientConfig)) {
+            cdcClient.login(new CDCLoginParameter("root", "root"));
+            String streamingId = cdcClient.startStreaming(new 
StartStreamingParameter("sharding_db", 
Collections.singleton(SchemaTable.newBuilder().setTable("t_order").build()), 
true));
+            log.info("Streaming id={}", streamingId);
+            cdcClient.await();
+        }
     }
 }
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCJobItemContext.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCJobItemContext.java
index 4c08d89209b..160399ce245 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCJobItemContext.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCJobItemContext.java
@@ -124,10 +124,6 @@ public final class CDCJobItemContext implements 
InventoryIncrementalJobItemConte
         return sourceMetaDataLoaderLazyInitializer.get();
     }
     
-    public PipelineSink getSink() {
-        return sink;
-    }
-    
     @Override
     public long getProcessedRecordsCount() {
         return processedRecordsCount.get();
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/CDCSocketSink.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/CDCSocketSink.java
index 0db35459a89..e9f3257733d 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/CDCSocketSink.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/sink/CDCSocketSink.java
@@ -115,6 +115,5 @@ public final class CDCSocketSink implements PipelineSink {
     
     @Override
     public void close() throws IOException {
-        channel.close();
     }
 }
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/exception/CDCExceptionWrapper.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/exception/CDCExceptionWrapper.java
index d2a0341e9c1..ce07aed48a3 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/exception/CDCExceptionWrapper.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/exception/CDCExceptionWrapper.java
@@ -18,13 +18,11 @@
 package org.apache.shardingsphere.data.pipeline.cdc.exception;
 
 import lombok.Getter;
-import lombok.RequiredArgsConstructor;
 import 
org.apache.shardingsphere.infra.exception.core.external.sql.ShardingSphereSQLException;
 
 /**
  * CDC exception wrapper.
  */
-@RequiredArgsConstructor
 @Getter
 public final class CDCExceptionWrapper extends RuntimeException {
     
@@ -33,4 +31,10 @@ public final class CDCExceptionWrapper extends 
RuntimeException {
     private final String requestId;
     
     private final ShardingSphereSQLException exception;
+    
+    public CDCExceptionWrapper(final String requestId, final 
ShardingSphereSQLException exception) {
+        super(exception.getMessage());
+        this.requestId = requestId;
+        this.exception = exception;
+    }
 }
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/generator/CDCResponseUtils.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/generator/CDCResponseUtils.java
index 1c4f0f73b8c..733659f3f0d 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/generator/CDCResponseUtils.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/generator/CDCResponseUtils.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.data.pipeline.cdc.generator;
 
+import com.google.protobuf.Empty;
 import com.google.protobuf.Message;
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
@@ -42,7 +43,7 @@ public final class CDCResponseUtils {
      * @return CDC response
      */
     public static CDCResponse succeed(final String requestId) {
-        return succeed(requestId, ResponseCase.RESPONSE_NOT_SET, null);
+        return succeed(requestId, ResponseCase.RESPONSE_NOT_SET, 
Empty.newBuilder().build());
     }
     
     /**
diff --git 
a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
 
b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
index d842029d140..f9f6063f5a8 100644
--- 
a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
+++ 
b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
@@ -39,6 +39,7 @@ import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StartStreami
 import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StopStreamingRequestBody;
 import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody;
 import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse;
+import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse.Status;
 import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.ServerGreetingResult;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
 import 
org.apache.shardingsphere.distsql.handler.exception.rule.MissingRequiredRuleException;
@@ -68,15 +69,15 @@ public final class CDCChannelInboundHandler extends 
ChannelInboundHandlerAdapter
     
     @Override
     public void channelActive(final ChannelHandlerContext ctx) {
-        CDCResponse response = 
CDCResponse.newBuilder().setServerGreetingResult(ServerGreetingResult.newBuilder().setServerVersion(ShardingSphereVersion.VERSION).setProtocolVersion("1")
-                .build()).build();
+        CDCResponse response = 
CDCResponse.newBuilder().setServerGreetingResult(ServerGreetingResult.newBuilder().setServerVersion(ShardingSphereVersion.VERSION).setProtocolVersion("1").build())
+                .setStatus(Status.SUCCEED).build();
         ctx.writeAndFlush(response);
     }
     
     @Override
     public void channelInactive(final ChannelHandlerContext ctx) {
         CDCConnectionContext connectionContext = 
ctx.channel().attr(CONNECTION_CONTEXT_KEY).get();
-        if (null != connectionContext.getJobId()) {
+        if (null != connectionContext && null != connectionContext.getJobId()) 
{
             backendHandler.stopStreaming(connectionContext.getJobId(), 
ctx.channel().id());
         }
         ctx.channel().attr(CONNECTION_CONTEXT_KEY).set(null);
@@ -104,7 +105,7 @@ public final class CDCChannelInboundHandler extends 
ChannelInboundHandlerAdapter
     public void channelRead(final ChannelHandlerContext ctx, final Object msg) 
{
         CDCConnectionContext connectionContext = 
ctx.channel().attr(CONNECTION_CONTEXT_KEY).get();
         CDCRequest request = (CDCRequest) msg;
-        if (null == connectionContext) {
+        if (null == connectionContext || request.hasLoginRequestBody()) {
             processLogin(ctx, request);
             return;
         }
@@ -132,7 +133,7 @@ public final class CDCChannelInboundHandler extends 
ChannelInboundHandlerAdapter
     
     private void processLogin(final ChannelHandlerContext ctx, final 
CDCRequest request) {
         if (!request.hasLoginRequestBody() || 
!request.getLoginRequestBody().hasBasicBody()) {
-            throw new CDCExceptionWrapper(request.getRequestId(), new 
PipelineInvalidParameterException("Login request body is empty"));
+            throw new CDCExceptionWrapper(request.getRequestId(), new 
CDCLoginException("Login request body is empty"));
         }
         BasicBody body = request.getLoginRequestBody().getBasicBody();
         AuthorityRule authorityRule = 
ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getGlobalRuleMetaData().getSingleRule(AuthorityRule.class);
diff --git 
a/proxy/frontend/core/src/test/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandlerTest.java
 
b/proxy/frontend/core/src/test/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandlerTest.java
index ec866b09f05..c1024df5842 100644
--- 
a/proxy/frontend/core/src/test/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandlerTest.java
+++ 
b/proxy/frontend/core/src/test/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandlerTest.java
@@ -100,7 +100,7 @@ class CDCChannelInboundHandlerTest {
         assertTrue(expectedGreetingResult.hasServerGreetingResult());
         CDCResponse expectedLoginResult = channel.readOutbound();
         assertThat(expectedLoginResult.getStatus(), is(Status.FAILED));
-        assertThat(expectedLoginResult.getErrorCode(), 
is(XOpenSQLState.INVALID_PARAMETER_VALUE.getValue()));
+        assertThat(expectedLoginResult.getErrorCode(), 
is(XOpenSQLState.GENERAL_ERROR.getValue()));
         assertFalse(channel.isOpen());
     }
     
diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
index 9fe9cc293fc..2176c6b9ade 100644
--- 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
@@ -27,7 +27,10 @@ import 
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumn
 import 
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
 import org.apache.shardingsphere.data.pipeline.cdc.api.job.type.CDCJobType;
 import org.apache.shardingsphere.data.pipeline.cdc.client.CDCClient;
-import 
org.apache.shardingsphere.data.pipeline.cdc.client.parameter.StartCDCClientParameter;
+import 
org.apache.shardingsphere.data.pipeline.cdc.client.config.CDCClientConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.cdc.client.handler.LoggerExceptionHandler;
+import 
org.apache.shardingsphere.data.pipeline.cdc.client.parameter.CDCLoginParameter;
+import 
org.apache.shardingsphere.data.pipeline.cdc.client.parameter.StartStreamingParameter;
 import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody.SchemaTable;
 import 
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceFactory;
 import 
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper;
@@ -65,12 +68,10 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.time.LocalDateTime;
 import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.TimeZone;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -94,8 +95,6 @@ class CDCE2EIT {
     
     private static final String SOURCE_TABLE_NAME = "t_order";
     
-    private final ExecutorService executor = Executors.newSingleThreadExecutor();
-    
     @ParameterizedTest(name = "{0}")
     @EnabledIf("isEnabled")
     @ArgumentsSource(PipelineE2ETestCaseArgumentsProvider.class)
@@ -174,22 +173,15 @@ class CDCE2EIT {
     
     private void startCDCClient(final PipelineContainerComposer containerComposer, final DialectDatabaseMetaData dialectDatabaseMetaData) {
         DataSource dataSource = createStandardDataSource(containerComposer, PipelineContainerComposer.DS_4);
-        StartCDCClientParameter parameter = new StartCDCClientParameter();
-        parameter.setAddress("localhost");
-        parameter.setPort(containerComposer.getContainerComposer().getProxyCDCPort());
-        parameter.setUsername(ProxyContainerConstants.USERNAME);
-        parameter.setPassword(ProxyContainerConstants.PASSWORD);
-        parameter.setDatabase("sharding_db");
-        // TODO add full=false test case later
-        parameter.setFull(true);
-        String schema = dialectDatabaseMetaData.isSchemaAvailable() ? "test" : "";
-        parameter.setSchemaTables(Arrays.asList(SchemaTable.newBuilder().setTable(SOURCE_TABLE_NAME).setSchema(schema).build(), SchemaTable.newBuilder().setTable("t_address").build()));
         DataSourceRecordConsumer recordConsumer = new DataSourceRecordConsumer(dataSource, containerComposer.getDatabaseType());
-        CompletableFuture.runAsync(() -> new CDCClient(parameter, recordConsumer).start(), executor).whenComplete((unused, throwable) -> {
-            if (null != throwable) {
-                log.error("cdc client sync failed", throwable);
-            }
-        });
+        CDCClientConfiguration cdcConfig = new CDCClientConfiguration("localhost", containerComposer.getContainerComposer().getProxyCDCPort(), recordConsumer, new LoggerExceptionHandler());
+        String schema = dialectDatabaseMetaData.isSchemaAvailable() ? "test" : "";
+        CDCClient cdcClient = new CDCClient(cdcConfig);
+        Awaitility.await().atMost(5L, TimeUnit.SECONDS).pollInterval(100L, TimeUnit.MILLISECONDS).until(cdcClient::isActive);
+        cdcClient.login(new CDCLoginParameter(ProxyContainerConstants.USERNAME, ProxyContainerConstants.PASSWORD));
+        // TODO add full=false test case later
+        cdcClient.startStreaming(new StartStreamingParameter("sharding_db", new HashSet<>(Arrays.asList(SchemaTable.newBuilder().setTable(SOURCE_TABLE_NAME).setSchema(schema).build(),
+                SchemaTable.newBuilder().setTable("t_address").build())), true));
     }
     
     private List<Map<String, Object>> listOrderRecords(final PipelineContainerComposer containerComposer, final String tableNameWithSchema) throws SQLException {

Reply via email to