This is an automated email from the ASF dual-hosted git repository.
wuweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new def9d46a6bc Add PipelineCDCException (#30794)
def9d46a6bc is described below
commit def9d46a6bc58a623daa4cf8f5848938d4ff2422
Author: Liang Zhang <[email protected]>
AuthorDate: Sun Apr 7 00:52:02 2024 +0800
Add PipelineCDCException (#30794)
* Add PipelineCDCException
* Add PipelineCDCException
* Add PipelineCDCException
---
.../user-manual/error-code/sql-error-code.cn.md | 7 ++++---
.../user-manual/error-code/sql-error-code.en.md | 7 ++++---
.../type/kernel/category/PipelineSQLException.java | 8 ++------
...inException.java => CDCLoginFailedException.java} | 11 +++++------
...n.java => EmptyCDCLoginRequestBodyException.java} | 9 ++++-----
...=> MissingRequiredStreamDataSourceException.java} | 9 ++++-----
...erverException.java => PipelineCDCException.java} | 20 ++++++++++++++------
...ion.java => StreamDatabaseNotFoundException.java} | 9 ++++-----
.../data/pipeline/cdc/handler/CDCBackendHandler.java | 9 +++++----
.../frontend/netty/CDCChannelInboundHandler.java | 10 +++++-----
.../frontend/netty/CDCChannelInboundHandlerTest.java | 4 ++--
11 files changed, 53 insertions(+), 50 deletions(-)
diff --git a/docs/document/content/user-manual/error-code/sql-error-code.cn.md
b/docs/document/content/user-manual/error-code/sql-error-code.cn.md
index b16d06f2d22..3f3c69a1f5d 100644
--- a/docs/document/content/user-manual/error-code/sql-error-code.cn.md
+++ b/docs/document/content/user-manual/error-code/sql-error-code.cn.md
@@ -133,9 +133,10 @@ SQL 错误码以标准的 SQL State,Vendor Code 和详细错误信息提供,
| 18092 | 08000 | Get binlog position failed by job \`%s\`, reason
is: %s |
| 18095 | HY000 | Can not find consistency check job of \`%s\`.
|
| 18096 | HY000 | Uncompleted consistency check job \`%s\` exists.
|
-| 18200 | HY000 | Not find stream data source table.
|
-| 18201 | HY000 | CDC server exception, reason is: %s.
|
-| 18202 | HY000 | CDC login failed, reason is: %s
|
+| 18400 | 42S02 | Can not find stream data source table.
|
+| 18401 | 42S02 | Database '%s' does not exist.
|
+| 18410 | 42S02 | CDC Login request body is empty.
|
+| 18411 | 08004 | Illegal username or password.
|
## 功能异常
diff --git a/docs/document/content/user-manual/error-code/sql-error-code.en.md
b/docs/document/content/user-manual/error-code/sql-error-code.en.md
index 07de23c6f2b..ecec17fcd60 100644
--- a/docs/document/content/user-manual/error-code/sql-error-code.en.md
+++ b/docs/document/content/user-manual/error-code/sql-error-code.en.md
@@ -134,9 +134,10 @@ SQL error codes provide by standard `SQL State`, `Vendor
Code` and `Reason`, whi
| 18092 | 08000 | Get binlog position failed by job \`%s\`, reason
is: %s |
| 18095 | HY000 | Can not find consistency check job of \`%s\`.
|
| 18096 | HY000 | Uncompleted consistency check job \`%s\` exists.
|
-| 18200 | HY000 | Not find stream data source table.
|
-| 18201 | HY000 | CDC server exception, reason is: %s.
|
-| 18202 | HY000 | CDC login failed, reason is: %s
|
+| 18400 | 42S02 | Can not find stream data source table.
|
+| 18401 | 42S02 | Database '%s' does not exist.
|
+| 18410 | 42S02 | CDC Login request body is empty.
|
+| 18411 | 08004 | Illegal username or password.
|
## Feature Exception
diff --git
a/infra/exception/core/src/main/java/org/apache/shardingsphere/infra/exception/core/external/sql/type/kernel/category/PipelineSQLException.java
b/infra/exception/core/src/main/java/org/apache/shardingsphere/infra/exception/core/external/sql/type/kernel/category/PipelineSQLException.java
index 231b5864a2a..8fba52f69a7 100644
---
a/infra/exception/core/src/main/java/org/apache/shardingsphere/infra/exception/core/external/sql/type/kernel/category/PipelineSQLException.java
+++
b/infra/exception/core/src/main/java/org/apache/shardingsphere/infra/exception/core/external/sql/type/kernel/category/PipelineSQLException.java
@@ -29,11 +29,7 @@ public abstract class PipelineSQLException extends
KernelSQLException {
private static final int KERNEL_CODE = 8;
- protected PipelineSQLException(final SQLState sqlState, final int
errorCode, final String reason) {
- super(sqlState, KERNEL_CODE, errorCode, reason);
- }
-
- protected PipelineSQLException(final SQLState sqlState, final int
errorCode, final String reason, final Exception cause) {
- super(sqlState, KERNEL_CODE, errorCode, reason, cause);
+ protected PipelineSQLException(final SQLState sqlState, final int
errorCode, final String reason, final Object... messageArgs) {
+ super(sqlState, KERNEL_CODE, errorCode, reason, messageArgs);
}
}
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/exception/CDCLoginException.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/exception/CDCLoginFailedException.java
similarity index 69%
copy from
kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/exception/CDCLoginException.java
copy to
kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/exception/CDCLoginFailedException.java
index 1d5a0cf20d1..f078f78412a 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/exception/CDCLoginException.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/exception/CDCLoginFailedException.java
@@ -17,17 +17,16 @@
package org.apache.shardingsphere.data.pipeline.cdc.exception;
-import
org.apache.shardingsphere.infra.exception.core.external.sql.type.kernel.category.PipelineSQLException;
import
org.apache.shardingsphere.infra.exception.core.external.sql.sqlstate.XOpenSQLState;
/**
- * CDC login exception.
+ * CDC login failed exception.
*/
-public final class CDCLoginException extends PipelineSQLException {
+public final class CDCLoginFailedException extends PipelineCDCException {
- private static final long serialVersionUID = 6951330476924442374L;
+ private static final long serialVersionUID = -2609355980890040117L;
- public CDCLoginException(final String reason) {
- super(XOpenSQLState.GENERAL_ERROR, 202, String.format("CDC login
failed, reason is: %s", reason));
+ public CDCLoginFailedException() {
+ super(XOpenSQLState.DATA_SOURCE_REJECTED_CONNECTION_ATTEMPT, 11,
"Illegal username or password.");
}
}
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/exception/CDCLoginException.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/exception/EmptyCDCLoginRequestBodyException.java
similarity index 74%
rename from
kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/exception/CDCLoginException.java
rename to
kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/exception/EmptyCDCLoginRequestBodyException.java
index 1d5a0cf20d1..2d95b64e907 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/exception/CDCLoginException.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/exception/EmptyCDCLoginRequestBodyException.java
@@ -17,17 +17,16 @@
package org.apache.shardingsphere.data.pipeline.cdc.exception;
-import
org.apache.shardingsphere.infra.exception.core.external.sql.type.kernel.category.PipelineSQLException;
import
org.apache.shardingsphere.infra.exception.core.external.sql.sqlstate.XOpenSQLState;
/**
- * CDC login exception.
+ * Empty CDC login request body exception.
*/
-public final class CDCLoginException extends PipelineSQLException {
+public final class EmptyCDCLoginRequestBodyException extends
PipelineCDCException {
private static final long serialVersionUID = 6951330476924442374L;
- public CDCLoginException(final String reason) {
- super(XOpenSQLState.GENERAL_ERROR, 202, String.format("CDC login
failed, reason is: %s", reason));
+ public EmptyCDCLoginRequestBodyException() {
+ super(XOpenSQLState.NOT_FOUND, 10, "CDC Login request body is empty.");
}
}
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/exception/NotFindStreamDataSourceTableException.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/exception/MissingRequiredStreamDataSourceException.java
similarity index 72%
rename from
kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/exception/NotFindStreamDataSourceTableException.java
rename to
kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/exception/MissingRequiredStreamDataSourceException.java
index 2e7c25ac186..5f38bf84931 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/exception/NotFindStreamDataSourceTableException.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/exception/MissingRequiredStreamDataSourceException.java
@@ -17,17 +17,16 @@
package org.apache.shardingsphere.data.pipeline.cdc.exception;
-import
org.apache.shardingsphere.infra.exception.core.external.sql.type.kernel.category.PipelineSQLException;
import
org.apache.shardingsphere.infra.exception.core.external.sql.sqlstate.XOpenSQLState;
/**
- * Not find stream data source table exception.
+ * Missing required stream data source exception.
*/
-public final class NotFindStreamDataSourceTableException extends
PipelineSQLException {
+public final class MissingRequiredStreamDataSourceException extends
PipelineCDCException {
private static final long serialVersionUID = 4003436152767041454L;
- public NotFindStreamDataSourceTableException() {
- super(XOpenSQLState.GENERAL_ERROR, 200, "Not find stream data source
table exception");
+ public MissingRequiredStreamDataSourceException() {
+ super(XOpenSQLState.NOT_FOUND, 0, "Can not find stream data source
table.");
}
}
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/exception/CDCServerException.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/exception/PipelineCDCException.java
similarity index 59%
copy from
kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/exception/CDCServerException.java
copy to
kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/exception/PipelineCDCException.java
index 9cbd798048b..2083ceffee4 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/exception/CDCServerException.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/exception/PipelineCDCException.java
@@ -17,17 +17,25 @@
package org.apache.shardingsphere.data.pipeline.cdc.exception;
+import com.google.common.base.Preconditions;
+import
org.apache.shardingsphere.infra.exception.core.external.sql.sqlstate.SQLState;
import
org.apache.shardingsphere.infra.exception.core.external.sql.type.kernel.category.PipelineSQLException;
-import
org.apache.shardingsphere.infra.exception.core.external.sql.sqlstate.XOpenSQLState;
/**
- * CDC server exception.
+ * Pipeline CDC exception.
*/
-public final class CDCServerException extends PipelineSQLException {
+public abstract class PipelineCDCException extends PipelineSQLException {
- private static final long serialVersionUID = -1064162731346147038L;
+ private static final long serialVersionUID = -7954289125693933632L;
- public CDCServerException(final String reason) {
- super(XOpenSQLState.GENERAL_ERROR, 201, String.format("CDC server
exception, reason is: %s.", reason));
+ private static final int CDC_CODE = 4;
+
+ protected PipelineCDCException(final SQLState sqlState, final int
errorCode, final String reason, final Object... messageArgs) {
+ super(sqlState, getErrorCode(errorCode), reason, messageArgs);
+ }
+
+ private static int getErrorCode(final int errorCode) {
+ Preconditions.checkArgument(errorCode >= 0 && errorCode < 100, "The
value range of error code should be [0, 100).");
+ return CDC_CODE * 100 + errorCode;
}
}
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/exception/CDCServerException.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/exception/StreamDatabaseNotFoundException.java
similarity index 74%
rename from
kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/exception/CDCServerException.java
rename to
kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/exception/StreamDatabaseNotFoundException.java
index 9cbd798048b..c79c28b09ad 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/exception/CDCServerException.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/exception/StreamDatabaseNotFoundException.java
@@ -17,17 +17,16 @@
package org.apache.shardingsphere.data.pipeline.cdc.exception;
-import
org.apache.shardingsphere.infra.exception.core.external.sql.type.kernel.category.PipelineSQLException;
import
org.apache.shardingsphere.infra.exception.core.external.sql.sqlstate.XOpenSQLState;
/**
- * CDC server exception.
+ * Stream database not found exception.
*/
-public final class CDCServerException extends PipelineSQLException {
+public final class StreamDatabaseNotFoundException extends
PipelineCDCException {
private static final long serialVersionUID = -1064162731346147038L;
- public CDCServerException(final String reason) {
- super(XOpenSQLState.GENERAL_ERROR, 201, String.format("CDC server
exception, reason is: %s.", reason));
+ public StreamDatabaseNotFoundException(final String database) {
+ super(XOpenSQLState.NOT_FOUND, 1, "Database '%s' does not exist.",
database);
}
}
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
index 03976216667..a8375be5460 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
@@ -33,8 +33,8 @@ import
org.apache.shardingsphere.data.pipeline.cdc.core.importer.CDCImporter;
import
org.apache.shardingsphere.data.pipeline.cdc.core.importer.CDCImporterManager;
import
org.apache.shardingsphere.data.pipeline.cdc.core.importer.sink.PipelineCDCSocketSink;
import
org.apache.shardingsphere.data.pipeline.cdc.exception.CDCExceptionWrapper;
-import
org.apache.shardingsphere.data.pipeline.cdc.exception.CDCServerException;
-import
org.apache.shardingsphere.data.pipeline.cdc.exception.NotFindStreamDataSourceTableException;
+import
org.apache.shardingsphere.data.pipeline.cdc.exception.StreamDatabaseNotFoundException;
+import
org.apache.shardingsphere.data.pipeline.cdc.exception.MissingRequiredStreamDataSourceException;
import org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseUtils;
import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.AckStreamingRequestBody;
import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody;
@@ -100,7 +100,8 @@ public final class CDCBackendHandler {
*/
public CDCResponse streamData(final String requestId, final
StreamDataRequestBody requestBody, final CDCConnectionContext
connectionContext, final Channel channel) {
ShardingSphereDatabase database =
PipelineContextManager.getProxyContext().getContextManager().getMetaDataContexts().getMetaData().getDatabase(requestBody.getDatabase());
- ShardingSpherePreconditions.checkNotNull(database, () -> new
CDCExceptionWrapper(requestId, new CDCServerException(String.format("%s
database is not exists", requestBody.getDatabase()))));
+ ShardingSpherePreconditions.checkNotNull(database,
+ () -> new CDCExceptionWrapper(requestId, new
StreamDatabaseNotFoundException(requestBody.getDatabase())));
Map<String, Set<String>> schemaTableNameMap;
Collection<String> tableNames;
Set<String> schemaTableNames = new HashSet<>();
@@ -115,7 +116,7 @@ public final class CDCBackendHandler {
.collect(Collectors.toList())));
tableNames = schemaTableNames;
}
- ShardingSpherePreconditions.checkState(!tableNames.isEmpty(), () ->
new CDCExceptionWrapper(requestId, new
NotFindStreamDataSourceTableException()));
+ ShardingSpherePreconditions.checkState(!tableNames.isEmpty(), () ->
new CDCExceptionWrapper(requestId, new
MissingRequiredStreamDataSourceException()));
Map<String, List<DataNode>> actualDataNodesMap =
CDCDataNodeUtils.buildDataNodesMap(database, tableNames);
ShardingSpherePreconditions.checkState(!actualDataNodesMap.isEmpty(),
() -> new PipelineInvalidParameterException(String.format("Not find table %s",
tableNames)));
// TODO Add globalCSNSupported to isolate it with decodeWithTx flag,
they're different. And also update CDCJobPreparer needSorting flag.
diff --git
a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
index 708ed307324..198dfb08803 100644
---
a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
+++
b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
@@ -28,7 +28,8 @@ import
org.apache.shardingsphere.authority.model.ShardingSpherePrivileges;
import org.apache.shardingsphere.authority.rule.AuthorityRule;
import
org.apache.shardingsphere.data.pipeline.cdc.context.CDCConnectionContext;
import
org.apache.shardingsphere.data.pipeline.cdc.exception.CDCExceptionWrapper;
-import org.apache.shardingsphere.data.pipeline.cdc.exception.CDCLoginException;
+import
org.apache.shardingsphere.data.pipeline.cdc.exception.CDCLoginFailedException;
+import
org.apache.shardingsphere.data.pipeline.cdc.exception.EmptyCDCLoginRequestBodyException;
import org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseUtils;
import org.apache.shardingsphere.data.pipeline.cdc.handler.CDCBackendHandler;
import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.AckStreamingRequestBody;
@@ -136,9 +137,8 @@ public final class CDCChannelInboundHandler extends
ChannelInboundHandlerAdapter
}
private void processLogin(final ChannelHandlerContext ctx, final
CDCRequest request) {
- if (!request.hasLoginRequestBody() ||
!request.getLoginRequestBody().hasBasicBody()) {
- throw new CDCExceptionWrapper(request.getRequestId(), new
CDCLoginException("Login request body is empty"));
- }
+ ShardingSpherePreconditions.checkState(request.hasLoginRequestBody()
&& request.getLoginRequestBody().hasBasicBody(),
+ () -> new CDCExceptionWrapper(request.getRequestId(), new
EmptyCDCLoginRequestBodyException()));
BasicBody body = request.getLoginRequestBody().getBasicBody();
AuthorityRule authorityRule =
ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getGlobalRuleMetaData().getSingleRule(AuthorityRule.class);
Optional<ShardingSphereUser> user = authorityRule.findUser(new
Grantee(body.getUsername(), getHostAddress(ctx)));
@@ -146,7 +146,7 @@ public final class CDCChannelInboundHandler extends
ChannelInboundHandlerAdapter
ctx.channel().attr(CONNECTION_CONTEXT_KEY).set(new
CDCConnectionContext(user.get()));
ctx.writeAndFlush(CDCResponseUtils.succeed(request.getRequestId()));
} else {
- throw new CDCExceptionWrapper(request.getRequestId(), new
CDCLoginException("Illegal username or password"));
+ throw new CDCExceptionWrapper(request.getRequestId(), new
CDCLoginFailedException());
}
}
diff --git
a/proxy/frontend/core/src/test/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandlerTest.java
b/proxy/frontend/core/src/test/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandlerTest.java
index 2324bbe5586..ed93103bba7 100644
---
a/proxy/frontend/core/src/test/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandlerTest.java
+++
b/proxy/frontend/core/src/test/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandlerTest.java
@@ -88,7 +88,7 @@ class CDCChannelInboundHandlerTest {
assertTrue(expectedGreetingResult.hasServerGreetingResult());
CDCResponse expectedLoginResult = channel.readOutbound();
assertThat(expectedLoginResult.getStatus(), is(Status.FAILED));
- assertThat(expectedLoginResult.getErrorCode(),
is(XOpenSQLState.GENERAL_ERROR.getValue()));
+ assertThat(expectedLoginResult.getErrorCode(),
is(XOpenSQLState.DATA_SOURCE_REJECTED_CONNECTION_ATTEMPT.getValue()));
assertFalse(channel.isOpen());
}
@@ -100,7 +100,7 @@ class CDCChannelInboundHandlerTest {
assertTrue(expectedGreetingResult.hasServerGreetingResult());
CDCResponse expectedLoginResult = channel.readOutbound();
assertThat(expectedLoginResult.getStatus(), is(Status.FAILED));
- assertThat(expectedLoginResult.getErrorCode(),
is(XOpenSQLState.GENERAL_ERROR.getValue()));
+ assertThat(expectedLoginResult.getErrorCode(),
is(XOpenSQLState.NOT_FOUND.getValue()));
assertFalse(channel.isOpen());
}