This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 3ef02d52e6b Add more test cases on CDCChannelInboundHandlerTest (#37887)
3ef02d52e6b is described below

commit 3ef02d52e6b18f118fa6ac46f58d8dd17789d427
Author: Liang Zhang <[email protected]>
AuthorDate: Thu Jan 29 17:50:47 2026 +0800

    Add more test cases on CDCChannelInboundHandlerTest (#37887)
---
 .../netty/CDCChannelInboundHandlerTest.java        | 426 +++++++++++++++++++--
 1 file changed, 390 insertions(+), 36 deletions(-)

diff --git a/proxy/frontend/core/src/test/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandlerTest.java b/proxy/frontend/core/src/test/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandlerTest.java
index 1400e12a40d..40264616092 100644
--- a/proxy/frontend/core/src/test/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandlerTest.java
+++ b/proxy/frontend/core/src/test/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandlerTest.java
@@ -18,104 +18,458 @@
 package org.apache.shardingsphere.proxy.frontend.netty;
 
 import com.google.common.hash.Hashing;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.embedded.EmbeddedChannel;
 import io.netty.handler.logging.LoggingHandler;
+import io.netty.util.Attribute;
+import io.netty.util.AttributeKey;
+import lombok.SneakyThrows;
+import org.apache.shardingsphere.authority.model.ShardingSpherePrivileges;
 import org.apache.shardingsphere.authority.rule.AuthorityRule;
+import org.apache.shardingsphere.data.pipeline.cdc.context.CDCConnectionContext;
+import org.apache.shardingsphere.data.pipeline.cdc.exception.CDCExceptionWrapper;
+import org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseUtils;
+import org.apache.shardingsphere.data.pipeline.cdc.handler.CDCBackendHandler;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.AckStreamingRequestBody;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest.Builder;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest.Type;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.DropStreamingRequestBody;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.LoginRequestBody;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.LoginRequestBody.BasicBody;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StartStreamingRequestBody;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StopStreamingRequestBody;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse.Status;
+import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
+import org.apache.shardingsphere.database.connector.core.type.DatabaseType;
+import org.apache.shardingsphere.database.exception.core.SQLExceptionTransformEngine;
+import org.apache.shardingsphere.database.exception.core.exception.connection.AccessDeniedException;
+import org.apache.shardingsphere.database.exception.core.exception.syntax.database.UnknownDatabaseException;
+import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
+import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
 import org.apache.shardingsphere.infra.exception.external.sql.sqlstate.XOpenSQLState;
+import org.apache.shardingsphere.infra.exception.external.sql.type.kernel.category.PipelineSQLException;
+import org.apache.shardingsphere.infra.exception.kernel.metadata.rule.MissingRequiredRuleException;
 import org.apache.shardingsphere.infra.metadata.database.rule.RuleMetaData;
+import org.apache.shardingsphere.infra.metadata.user.Grantee;
 import org.apache.shardingsphere.infra.metadata.user.ShardingSphereUser;
 import org.apache.shardingsphere.mode.manager.ContextManager;
 import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
+import org.apache.shardingsphere.proxy.frontend.protocol.FrontDatabaseProtocolTypeFactory;
 import org.apache.shardingsphere.test.infra.framework.extension.mock.AutoMockExtension;
 import org.apache.shardingsphere.test.infra.framework.extension.mock.StaticMockSettings;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.internal.configuration.plugins.Plugins;
 import org.mockito.junit.jupiter.MockitoSettings;
 import org.mockito.quality.Strictness;
 
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.sql.SQLException;
 import java.util.Collections;
 import java.util.Optional;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.argThat;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 @ExtendWith(AutoMockExtension.class)
-@StaticMockSettings(ProxyContext.class)
+@StaticMockSettings({ProxyContext.class, 
FrontDatabaseProtocolTypeFactory.class})
 @MockitoSettings(strictness = Strictness.LENIENT)
 class CDCChannelInboundHandlerTest {
     
-    private final EmbeddedChannel channel = new EmbeddedChannel(new 
LoggingHandler(), new CDCChannelInboundHandler());
+    private static final AttributeKey<CDCConnectionContext> 
CONNECTION_CONTEXT_KEY = AttributeKey.valueOf("connection.context");
     
+    private final CDCChannelInboundHandler handler = new 
CDCChannelInboundHandler();
+    
+    private final EmbeddedChannel channel = new EmbeddedChannel(new 
LoggingHandler(), handler);
+    
+    private final ShardingSphereUser user = new ShardingSphereUser("root", 
"root", "%");
+    
+    @Mock
+    private CDCBackendHandler backendHandler;
+    
+    @Mock
+    private AuthorityRule authorityRule;
+    
+    @Mock
+    private ShardingSpherePrivileges privileges;
+    
+    @SneakyThrows(ReflectiveOperationException.class)
     @BeforeEach
     void setup() {
-        ContextManager contextManager = mockContextManager();
-        
when(ProxyContext.getInstance().getContextManager()).thenReturn(contextManager);
+        
Plugins.getMemberAccessor().set(CDCChannelInboundHandler.class.getDeclaredField("backendHandler"),
 handler, backendHandler);
+        when(authorityRule.findUser(any())).thenReturn(Optional.of(user));
+        
when(authorityRule.findPrivileges(any())).thenReturn(Optional.of(privileges));
+        when(privileges.hasPrivileges(any())).thenReturn(true);
+        
when(FrontDatabaseProtocolTypeFactory.getDatabaseType()).thenReturn(mock(DatabaseType.class));
+        mockProxyContext(new 
RuleMetaData(Collections.singleton(authorityRule)));
     }
     
-    private ContextManager mockContextManager() {
-        ContextManager result = mock(ContextManager.class, RETURNS_DEEP_STUBS);
-        RuleMetaData globalRuleMetaData = new 
RuleMetaData(Collections.singleton(mockAuthorityRule()));
-        
when(result.getMetaDataContexts().getMetaData().getGlobalRuleMetaData()).thenReturn(globalRuleMetaData);
-        return result;
+    private void mockProxyContext(final RuleMetaData ruleMetaData) {
+        ProxyContext proxyContext = mock(ProxyContext.class);
+        ContextManager contextManager = mock(ContextManager.class, 
RETURNS_DEEP_STUBS);
+        
when(contextManager.getMetaDataContexts().getMetaData().getGlobalRuleMetaData()).thenReturn(ruleMetaData);
+        ConfigurationProperties props = mock(ConfigurationProperties.class);
+        
when(props.getValue(ConfigurationPropertyKey.PROXY_FRONTEND_DATABASE_PROTOCOL_TYPE)).thenReturn(mock(DatabaseType.class));
+        
when(contextManager.getMetaDataContexts().getMetaData().getProps()).thenReturn(props);
+        when(ProxyContext.getInstance()).thenReturn(proxyContext);
+        when(proxyContext.getContextManager()).thenReturn(contextManager);
     }
     
-    private AuthorityRule mockAuthorityRule() {
-        AuthorityRule result = mock(AuthorityRule.class);
-        when(result.findUser(any())).thenReturn(Optional.of(new 
ShardingSphereUser("root", "root", "%")));
-        return result;
+    @Test
+    void assertChannelInactiveStopsStreaming() {
+        CDCConnectionContext connectionContext = new 
CDCConnectionContext(user);
+        connectionContext.setJobId("job-id");
+        channel.attr(CONNECTION_CONTEXT_KEY).set(connectionContext);
+        channel.pipeline().fireChannelInactive();
+        verify(backendHandler).stopStreaming("job-id", channel.id());
+        assertNull(channel.attr(CONNECTION_CONTEXT_KEY).get());
+    }
+    
+    @Test
+    void assertChannelInactiveWithConnectionContextWithoutJob() {
+        Attribute<CDCConnectionContext> attribute = mock(Attribute.class);
+        when(attribute.get()).thenReturn(new CDCConnectionContext(user));
+        ChannelHandlerContext context = mock(ChannelHandlerContext.class, 
RETURNS_DEEP_STUBS);
+        
when(context.channel().attr(CONNECTION_CONTEXT_KEY)).thenReturn(attribute);
+        handler.channelInactive(context);
+        verify(backendHandler, never()).stopStreaming(anyString(), any());
+        verify(attribute).set(null);
+    }
+    
+    @Test
+    void assertExceptionCaughtWithWrapperClosesChannel() {
+        ChannelHandlerContext context = mock(ChannelHandlerContext.class, 
RETURNS_DEEP_STUBS);
+        ChannelFuture channelFuture = mock(ChannelFuture.class);
+        when(context.writeAndFlush(any())).thenReturn(channelFuture);
+        handler.exceptionCaught(context, new CDCExceptionWrapper("request-id", 
new PipelineInvalidParameterException("invalid")));
+        verify(context).writeAndFlush(argThat(argument -> 
"request-id".equals(((CDCResponse) argument).getRequestId())));
+        verify(channelFuture).addListener(ChannelFutureListener.CLOSE);
+    }
+    
+    @Test
+    void assertExceptionCaughtWithNonWrapperException() {
+        ChannelHandlerContext context = mock(ChannelHandlerContext.class, 
RETURNS_DEEP_STUBS);
+        ChannelFuture channelFuture = mock(ChannelFuture.class);
+        
when(context.channel().attr(CONNECTION_CONTEXT_KEY).get()).thenReturn(new 
CDCConnectionContext(user));
+        when(context.writeAndFlush(any())).thenReturn(channelFuture);
+        handler.exceptionCaught(context, new RuntimeException("error"));
+        verify(channelFuture, 
never()).addListener(ChannelFutureListener.CLOSE);
     }
     
     @Test
     void assertLoginRequestFailed() {
-        CDCRequest actualRequest = 
CDCRequest.newBuilder().setType(Type.LOGIN).setLoginRequestBody(LoginRequestBody.newBuilder().setBasicBody(BasicBody.newBuilder().setUsername("root2").build())
-                .build()).build();
-        channel.writeInbound(actualRequest);
-        CDCResponse expectedGreetingResult = channel.readOutbound();
-        assertTrue(expectedGreetingResult.hasServerGreetingResult());
-        CDCResponse expectedLoginResult = channel.readOutbound();
-        assertThat(expectedLoginResult.getStatus(), is(Status.FAILED));
-        assertThat(expectedLoginResult.getErrorCode(), 
is(XOpenSQLState.DATA_SOURCE_REJECTED_CONNECTION_ATTEMPT.getValue()));
+        CDCRequest request = 
CDCRequest.newBuilder().setType(Type.LOGIN).setLoginRequestBody(LoginRequestBody.newBuilder()
+                
.setBasicBody(BasicBody.newBuilder().setUsername("root2").build()).build()).build();
+        channel.writeInbound(request);
+        CDCResponse greeting = channel.readOutbound();
+        assertTrue(greeting.hasServerGreetingResult());
+        CDCResponse loginResult = channel.readOutbound();
+        assertThat(loginResult.getStatus(), is(Status.FAILED));
+        assertThat(loginResult.getErrorCode(), 
is(XOpenSQLState.DATA_SOURCE_REJECTED_CONNECTION_ATTEMPT.getValue()));
         assertFalse(channel.isOpen());
     }
     
     @Test
     void assertIllegalLoginRequest() {
-        CDCRequest actualRequest = 
CDCRequest.newBuilder().setType(Type.LOGIN).setVersion(1).setRequestId("test").build();
-        channel.writeInbound(actualRequest);
-        CDCResponse expectedGreetingResult = channel.readOutbound();
-        assertTrue(expectedGreetingResult.hasServerGreetingResult());
-        CDCResponse expectedLoginResult = channel.readOutbound();
-        assertThat(expectedLoginResult.getStatus(), is(Status.FAILED));
-        assertThat(expectedLoginResult.getErrorCode(), 
is(XOpenSQLState.NOT_FOUND.getValue()));
+        CDCRequest request = 
CDCRequest.newBuilder().setType(Type.LOGIN).setVersion(1).setRequestId("test").build();
+        channel.writeInbound(request);
+        CDCResponse greeting = channel.readOutbound();
+        assertTrue(greeting.hasServerGreetingResult());
+        CDCResponse loginResult = channel.readOutbound();
+        assertThat(loginResult.getStatus(), is(Status.FAILED));
+        assertThat(loginResult.getErrorCode(), 
is(XOpenSQLState.NOT_FOUND.getValue()));
         assertFalse(channel.isOpen());
     }
     
     @Test
     void assertLoginRequestSucceed() {
+        channel.connect(new InetSocketAddress("127.0.0.1", 3307));
         String encryptPassword = 
Hashing.sha256().hashBytes("root".getBytes()).toString().toUpperCase();
         Builder builder = 
CDCRequest.newBuilder().setType(Type.LOGIN).setLoginRequestBody(LoginRequestBody.newBuilder().setBasicBody(BasicBody.newBuilder().setUsername("root")
                 .setPassword(encryptPassword).build()).build());
-        CDCRequest actualRequest = builder.build();
-        channel.writeInbound(actualRequest);
-        CDCResponse expectedGreetingResult = channel.readOutbound();
-        assertTrue(expectedGreetingResult.hasServerGreetingResult());
-        CDCResponse expectedLoginResult = channel.readOutbound();
-        assertThat(expectedLoginResult.getStatus(), is(Status.SUCCEED));
-        assertThat(expectedLoginResult.getErrorCode(), is(""));
-        assertThat(expectedLoginResult.getErrorMessage(), is(""));
+        channel.writeInbound(builder.build());
+        CDCResponse greeting = channel.readOutbound();
+        assertTrue(greeting.hasServerGreetingResult());
+        CDCResponse loginResult = channel.readOutbound();
+        assertThat(loginResult.getStatus(), is(Status.SUCCEED));
+        assertThat(loginResult.getErrorCode(), is(""));
+        assertThat(loginResult.getErrorMessage(), is(""));
+    }
+    
+    @Test
+    void assertLoginRequestWithExistingConnectionContext() {
+        channel.attr(CONNECTION_CONTEXT_KEY).set(new 
CDCConnectionContext(user));
+        
channel.writeInbound(createLoginRequest(Hashing.sha256().hashBytes("root".getBytes()).toString().toUpperCase()));
+        CDCResponse greeting = channel.readOutbound();
+        assertTrue(greeting.hasServerGreetingResult());
+        CDCResponse loginResult = channel.readOutbound();
+        assertThat(loginResult.getStatus(), is(Status.SUCCEED));
+        
assertThat(channel.attr(CONNECTION_CONTEXT_KEY).get().getCurrentUser().getGrantee().getUsername(),
 is("root"));
+    }
+    
+    @Test
+    void assertLoginWithNonInetSocketAddress() {
+        Channel channelMock = mock(Channel.class, RETURNS_DEEP_STUBS);
+        Attribute<CDCConnectionContext> attribute = mock(Attribute.class);
+        when(channelMock.attr(CONNECTION_CONTEXT_KEY)).thenReturn(attribute);
+        SocketAddress socketAddress = new SocketAddress() {
+            
+            private static final long serialVersionUID = -733394185197826745L;
+            
+            @Override
+            public String toString() {
+                return "local-address";
+            }
+        };
+        when(channelMock.remoteAddress()).thenReturn(socketAddress);
+        ChannelHandlerContext context = mock(ChannelHandlerContext.class, 
RETURNS_DEEP_STUBS);
+        when(context.channel()).thenReturn(channelMock);
+        
when(context.writeAndFlush(any())).thenReturn(mock(ChannelFuture.class));
+        handler.channelRead(context, 
createLoginRequest(Hashing.sha256().hashBytes("root".getBytes()).toString().toUpperCase()));
+        verify(context).writeAndFlush(argThat(argument -> ((CDCResponse) 
argument).getStatus() == Status.SUCCEED));
+        verify(attribute).set(any(CDCConnectionContext.class));
+    }
+    
+    @Test
+    void assertLoginUsesInetSocketAddressHost() {
+        Channel channelMock = mock(Channel.class, RETURNS_DEEP_STUBS);
+        Attribute<CDCConnectionContext> attribute = mock(Attribute.class);
+        when(channelMock.attr(CONNECTION_CONTEXT_KEY)).thenReturn(attribute);
+        InetSocketAddress remoteAddress = new InetSocketAddress("127.0.0.2", 
3307);
+        when(channelMock.remoteAddress()).thenReturn(remoteAddress);
+        ChannelFuture channelFuture = mock(ChannelFuture.class);
+        ChannelHandlerContext context = mock(ChannelHandlerContext.class, 
RETURNS_DEEP_STUBS);
+        when(context.channel()).thenReturn(channelMock);
+        when(context.writeAndFlush(any())).thenReturn(channelFuture);
+        CDCRequest request = 
createLoginRequest(Hashing.sha256().hashBytes("root".getBytes()).toString().toUpperCase());
+        handler.channelRead(context, request);
+        ArgumentCaptor<Grantee> captor = 
ArgumentCaptor.forClass(Grantee.class);
+        verify(authorityRule, atLeastOnce()).findUser(captor.capture());
+        assertTrue(captor.getAllValues().stream().anyMatch(grantee -> 
"127.0.0.2".equals(grantee.getHostname())));
+    }
+    
+    @Test
+    void assertStreamDataRequestBodyMissing() {
+        CDCRequest request = 
CDCRequest.newBuilder().setType(Type.STREAM_DATA).setRequestId("stream-request").build();
+        CDCResponse response = writeRequestWithContext(request);
+        assertThat(response.getStatus(), is(Status.FAILED));
+        assertThat(response.getErrorCode(), 
is(XOpenSQLState.INVALID_PARAMETER_VALUE.getValue()));
+    }
+    
+    @Test
+    void assertStreamDataRequestDatabaseMissing() {
+        CDCRequest request = createStreamDataRequest("");
+        CDCResponse response = writeRequestWithContext(request);
+        assertThat(response.getStatus(), is(Status.FAILED));
+        assertThat(response.getErrorCode(), 
is(XOpenSQLState.INVALID_PARAMETER_VALUE.getValue()));
+    }
+    
+    @Test
+    void assertStreamDataRequestSourceSchemaTableMissing() {
+        StreamDataRequestBody body = 
StreamDataRequestBody.newBuilder().setDatabase("logic_db").build();
+        CDCRequest request = 
CDCRequest.newBuilder().setType(Type.STREAM_DATA).setRequestId("stream-request").setStreamDataRequestBody(body).build();
+        CDCResponse response = writeRequestWithContext(request);
+        assertThat(response.getStatus(), is(Status.FAILED));
+        assertThat(response.getErrorCode(), 
is(XOpenSQLState.INVALID_PARAMETER_VALUE.getValue()));
+    }
+    
+    @Test
+    void assertStreamDataRequestMissingAuthorityRule() {
+        mockProxyContext(new RuleMetaData(Collections.emptyList()));
+        CDCRequest request = createStreamDataRequest("logic_db");
+        CDCResponse response = writeRequestWithContext(request);
+        SQLException expectedException = 
SQLExceptionTransformEngine.toSQLException(new 
MissingRequiredRuleException("authority"), 
FrontDatabaseProtocolTypeFactory.getDatabaseType());
+        assertThat(response.getErrorCode(), 
is(expectedException.getSQLState()));
+    }
+    
+    @Test
+    void assertStreamDataRequestPrivilegesNotFound() {
+        when(authorityRule.findPrivileges(any())).thenReturn(Optional.empty());
+        CDCRequest request = createStreamDataRequest("logic_db");
+        CDCResponse response = writeRequestWithContext(request);
+        Grantee grantee = user.getGrantee();
+        SQLException expectedException = 
SQLExceptionTransformEngine.toSQLException(new 
AccessDeniedException(grantee.getUsername(), grantee.getHostname(), false),
+                FrontDatabaseProtocolTypeFactory.getDatabaseType());
+        assertThat(response.getErrorCode(), 
is(expectedException.getSQLState()));
+    }
+    
+    @Test
+    void assertStreamDataRequestPrivilegesWithoutDatabasePermission() {
+        when(privileges.hasPrivileges("logic_db")).thenReturn(false);
+        CDCRequest request = createStreamDataRequest("logic_db");
+        CDCResponse response = writeRequestWithContext(request);
+        SQLException expectedException = 
SQLExceptionTransformEngine.toSQLException(new 
UnknownDatabaseException("logic_db"), 
FrontDatabaseProtocolTypeFactory.getDatabaseType());
+        assertThat(response.getErrorCode(), 
is(expectedException.getSQLState()));
+    }
+    
+    private CDCRequest createStreamDataRequest(final String database) {
+        StreamDataRequestBody.Builder bodyBuilder = 
StreamDataRequestBody.newBuilder().setDatabase(database);
+        
bodyBuilder.addSourceSchemaTable(StreamDataRequestBody.SchemaTable.newBuilder().setSchema("schema").setTable("table").build());
+        return 
CDCRequest.newBuilder().setType(Type.STREAM_DATA).setRequestId("stream-request").setStreamDataRequestBody(bodyBuilder.build()).build();
+    }
+    
+    @Test
+    void assertStreamDataRequestWrapsPipelineSQLException() {
+        CDCConnectionContext connectionContext = new 
CDCConnectionContext(user);
+        channel.attr(CONNECTION_CONTEXT_KEY).set(connectionContext);
+        StreamDataRequestBody.Builder bodyBuilder = 
StreamDataRequestBody.newBuilder().setDatabase("logic_db");
+        
bodyBuilder.addSourceSchemaTable(StreamDataRequestBody.SchemaTable.newBuilder().setSchema("schema").setTable("table").build());
+        CDCRequest request = 
CDCRequest.newBuilder().setType(Type.STREAM_DATA).setRequestId("stream-request").setStreamDataRequestBody(bodyBuilder.build()).build();
+        when(backendHandler.streamData(any(), any(), any(), 
any())).thenThrow(mock(PipelineSQLException.class));
+        ChannelHandlerContext context = channel.pipeline().context(handler);
+        assertThrows(CDCExceptionWrapper.class, () -> 
handler.channelRead(context, request));
+        assertThat(channel.attr(CONNECTION_CONTEXT_KEY).get(), 
is(connectionContext));
+    }
+    
+    @Test
+    void assertStreamDataRequestSucceed() {
+        channel.attr(CONNECTION_CONTEXT_KEY).set(new 
CDCConnectionContext(user));
+        StreamDataRequestBody.Builder bodyBuilder = 
StreamDataRequestBody.newBuilder().setDatabase("logic_db");
+        
bodyBuilder.addSourceSchemaTable(StreamDataRequestBody.SchemaTable.newBuilder().setSchema("schema").setTable("table").build());
+        CDCRequest request = 
CDCRequest.newBuilder().setType(Type.STREAM_DATA).setRequestId("stream-request").setStreamDataRequestBody(bodyBuilder.build()).build();
+        CDCResponse succeedResponse = 
CDCResponseUtils.succeed("stream-request");
+        when(backendHandler.streamData(any(), any(), any(), 
any())).thenReturn(succeedResponse);
+        channel.writeInbound(request);
+        CDCResponse response = readResponseSkippingGreeting();
+        assertThat(response.getStatus(), is(Status.SUCCEED));
+        assertThat(response.getRequestId(), is("stream-request"));
+    }
+    
+    @Test
+    void assertAckStreamingRequestBodyMissing() {
+        CDCRequest request = 
CDCRequest.newBuilder().setType(Type.ACK_STREAMING).setRequestId("ack-request").build();
+        CDCResponse response = writeRequestWithContext(request);
+        assertThat(response.getStatus(), is(Status.FAILED));
+        assertThat(response.getErrorCode(), 
is(XOpenSQLState.INVALID_PARAMETER_VALUE.getValue()));
+    }
+    
+    @Test
+    void assertAckStreamingRequestAckIdMissing() {
+        AckStreamingRequestBody.Builder bodyBuilder = 
AckStreamingRequestBody.newBuilder();
+        bodyBuilder.setAckId("");
+        CDCResponse response = 
writeRequestWithContext(CDCRequest.newBuilder().setType(Type.ACK_STREAMING).setRequestId("ack-request").setAckStreamingRequestBody(bodyBuilder.build()).build());
+        assertThat(response.getStatus(), is(Status.FAILED));
+        assertThat(response.getErrorCode(), 
is(XOpenSQLState.INVALID_PARAMETER_VALUE.getValue()));
+    }
+    
+    @Test
+    void assertAckStreamingRequestSucceed() {
+        channel.attr(CONNECTION_CONTEXT_KEY).set(new 
CDCConnectionContext(user));
+        AckStreamingRequestBody.Builder bodyBuilder = 
AckStreamingRequestBody.newBuilder();
+        bodyBuilder.setAckId("ack-1");
+        CDCRequest request = 
CDCRequest.newBuilder().setType(Type.ACK_STREAMING).setRequestId("ack-request").setAckStreamingRequestBody(bodyBuilder.build()).build();
+        channel.writeInbound(request);
+        channel.readOutbound();
+        
verify(backendHandler).processAck(request.getAckStreamingRequestBody());
+    }
+    
+    @Test
+    void assertStartStreamingRequestBodyMissing() {
+        CDCRequest request = 
CDCRequest.newBuilder().setType(Type.START_STREAMING).setRequestId("start-request").build();
+        CDCResponse response = writeRequestWithContext(request);
+        assertThat(response.getStatus(), is(Status.FAILED));
+        assertThat(response.getErrorCode(), 
is(XOpenSQLState.INVALID_PARAMETER_VALUE.getValue()));
+    }
+    
+    @Test
+    void assertStartStreamingRequestIdMissing() {
+        StartStreamingRequestBody.Builder bodyBuilder = 
StartStreamingRequestBody.newBuilder();
+        bodyBuilder.setStreamingId("");
+        CDCResponse response = 
writeRequestWithContext(CDCRequest.newBuilder().setType(Type.START_STREAMING).setRequestId("start-request").setStartStreamingRequestBody(bodyBuilder.build()).build());
+        assertThat(response.getStatus(), is(Status.FAILED));
+        assertThat(response.getErrorCode(), 
is(XOpenSQLState.INVALID_PARAMETER_VALUE.getValue()));
+    }
+    
+    @Test
+    void assertStartStreamingRequestSucceed() {
+        CDCConnectionContext connectionContext = new 
CDCConnectionContext(user);
+        channel.attr(CONNECTION_CONTEXT_KEY).set(connectionContext);
+        
when(backendHandler.getDatabaseNameByJobId("job-1")).thenReturn("logic_db");
+        StartStreamingRequestBody.Builder bodyBuilder = 
StartStreamingRequestBody.newBuilder();
+        bodyBuilder.setStreamingId("job-1");
+        
channel.writeInbound(CDCRequest.newBuilder().setType(Type.START_STREAMING).setRequestId("start-request").setStartStreamingRequestBody(bodyBuilder.build()).build());
+        CDCResponse response = readResponseSkippingGreeting();
+        verify(backendHandler).startStreaming("job-1", connectionContext, 
channel);
+        assertThat(response.getStatus(), is(Status.SUCCEED));
+    }
+    
+    @Test
+    void assertStopStreamingRequestSucceed() {
+        CDCConnectionContext connectionContext = new 
CDCConnectionContext(user);
+        connectionContext.setJobId("job-1");
+        channel.attr(CONNECTION_CONTEXT_KEY).set(connectionContext);
+        
when(backendHandler.getDatabaseNameByJobId("job-1")).thenReturn("logic_db");
+        StopStreamingRequestBody.Builder bodyBuilder = 
StopStreamingRequestBody.newBuilder();
+        bodyBuilder.setStreamingId("job-1");
+        
channel.writeInbound(CDCRequest.newBuilder().setType(Type.STOP_STREAMING).setRequestId("stop-request").setStopStreamingRequestBody(bodyBuilder.build()).build());
+        CDCResponse response = readResponseSkippingGreeting();
+        verify(backendHandler).stopStreaming("job-1", channel.id());
+        assertNull(connectionContext.getJobId());
+        assertThat(response.getStatus(), is(Status.SUCCEED));
+    }
+    
+    @Test
+    void assertDropStreamingRequestSucceed() {
+        CDCConnectionContext connectionContext = new 
CDCConnectionContext(user);
+        connectionContext.setJobId("job-1");
+        channel.attr(CONNECTION_CONTEXT_KEY).set(connectionContext);
+        
when(backendHandler.getDatabaseNameByJobId("job-1")).thenReturn("logic_db");
+        DropStreamingRequestBody.Builder bodyBuilder = 
DropStreamingRequestBody.newBuilder();
+        bodyBuilder.setStreamingId("job-1");
+        
channel.writeInbound(CDCRequest.newBuilder().setType(Type.DROP_STREAMING).setRequestId("drop-request").setDropStreamingRequestBody(bodyBuilder.build()).build());
+        CDCResponse response = readResponseSkippingGreeting();
+        verify(backendHandler).dropStreaming("job-1");
+        assertNull(connectionContext.getJobId());
+        assertThat(response.getStatus(), is(Status.SUCCEED));
+    }
+    
+    @Test
+    void assertUnknownRequestTypeIgnored() {
+        channel.attr(CONNECTION_CONTEXT_KEY).set(new 
CDCConnectionContext(user));
+        
channel.writeInbound(CDCRequest.newBuilder().setType(Type.UNKNOWN).build());
+        assertNotNull(channel.readOutbound());
+        assertNull(channel.readOutbound());
+        assertTrue(channel.isOpen());
+    }
+    
+    private CDCRequest createLoginRequest(final String password) {
+        BasicBody.Builder bodyBuilder = 
BasicBody.newBuilder().setUsername("root");
+        bodyBuilder.setPassword(password);
+        return 
CDCRequest.newBuilder().setType(Type.LOGIN).setLoginRequestBody(LoginRequestBody.newBuilder().setBasicBody(bodyBuilder.build()).build()).build();
+    }
+    
+    private CDCResponse writeRequestWithContext(final CDCRequest request) {
+        channel.attr(CONNECTION_CONTEXT_KEY).set(new 
CDCConnectionContext(user));
+        channel.writeInbound(request);
+        return readResponseSkippingGreeting();
+    }
+    
+    private CDCResponse readResponseSkippingGreeting() {
+        channel.readOutbound();
+        return channel.readOutbound();
     }
 }

Reply via email to