This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 0c729419173 Add privileges check at CDC (#23627)
0c729419173 is described below
commit 0c72941917377f9b3efd90a74ce2331cf80ce5a4
Author: Xinze Guo <[email protected]>
AuthorDate: Fri Jan 27 20:53:09 2023 +0800
Add privileges check at CDC (#23627)
* Add privileges check at CDC
* tmp
* Fix ci
---
.../pipeline/cdc/context/CDCConnectionContext.java | 6 +-
.../pipeline/cdc/common/CDCResponseErrorCode.java | 4 +-
.../frontend/netty/CDCChannelInboundHandler.java | 72 +++++++++++++++++-----
.../netty/CDCChannelInboundHandlerTest.java | 23 ++-----
4 files changed, 68 insertions(+), 37 deletions(-)
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCConnectionContext.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCConnectionContext.java
index e73199c02e1..8abb4a70f73 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCConnectionContext.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCConnectionContext.java
@@ -20,16 +20,18 @@ package org.apache.shardingsphere.data.pipeline.cdc.context;
import lombok.Getter;
import lombok.Setter;
import
org.apache.shardingsphere.data.pipeline.cdc.constant.CDCConnectionStatus;
+import org.apache.shardingsphere.infra.metadata.user.ShardingSphereUser;
/**
* CDC connection context.
*/
@Getter
+@Setter
public final class CDCConnectionContext {
- @Setter
private volatile CDCConnectionStatus status;
- @Setter
private volatile String jobId;
+
+ private volatile ShardingSphereUser currentUser;
}
diff --git a/kernel/data-pipeline/cdc/protocol/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/common/CDCResponseErrorCode.java b/kernel/data-pipeline/cdc/protocol/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/common/CDCResponseErrorCode.java
index 45d57900720..2498f351d90 100644
--- a/kernel/data-pipeline/cdc/protocol/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/common/CDCResponseErrorCode.java
+++ b/kernel/data-pipeline/cdc/protocol/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/common/CDCResponseErrorCode.java
@@ -28,7 +28,9 @@ public enum CDCResponseErrorCode {
SERVER_ERROR("1"),
- ILLEGAL_REQUEST_ERROR("2");
+ ILLEGAL_REQUEST_ERROR("2"),
+
+ ILLEGAL_USERNAME_OR_PASSWORD("3");
@Getter
private final String code;
diff --git a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
index 965e0ea75cf..73b380630f5 100644
--- a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
+++ b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
@@ -18,11 +18,13 @@
package org.apache.shardingsphere.proxy.frontend.netty;
import com.google.common.hash.Hashing;
+import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.AttributeKey;
import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.authority.model.ShardingSpherePrivileges;
import org.apache.shardingsphere.authority.rule.AuthorityRule;
import org.apache.shardingsphere.data.pipeline.cdc.common.CDCResponseErrorCode;
import
org.apache.shardingsphere.data.pipeline.cdc.constant.CDCConnectionStatus;
@@ -32,21 +34,25 @@ import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.AckRequest;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest;
import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest;
import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest.SubscriptionMode;
+import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.DropSubscriptionRequest;
import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.LoginRequest.BasicBody;
import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StartSubscriptionRequest;
+import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StopSubscriptionRequest;
import
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse;
import
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.ServerGreetingResult;
+import
org.apache.shardingsphere.distsql.handler.exception.rule.MissingRequiredRuleException;
import org.apache.shardingsphere.infra.autogen.version.ShardingSphereVersion;
+import
org.apache.shardingsphere.infra.executor.check.exception.SQLCheckException;
import org.apache.shardingsphere.infra.metadata.user.Grantee;
import org.apache.shardingsphere.infra.metadata.user.ShardingSphereUser;
-import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
+import
org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
+import
org.apache.shardingsphere.infra.util.exception.external.sql.ShardingSphereSQLException;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import org.apache.shardingsphere.proxy.backend.handler.cdc.CDCBackendHandler;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.sql.SQLException;
-import java.util.Collection;
import java.util.Objects;
import java.util.Optional;
@@ -79,27 +85,45 @@ public final class CDCChannelInboundHandler extends ChannelInboundHandlerAdapter
ctx.channel().attr(CONNECTION_CONTEXT_KEY).set(null);
}
+ @Override
+ public void exceptionCaught(final ChannelHandlerContext ctx, final
Throwable cause) {
+ log.error("caught CDC resolution error", cause);
+ // TODO add CDC exception to wrapper this exception, and add the
parameters requestId and whether to close connect
+ CDCConnectionContext connectionContext =
ctx.channel().attr(CONNECTION_CONTEXT_KEY).get();
+ ChannelFuture channelFuture;
+ if (cause instanceof ShardingSphereSQLException) {
+ SQLException sqlException = ((ShardingSphereSQLException)
cause).toSQLException();
+ String errorMessage = String.format("ERROR %s (%s): %s",
sqlException.getErrorCode(), sqlException.getSQLState(),
sqlException.getMessage());
+ channelFuture = ctx.writeAndFlush(CDCResponseGenerator.failed("",
CDCResponseErrorCode.SERVER_ERROR, errorMessage));
+ } else {
+ channelFuture = ctx.writeAndFlush(CDCResponseGenerator.failed("",
CDCResponseErrorCode.SERVER_ERROR, cause.getMessage()));
+ }
+ if (CDCConnectionStatus.NOT_LOGGED_IN ==
connectionContext.getStatus()) {
+ channelFuture.addListener(ChannelFutureListener.CLOSE);
+ }
+ }
+
@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg)
{
CDCConnectionContext connectionContext =
ctx.channel().attr(CONNECTION_CONTEXT_KEY).get();
CDCConnectionStatus status = connectionContext.getStatus();
CDCRequest request = (CDCRequest) msg;
if (CDCConnectionStatus.NOT_LOGGED_IN == status) {
- processLogin(ctx, request);
+ processLogin(ctx, request, connectionContext);
return;
}
switch (request.getRequestCase()) {
case CREATE_SUBSCRIPTION:
- processCreateSubscription(ctx, request);
+ processCreateSubscription(ctx, request, connectionContext);
break;
case START_SUBSCRIPTION:
processStartSubscription(ctx, request, connectionContext);
break;
case STOP_SUBSCRIPTION:
- stopStartSubscription(ctx, request, connectionContext);
+ processStopSubscription(ctx, request, connectionContext);
break;
case DROP_SUBSCRIPTION:
- dropStartSubscription(ctx, request, connectionContext);
+ processDropSubscription(ctx, request, connectionContext);
break;
case ACK_REQUEST:
processAckRequest(ctx, request);
@@ -109,25 +133,34 @@ public final class CDCChannelInboundHandler extends ChannelInboundHandlerAdapter
}
}
- private void processLogin(final ChannelHandlerContext ctx, final
CDCRequest request) {
+ private void processLogin(final ChannelHandlerContext ctx, final
CDCRequest request, final CDCConnectionContext connectionContext) {
if (!request.hasLogin() || !request.getLogin().hasBasicBody()) {
ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(),
CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "Miss login request
body")).addListener(ChannelFutureListener.CLOSE);
return;
}
BasicBody body = request.getLogin().getBasicBody();
- Collection<ShardingSphereRule> globalRules =
ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getGlobalRuleMetaData().getRules();
- Optional<AuthorityRule> authorityRule =
globalRules.stream().filter(rule -> rule instanceof AuthorityRule).map(rule ->
(AuthorityRule) rule).findFirst();
+ Optional<AuthorityRule> authorityRule =
ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getGlobalRuleMetaData().findSingleRule(AuthorityRule.class);
if (!authorityRule.isPresent()) {
ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(),
CDCResponseErrorCode.SERVER_ERROR, "Not find authority
rule")).addListener(ChannelFutureListener.CLOSE);
return;
}
Optional<ShardingSphereUser> user = authorityRule.get().findUser(new
Grantee(body.getUsername(), getHostAddress(ctx)));
if (user.isPresent() &&
Objects.equals(Hashing.sha256().hashBytes(user.get().getPassword().getBytes()).toString().toUpperCase(),
body.getPassword())) {
-
ctx.channel().attr(CONNECTION_CONTEXT_KEY).get().setStatus(CDCConnectionStatus.LOGGED_IN);
+ connectionContext.setStatus(CDCConnectionStatus.LOGGED_IN);
+ connectionContext.setCurrentUser(user.get());
ctx.writeAndFlush(CDCResponseGenerator.succeedBuilder(request.getRequestId()).build());
- return;
+ } else {
+
ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(),
CDCResponseErrorCode.ILLEGAL_USERNAME_OR_PASSWORD, "Illegal username or
password"))
+ .addListener(ChannelFutureListener.CLOSE);
}
- ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(),
CDCResponseErrorCode.SERVER_ERROR, "Incorrect username or
password")).addListener(ChannelFutureListener.CLOSE);
+ }
+
+ private void checkPrivileges(final Grantee grantee, final String
currentDatabase) {
+ AuthorityRule authorityRule =
ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getGlobalRuleMetaData().findSingleRule(AuthorityRule.class)
+ .orElseThrow(() -> new
MissingRequiredRuleException("authority"));
+ ShardingSpherePrivileges privileges =
authorityRule.findPrivileges(grantee)
+ .orElseThrow(() -> new SQLCheckException(String.format("Access
denied for user '%s'@'%s'", grantee.getUsername(), grantee.getHostname())));
+
ShardingSpherePreconditions.checkState(privileges.hasPrivileges(currentDatabase),
() -> new SQLCheckException(String.format("Unknown database '%s'",
currentDatabase)));
}
private String getHostAddress(final ChannelHandlerContext context) {
@@ -135,7 +168,7 @@ public final class CDCChannelInboundHandler extends ChannelInboundHandlerAdapter
return socketAddress instanceof InetSocketAddress ?
((InetSocketAddress) socketAddress).getAddress().getHostAddress() :
socketAddress.toString();
}
- private void processCreateSubscription(final ChannelHandlerContext ctx,
final CDCRequest request) {
+ private void processCreateSubscription(final ChannelHandlerContext ctx,
final CDCRequest request, final CDCConnectionContext connectionContext) {
if (!request.hasCreateSubscription()) {
ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(),
CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "Miss create subscription request
body"))
.addListener(ChannelFutureListener.CLOSE);
@@ -144,10 +177,10 @@ public final class CDCChannelInboundHandler extends ChannelInboundHandlerAdapter
CreateSubscriptionRequest createSubscriptionRequest =
request.getCreateSubscription();
if (createSubscriptionRequest.getTableNamesList().isEmpty() ||
createSubscriptionRequest.getDatabase().isEmpty() ||
createSubscriptionRequest.getSubscriptionName().isEmpty()
|| createSubscriptionRequest.getSubscriptionMode() ==
SubscriptionMode.UNKNOWN) {
-
ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(),
CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "Illegal create subscription
request parameter"))
- .addListener(ChannelFutureListener.CLOSE);
+
ctx.writeAndFlush(CDCResponseGenerator.failed(request.getRequestId(),
CDCResponseErrorCode.ILLEGAL_REQUEST_ERROR, "Illegal create subscription
request parameter"));
return;
}
+ checkPrivileges(connectionContext.getCurrentUser().getGrantee(),
createSubscriptionRequest.getDatabase());
CDCResponse response = backendHandler.createSubscription(request);
ctx.writeAndFlush(response);
}
@@ -164,17 +197,22 @@ public final class CDCChannelInboundHandler extends ChannelInboundHandlerAdapter
.addListener(ChannelFutureListener.CLOSE);
return;
}
+ checkPrivileges(connectionContext.getCurrentUser().getGrantee(),
startSubscriptionRequest.getDatabase());
CDCResponse response = backendHandler.startSubscription(request,
ctx.channel(), connectionContext);
ctx.writeAndFlush(response);
}
- private void stopStartSubscription(final ChannelHandlerContext ctx, final
CDCRequest request, final CDCConnectionContext connectionContext) {
+ private void processStopSubscription(final ChannelHandlerContext ctx,
final CDCRequest request, final CDCConnectionContext connectionContext) {
+ StopSubscriptionRequest stopSubscriptionRequest =
request.getStopSubscription();
+ checkPrivileges(connectionContext.getCurrentUser().getGrantee(),
stopSubscriptionRequest.getDatabase());
backendHandler.stopSubscription(connectionContext.getJobId());
connectionContext.setStatus(CDCConnectionStatus.LOGGED_IN);
ctx.writeAndFlush(CDCResponseGenerator.succeedBuilder(request.getRequestId()).build());
}
- private void dropStartSubscription(final ChannelHandlerContext ctx, final
CDCRequest request, final CDCConnectionContext connectionContext) {
+ private void processDropSubscription(final ChannelHandlerContext ctx,
final CDCRequest request, final CDCConnectionContext connectionContext) {
+ DropSubscriptionRequest dropSubscriptionRequest =
request.getDropSubscription();
+ checkPrivileges(connectionContext.getCurrentUser().getGrantee(),
dropSubscriptionRequest.getDatabase());
try {
backendHandler.dropSubscription(connectionContext.getJobId());
ctx.writeAndFlush(CDCResponseGenerator.succeedBuilder(request.getRequestId()).build());
diff --git a/proxy/frontend/core/src/test/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandlerTest.java b/proxy/frontend/core/src/test/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandlerTest.java
index 845a270356f..3942246d752 100644
--- a/proxy/frontend/core/src/test/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandlerTest.java
+++ b/proxy/frontend/core/src/test/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandlerTest.java
@@ -29,9 +29,7 @@ import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.LoginRequest
import
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse;
import
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse.Status;
import
org.apache.shardingsphere.infra.metadata.database.rule.ShardingSphereRuleMetaData;
-import org.apache.shardingsphere.infra.metadata.user.Grantee;
import org.apache.shardingsphere.infra.metadata.user.ShardingSphereUser;
-import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import org.junit.AfterClass;
import org.junit.Before;
@@ -40,7 +38,6 @@ import org.junit.Test;
import org.mockito.MockedStatic;
import java.util.Collections;
-import java.util.List;
import java.util.Optional;
import static org.hamcrest.CoreMatchers.is;
@@ -66,19 +63,11 @@ public final class CDCChannelInboundHandlerTest {
proxyContext = mockStatic(ProxyContext.class);
ProxyContext mockedProxyContext = mock(ProxyContext.class,
RETURNS_DEEP_STUBS);
proxyContext.when(ProxyContext::getInstance).thenReturn(mockedProxyContext);
- ShardingSphereRuleMetaData globalRuleMetaData =
mock(ShardingSphereRuleMetaData.class);
-
when(mockedProxyContext.getContextManager().getMetaDataContexts().getMetaData().getGlobalRuleMetaData()).thenReturn(globalRuleMetaData);
- List<ShardingSphereRule> rules =
Collections.singletonList(mockAuthRule());
- when(globalRuleMetaData.getRules()).thenReturn(rules);
- }
-
- private static AuthorityRule mockAuthRule() {
- AuthorityRule result = mock(AuthorityRule.class);
- ShardingSphereUser mockUser = mock(ShardingSphereUser.class);
- when(mockUser.getGrantee()).thenReturn(new Grantee("root", "%"));
- when(mockUser.getPassword()).thenReturn("root");
- when(result.findUser(any())).thenReturn(Optional.of(mockUser));
- return result;
+ AuthorityRule authorityRule = mock(AuthorityRule.class);
+ ShardingSphereUser rootUser = new ShardingSphereUser("root", "root",
"%");
+ when(authorityRule.findUser(any())).thenReturn(Optional.of(rootUser));
+ ShardingSphereRuleMetaData shardingSphereRuleMetaData = new
ShardingSphereRuleMetaData(Collections.singletonList(authorityRule));
+
when(mockedProxyContext.getContextManager().getMetaDataContexts().getMetaData().getGlobalRuleMetaData()).thenReturn(shardingSphereRuleMetaData);
}
@AfterClass
@@ -99,7 +88,7 @@ public final class CDCChannelInboundHandlerTest {
assertTrue(expectedGreetingResult.hasServerGreetingResult());
CDCResponse expectedLoginResult = channel.readOutbound();
assertThat(expectedLoginResult.getStatus(), is(Status.FAILED));
- assertThat(expectedLoginResult.getErrorCode(),
is(CDCResponseErrorCode.SERVER_ERROR.getCode()));
+ assertThat(expectedLoginResult.getErrorCode(),
is(CDCResponseErrorCode.ILLEGAL_USERNAME_OR_PASSWORD.getCode()));
assertFalse(channel.isOpen());
}