This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 0c729419173 Add privileges check at CDC (#23627)
0c729419173 is described below

commit 0c72941917377f9b3efd90a74ce2331cf80ce5a4
Author: Xinze Guo <[email protected]>
AuthorDate: Fri Jan 27 20:53:09 2023 +0800

    Add privileges check at CDC (#23627)
    
    * Add privileges check at CDC
    
    * tmp
    
    * Fix ci
---
 .../pipeline/cdc/context/CDCConnectionContext.java |  6 +-
 .../pipeline/cdc/common/CDCResponseErrorCode.java  |  4 +-
 .../frontend/netty/CDCChannelInboundHandler.java   | 72 +++++++++++++++++-----
 .../netty/CDCChannelInboundHandlerTest.java        | 23 ++-----
 4 files changed, 68 insertions(+), 37 deletions(-)

diff --git 
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCConnectionContext.java
 
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCConnectionContext.java
index e73199c02e1..8abb4a70f73 100644
--- 
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCConnectionContext.java
+++ 
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCConnectionContext.java
@@ -20,16 +20,18 @@ package org.apache.shardingsphere.data.pipeline.cdc.context;
 import lombok.Getter;
 import lombok.Setter;
 import 
org.apache.shardingsphere.data.pipeline.cdc.constant.CDCConnectionStatus;
+import org.apache.shardingsphere.infra.metadata.user.ShardingSphereUser;
 
 /**
  * CDC connection context.
  */
 @Getter
+@Setter
 public final class CDCConnectionContext {
     
-    @Setter
     private volatile CDCConnectionStatus status;
     
-    @Setter
     private volatile String jobId;
+    
+    private volatile ShardingSphereUser currentUser;
 }
diff --git 
a/kernel/data-pipeline/cdc/protocol/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/common/CDCResponseErrorCode.java
 
b/kernel/data-pipeline/cdc/protocol/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/common/CDCResponseErrorCode.java
index 45d57900720..2498f351d90 100644
--- 
a/kernel/data-pipeline/cdc/protocol/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/common/CDCResponseErrorCode.java
+++ 
b/kernel/data-pipeline/cdc/protocol/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/common/CDCResponseErrorCode.java
@@ -28,7 +28,9 @@ public enum CDCResponseErrorCode {
     
     SERVER_ERROR("1"),
     
-    ILLEGAL_REQUEST_ERROR("2");
+    ILLEGAL_REQUEST_ERROR("2"),
+    
+    ILLEGAL_USERNAME_OR_PASSWORD("3");
     
     @Getter
     private final String code;
diff --git 
a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
 
b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
index 965e0ea75cf..73b380630f5 100644
--- 
a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
+++ 
b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
@@ -18,11 +18,13 @@
 package org.apache.shardingsphere.proxy.frontend.netty;
 
 import com.google.common.hash.Hashing;
+import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.util.AttributeKey;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.authority.model.ShardingSpherePrivileges;
 import org.apache.shardingsphere.authority.rule.AuthorityRule;
 import org.apache.shardingsphere.data.pipeline.cdc.common.CDCResponseErrorCode;
 import 
org.apache.shardingsphere.data.pipeline.cdc.constant.CDCConnectionStatus;
@@ -32,21 +34,25 @@ import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.AckRequest;
 import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest;
 import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest;
 import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest.SubscriptionMode;
+import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.DropSubscriptionRequest;
 import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.LoginRequest.BasicBody;
 import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StartSubscriptionRequest;
+import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StopSubscriptionRequest;
 import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse;
 import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.ServerGreetingResult;
+import 
org.apache.shardingsphere.distsql.handler.exception.rule.MissingRequiredRuleException;
 import org.apache.shardingsphere.infra.autogen.version.ShardingSphereVersion;
+import 
org.apache.shardingsphere.infra.executor.check.exception.SQLCheckException;
 import org.apache.shardingsphere.infra.metadata.user.Grantee;
 import org.apache.shardingsphere.infra.metadata.user.ShardingSphereUser;
-import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
+import 
org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
+import 
org.apache.shardingsphere.infra.util.exception.external.sql.ShardingSphereSQLException;
 import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
 import org.apache.shardingsphere.proxy.backend.handler.cdc.CDCBackendHandler;
 
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.sql.SQLException;
-import java.util.Collection;
 import java.util.Objects;
 import java.util.Optional;
 
@@ -79,27 +85,45 @@ public final class CDCChannelInboundHandler extends 
ChannelInboundHandlerAdapter
         ctx.channel().attr(CONNECTION_CONTEXT_KEY).set(null);
     }
     
+    @Override
+    public void exceptionCaught(final ChannelHandlerContext ctx, final 
Throwable cause) {
+        log.error("caught CDC resolution error", cause);
+        // TODO add CDC exception to wrapper this exception, and add the 
parameters requestId and whether to close connect
+        CDCConnectionContext connectionContext = 
ctx.channel().attr(CONNECTION_CONTEXT_KEY).get();
+        ChannelFuture channelFuture;
+        if (cause instanceof ShardingSphereSQLException) {
+            SQLException sqlException = ((ShardingSphereSQLException) 
cause).toSQLException();
+            String errorMessage = String.format("ERROR %s (%s): %s", 
sqlException.getErrorCode(), sqlException.getSQLState(), 
sqlException.getMessage());
+            channelFuture = ctx.writeAndFlush(CDCResponseGenerator.failed("", 
CDCResponseErrorCode.SERVER_ERROR, errorMessage));
+        } else {
+            channelFuture = ctx.writeAndFlush(CDCResponseGenerator.failed("", 
CDCResponseErrorCode.SERVER_ERROR, cause.getMessage()));
+        }
+        if (CDCConnectionStatus.NOT_LOGGED_IN == 
connectionContext.getStatus()) {
+            channelFuture.addListener(ChannelFutureListener.CLOSE);
+        }
+    }
+    
     @Override
     public void channelRead(final ChannelHandlerContext ctx, final Object msg) 
{
         CDCConnectionContext connectionContext = 
ctx.channel().attr(CONNECTION_CONTEXT_KEY).get();
         CDCConnectionStatus status = connectionContext.getStatus();
         CDCRequest request = (CDCRequest) msg;
         if (CDCConnectionStatus.NOT_LOGGED_IN == status) {
-            processLogin(ctx, request);
+            processLogin(ctx, request, connectionContext);
             return;
         }
         switch (request.getRequestCase()) {
             case CREATE_SUBSCRIPTION:
-                processCreateSubscription(ctx, request);
+                processCreateSubscription(ctx, request, connectionContext);
                 break;
             case START_SUBSCRIPTION:
                 processStartSubscription(ctx, request, connectionContext);
                 break;
             case STOP_SUBSCRIPTION:
-                stopStartSubscription(ctx, request, connectionContext);
+                processStopSubscription(ctx, request, connectionContext);
                 break;
             case DROP_SUBSCRIPTION:
-                dropStartSubscription(ctx, request, connectionContext);
+                processDropSubscription(ctx, request, connectionContext);
                 break;
             case ACK_REQUEST:
                 processAckRequest(ctx, request);
@@ -109,25 +133,34 @@ public final class CDCChannelInboundHandler extends 
ChannelInboundHandlerAdapter
         }
     }
     
-    private void processLogin(final ChannelHandlerContext ctx, final 
CDCRequest request) {
+    private void processLogin(final ChannelHandlerContext ctx, final 
CDCRequest request, final CDCConnectionContext connectionContext) {
         if (!request.hasLogin() || !request.getLogin().hasBasicBody()) {
             
ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(), 
CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "Miss login request 
body")).addListener(ChannelFutureListener.CLOSE);
             return;
         }
         BasicBody body = request.getLogin().getBasicBody();
-        Collection<ShardingSphereRule> globalRules = 
ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getGlobalRuleMetaData().getRules();
-        Optional<AuthorityRule> authorityRule = 
globalRules.stream().filter(rule -> rule instanceof AuthorityRule).map(rule -> 
(AuthorityRule) rule).findFirst();
+        Optional<AuthorityRule> authorityRule = 
ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getGlobalRuleMetaData().findSingleRule(AuthorityRule.class);
         if (!authorityRule.isPresent()) {
             
ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(), 
CDCResponseErrorCode.SERVER_ERROR, "Not find authority 
rule")).addListener(ChannelFutureListener.CLOSE);
             return;
         }
         Optional<ShardingSphereUser> user = authorityRule.get().findUser(new 
Grantee(body.getUsername(), getHostAddress(ctx)));
         if (user.isPresent() && 
Objects.equals(Hashing.sha256().hashBytes(user.get().getPassword().getBytes()).toString().toUpperCase(),
 body.getPassword())) {
-            
ctx.channel().attr(CONNECTION_CONTEXT_KEY).get().setStatus(CDCConnectionStatus.LOGGED_IN);
+            connectionContext.setStatus(CDCConnectionStatus.LOGGED_IN);
+            connectionContext.setCurrentUser(user.get());
             
ctx.writeAndFlush(CDCResponseGenerator.succeedBuilder(request.getRequestId()).build());
-            return;
+        } else {
+            
ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(), 
CDCResponseErrorCode.ILLEGAL_USERNAME_OR_PASSWORD, "Illegal username or 
password"))
+                    .addListener(ChannelFutureListener.CLOSE);
         }
-        ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(), 
CDCResponseErrorCode.SERVER_ERROR, "Incorrect username or 
password")).addListener(ChannelFutureListener.CLOSE);
+    }
+    
+    private void checkPrivileges(final Grantee grantee, final String 
currentDatabase) {
+        AuthorityRule authorityRule = 
ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getGlobalRuleMetaData().findSingleRule(AuthorityRule.class)
+                .orElseThrow(() -> new 
MissingRequiredRuleException("authority"));
+        ShardingSpherePrivileges privileges = 
authorityRule.findPrivileges(grantee)
+                .orElseThrow(() -> new SQLCheckException(String.format("Access 
denied for user '%s'@'%s'", grantee.getUsername(), grantee.getHostname())));
+        
ShardingSpherePreconditions.checkState(privileges.hasPrivileges(currentDatabase),
 () -> new SQLCheckException(String.format("Unknown database '%s'", 
currentDatabase)));
     }
     
     private String getHostAddress(final ChannelHandlerContext context) {
@@ -135,7 +168,7 @@ public final class CDCChannelInboundHandler extends 
ChannelInboundHandlerAdapter
         return socketAddress instanceof InetSocketAddress ? 
((InetSocketAddress) socketAddress).getAddress().getHostAddress() : 
socketAddress.toString();
     }
     
-    private void processCreateSubscription(final ChannelHandlerContext ctx, 
final CDCRequest request) {
+    private void processCreateSubscription(final ChannelHandlerContext ctx, 
final CDCRequest request, final CDCConnectionContext connectionContext) {
         if (!request.hasCreateSubscription()) {
             
ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(), 
CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "Miss create subscription request 
body"))
                     .addListener(ChannelFutureListener.CLOSE);
@@ -144,10 +177,10 @@ public final class CDCChannelInboundHandler extends 
ChannelInboundHandlerAdapter
         CreateSubscriptionRequest createSubscriptionRequest = 
request.getCreateSubscription();
         if (createSubscriptionRequest.getTableNamesList().isEmpty() || 
createSubscriptionRequest.getDatabase().isEmpty() || 
createSubscriptionRequest.getSubscriptionName().isEmpty()
                 || createSubscriptionRequest.getSubscriptionMode() == 
SubscriptionMode.UNKNOWN) {
-            
ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(), 
CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "Illegal create subscription 
request parameter"))
-                    .addListener(ChannelFutureListener.CLOSE);
+            
ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(), 
CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "Illegal create subscription 
request parameter"));
             return;
         }
+        checkPrivileges(connectionContext.getCurrentUser().getGrantee(), 
createSubscriptionRequest.getDatabase());
         CDCResponse response = backendHandler.createSubscription(request);
         ctx.writeAndFlush(response);
     }
@@ -164,17 +197,22 @@ public final class CDCChannelInboundHandler extends 
ChannelInboundHandlerAdapter
                     .addListener(ChannelFutureListener.CLOSE);
             return;
         }
+        checkPrivileges(connectionContext.getCurrentUser().getGrantee(), 
startSubscriptionRequest.getDatabase());
         CDCResponse response = backendHandler.startSubscription(request, 
ctx.channel(), connectionContext);
         ctx.writeAndFlush(response);
     }
     
-    private void stopStartSubscription(final ChannelHandlerContext ctx, final 
CDCRequest request, final CDCConnectionContext connectionContext) {
+    private void processStopSubscription(final ChannelHandlerContext ctx, 
final CDCRequest request, final CDCConnectionContext connectionContext) {
+        StopSubscriptionRequest stopSubscriptionRequest = 
request.getStopSubscription();
+        checkPrivileges(connectionContext.getCurrentUser().getGrantee(), 
stopSubscriptionRequest.getDatabase());
         backendHandler.stopSubscription(connectionContext.getJobId());
         connectionContext.setStatus(CDCConnectionStatus.LOGGED_IN);
         
ctx.writeAndFlush(CDCResponseGenerator.succeedBuilder(request.getRequestId()).build());
     }
     
-    private void dropStartSubscription(final ChannelHandlerContext ctx, final 
CDCRequest request, final CDCConnectionContext connectionContext) {
+    private void processDropSubscription(final ChannelHandlerContext ctx, 
final CDCRequest request, final CDCConnectionContext connectionContext) {
+        DropSubscriptionRequest dropSubscriptionRequest = 
request.getDropSubscription();
+        checkPrivileges(connectionContext.getCurrentUser().getGrantee(), 
dropSubscriptionRequest.getDatabase());
         try {
             backendHandler.dropSubscription(connectionContext.getJobId());
             
ctx.writeAndFlush(CDCResponseGenerator.succeedBuilder(request.getRequestId()).build());
diff --git 
a/proxy/frontend/core/src/test/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandlerTest.java
 
b/proxy/frontend/core/src/test/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandlerTest.java
index 845a270356f..3942246d752 100644
--- 
a/proxy/frontend/core/src/test/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandlerTest.java
+++ 
b/proxy/frontend/core/src/test/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandlerTest.java
@@ -29,9 +29,7 @@ import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.LoginRequest
 import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse;
 import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse.Status;
 import 
org.apache.shardingsphere.infra.metadata.database.rule.ShardingSphereRuleMetaData;
-import org.apache.shardingsphere.infra.metadata.user.Grantee;
 import org.apache.shardingsphere.infra.metadata.user.ShardingSphereUser;
-import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
 import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -40,7 +38,6 @@ import org.junit.Test;
 import org.mockito.MockedStatic;
 
 import java.util.Collections;
-import java.util.List;
 import java.util.Optional;
 
 import static org.hamcrest.CoreMatchers.is;
@@ -66,19 +63,11 @@ public final class CDCChannelInboundHandlerTest {
         proxyContext = mockStatic(ProxyContext.class);
         ProxyContext mockedProxyContext = mock(ProxyContext.class, 
RETURNS_DEEP_STUBS);
         
proxyContext.when(ProxyContext::getInstance).thenReturn(mockedProxyContext);
-        ShardingSphereRuleMetaData globalRuleMetaData = 
mock(ShardingSphereRuleMetaData.class);
-        
when(mockedProxyContext.getContextManager().getMetaDataContexts().getMetaData().getGlobalRuleMetaData()).thenReturn(globalRuleMetaData);
-        List<ShardingSphereRule> rules = 
Collections.singletonList(mockAuthRule());
-        when(globalRuleMetaData.getRules()).thenReturn(rules);
-    }
-    
-    private static AuthorityRule mockAuthRule() {
-        AuthorityRule result = mock(AuthorityRule.class);
-        ShardingSphereUser mockUser = mock(ShardingSphereUser.class);
-        when(mockUser.getGrantee()).thenReturn(new Grantee("root", "%"));
-        when(mockUser.getPassword()).thenReturn("root");
-        when(result.findUser(any())).thenReturn(Optional.of(mockUser));
-        return result;
+        AuthorityRule authorityRule = mock(AuthorityRule.class);
+        ShardingSphereUser rootUser = new ShardingSphereUser("root", "root", 
"%");
+        when(authorityRule.findUser(any())).thenReturn(Optional.of(rootUser));
+        ShardingSphereRuleMetaData shardingSphereRuleMetaData = new 
ShardingSphereRuleMetaData(Collections.singletonList(authorityRule));
+        
when(mockedProxyContext.getContextManager().getMetaDataContexts().getMetaData().getGlobalRuleMetaData()).thenReturn(shardingSphereRuleMetaData);
     }
     
     @AfterClass
@@ -99,7 +88,7 @@ public final class CDCChannelInboundHandlerTest {
         assertTrue(expectedGreetingResult.hasServerGreetingResult());
         CDCResponse expectedLoginResult = channel.readOutbound();
         assertThat(expectedLoginResult.getStatus(), is(Status.FAILED));
-        assertThat(expectedLoginResult.getErrorCode(), 
is(CDCResponseErrorCode.SERVER_ERROR.getCode()));
+        assertThat(expectedLoginResult.getErrorCode(), 
is(CDCResponseErrorCode.ILLEGAL_USERNAME_OR_PASSWORD.getCode()));
         assertFalse(channel.isOpen());
     }
     

Reply via email to