This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 68a5d5f064d Add show cdc streaming rule DIstSQL and SQL parse IT
(#29053)
68a5d5f064d is described below
commit 68a5d5f064dc8fae9230a5ccaa2edcdb2a5f5d15
Author: Xinze Guo <[email protected]>
AuthorDate: Thu Nov 16 20:20:54 2023 +0800
Add show cdc streaming rule DIstSQL and SQL parse IT (#29053)
* Add show streaming rule
* Add alter/show streaming rule SQL parse IT
---
.../handler/query/ShowStreamingRuleExecutor.java | 62 +++++++++++++++
....distsql.handler.ral.query.QueryableRALExecutor | 1 +
.../distsql/parser/autogen/CDCDistSQLStatement.g4 | 1 +
.../src/main/antlr4/imports/cdc/RALStatement.g4 | 6 +-
.../parser/core/CDCDistSQLStatementVisitor.java | 15 ++++
.../statement/ShowStreamingRuleStatement.java} | 16 ++--
.../test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java | 5 +-
...terInventoryIncrementalRuleStatementAssert.java | 89 ++++++++++++++++++++++
.../QueryablePipelineRALStatementAssert.java | 5 +-
.../UpdatablePipelineRALStatementAssert.java | 4 +
...rInventoryIncrementalRuleStatementTestCase.java | 42 ++++++++++
.../cases/parser/jaxb/RootSQLParserTestCases.java | 18 +++--
.../ral/ExpectedInventoryIncrementalRule.java | 34 ++++++---
.../segment/impl/distsql/ral/ExpectedRead.java | 47 ++++++++++++
.../segment/impl/distsql/ral/ExpectedWrite.java | 34 ++++++---
.../cdc/ShowStreamingRuleStatementTestCase.java | 16 ++--
test/it/parser/src/main/resources/case/ral/cdc.xml | 28 +++++++
.../src/main/resources/sql/supported/ral/cdc.xml | 2 +
18 files changed, 380 insertions(+), 45 deletions(-)
diff --git
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingRuleExecutor.java
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingRuleExecutor.java
new file mode 100644
index 00000000000..f646e82611b
--- /dev/null
+++
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingRuleExecutor.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.cdc.distsql.handler.query;
+
+import
org.apache.shardingsphere.cdc.distsql.statement.ShowStreamingRuleStatement;
+import
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
+import
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
+import
org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI;
+import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
+import
org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
+import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
+import
org.apache.shardingsphere.infra.merge.result.impl.local.LocalDataQueryResultRow;
+import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
+import org.apache.shardingsphere.infra.util.json.JsonUtils;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.LinkedList;
+
+/**
+ * Show streaming rule executor.
+ */
+public final class ShowStreamingRuleExecutor implements
QueryableRALExecutor<ShowStreamingRuleStatement> {
+
+ @Override
+ public Collection<LocalDataQueryResultRow> getRows(final
ShowStreamingRuleStatement sqlStatement) {
+ PipelineProcessConfiguration processConfig =
((InventoryIncrementalJobAPI) TypedSPILoader.getService(PipelineJobAPI.class,
"STREAMING"))
+ .showProcessConfiguration(new
PipelineContextKey(InstanceType.PROXY));
+ Collection<LocalDataQueryResultRow> result = new LinkedList<>();
+ result.add(new
LocalDataQueryResultRow(getString(processConfig.getRead()),
getString(processConfig.getWrite()),
getString(processConfig.getStreamChannel())));
+ return result;
+ }
+
+ private String getString(final Object obj) {
+ return null == obj ? "" : JsonUtils.toJsonString(obj);
+ }
+
+ @Override
+ public Collection<String> getColumnNames() {
+ return Arrays.asList("read", "write", "stream_channel");
+ }
+
+ @Override
+ public Class<ShowStreamingRuleStatement> getType() {
+ return ShowStreamingRuleStatement.class;
+ }
+}
diff --git
a/kernel/data-pipeline/distsql/handler/src/main/resources/META-INF/services/org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor
b/kernel/data-pipeline/distsql/handler/src/main/resources/META-INF/services/org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor
index 684c1dcc4cb..50be75db7b5 100644
---
a/kernel/data-pipeline/distsql/handler/src/main/resources/META-INF/services/org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor
+++
b/kernel/data-pipeline/distsql/handler/src/main/resources/META-INF/services/org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor
@@ -22,3 +22,4 @@
org.apache.shardingsphere.migration.distsql.handler.query.ShowMigrationSourceSto
org.apache.shardingsphere.migration.distsql.handler.query.ShowMigrationCheckAlgorithmsExecutor
org.apache.shardingsphere.cdc.distsql.handler.query.ShowStreamingListExecutor
org.apache.shardingsphere.cdc.distsql.handler.query.ShowStreamingJobStatusExecutor
+org.apache.shardingsphere.cdc.distsql.handler.query.ShowStreamingRuleExecutor
diff --git
a/kernel/data-pipeline/distsql/parser/src/main/antlr4/cdc/org/apache/shardingsphere/distsql/parser/autogen/CDCDistSQLStatement.g4
b/kernel/data-pipeline/distsql/parser/src/main/antlr4/cdc/org/apache/shardingsphere/distsql/parser/autogen/CDCDistSQLStatement.g4
index d45e8e86cf5..aabf400687f 100644
---
a/kernel/data-pipeline/distsql/parser/src/main/antlr4/cdc/org/apache/shardingsphere/distsql/parser/autogen/CDCDistSQLStatement.g4
+++
b/kernel/data-pipeline/distsql/parser/src/main/antlr4/cdc/org/apache/shardingsphere/distsql/parser/autogen/CDCDistSQLStatement.g4
@@ -23,6 +23,7 @@ execute
: (showStreamingList
| showStreamingStatus
| dropStreaming
+ | showStreamingRule
| alterStreamingRule
) SEMI_? EOF
;
diff --git
a/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/RALStatement.g4
b/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/RALStatement.g4
index 39a1a28c3b6..aed0a03e86c 100644
---
a/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/RALStatement.g4
+++
b/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/RALStatement.g4
@@ -19,12 +19,16 @@ grammar RALStatement;
import BaseRule;
+showStreamingRule
+ : SHOW STREAMING RULE
+ ;
+
alterStreamingRule
: ALTER STREAMING RULE inventoryIncrementalRule?
;
inventoryIncrementalRule
- : LP_ readDefinition? (COMMA_? streamChannel)? RP_
+ : LP_ readDefinition? (COMMA_? writeDefinition)? (COMMA_? streamChannel)?
RP_
;
readDefinition
diff --git
a/kernel/data-pipeline/distsql/parser/src/main/java/org/apache/shardingsphere/cdc/distsql/parser/core/CDCDistSQLStatementVisitor.java
b/kernel/data-pipeline/distsql/parser/src/main/java/org/apache/shardingsphere/cdc/distsql/parser/core/CDCDistSQLStatementVisitor.java
index c79d9db596e..fdb4ca35b33 100644
---
a/kernel/data-pipeline/distsql/parser/src/main/java/org/apache/shardingsphere/cdc/distsql/parser/core/CDCDistSQLStatementVisitor.java
+++
b/kernel/data-pipeline/distsql/parser/src/main/java/org/apache/shardingsphere/cdc/distsql/parser/core/CDCDistSQLStatementVisitor.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.cdc.distsql.parser.core;
import org.antlr.v4.runtime.tree.ParseTree;
import org.apache.shardingsphere.cdc.distsql.statement.DropStreamingStatement;
import
org.apache.shardingsphere.cdc.distsql.statement.ShowStreamingListStatement;
+import
org.apache.shardingsphere.cdc.distsql.statement.ShowStreamingRuleStatement;
import
org.apache.shardingsphere.cdc.distsql.statement.ShowStreamingStatusStatement;
import
org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementBaseVisitor;
import
org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.AlgorithmDefinitionContext;
@@ -33,6 +34,7 @@ import
org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParse
import
org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.ReadDefinitionContext;
import
org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.ShardingSizeContext;
import
org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.ShowStreamingListContext;
+import
org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.ShowStreamingRuleContext;
import
org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.ShowStreamingStatusContext;
import
org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.StreamChannelContext;
import
org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.WorkerThreadContext;
@@ -71,6 +73,11 @@ public final class CDCDistSQLStatementVisitor extends
CDCDistSQLStatementBaseVis
return null == ctx ? null : new
IdentifierValue(ctx.getText()).getValue();
}
+ @Override
+ public ASTNode visitShowStreamingRule(final ShowStreamingRuleContext ctx) {
+ return new ShowStreamingRuleStatement();
+ }
+
@Override
public ASTNode visitAlterStreamingRule(final AlterStreamingRuleContext
ctx) {
InventoryIncrementalRuleSegment segment = null ==
ctx.inventoryIncrementalRule() ? null
@@ -84,6 +91,9 @@ public final class CDCDistSQLStatementVisitor extends
CDCDistSQLStatementBaseVis
if (null != ctx.readDefinition()) {
result.setReadSegment((ReadOrWriteSegment)
visit(ctx.readDefinition()));
}
+ if (null != ctx.writeDefinition()) {
+ result.setWriteSegment((ReadOrWriteSegment)
visit(ctx.writeDefinition()));
+ }
if (null != ctx.streamChannel()) {
result.setStreamChannel((AlgorithmSegment)
visit(ctx.streamChannel()));
}
@@ -116,6 +126,11 @@ public final class CDCDistSQLStatementVisitor extends
CDCDistSQLStatementBaseVis
return new ReadOrWriteSegment(getWorkerThread(ctx.workerThread()),
getBatchSize(ctx.batchSize()), getAlgorithmSegment(ctx.rateLimiter()));
}
+ @Override
+ public ASTNode visitRateLimiter(final RateLimiterContext ctx) {
+ return visit(ctx.algorithmDefinition());
+ }
+
@Override
public ASTNode visitStreamChannel(final StreamChannelContext ctx) {
return visit(ctx.algorithmDefinition());
diff --git
a/kernel/data-pipeline/distsql/parser/src/main/antlr4/cdc/org/apache/shardingsphere/distsql/parser/autogen/CDCDistSQLStatement.g4
b/kernel/data-pipeline/distsql/statement/src/main/java/org/apache/shardingsphere/cdc/distsql/statement/ShowStreamingRuleStatement.java
similarity index 74%
copy from
kernel/data-pipeline/distsql/parser/src/main/antlr4/cdc/org/apache/shardingsphere/distsql/parser/autogen/CDCDistSQLStatement.g4
copy to
kernel/data-pipeline/distsql/statement/src/main/java/org/apache/shardingsphere/cdc/distsql/statement/ShowStreamingRuleStatement.java
index d45e8e86cf5..f05f8c08818 100644
---
a/kernel/data-pipeline/distsql/parser/src/main/antlr4/cdc/org/apache/shardingsphere/distsql/parser/autogen/CDCDistSQLStatement.g4
+++
b/kernel/data-pipeline/distsql/statement/src/main/java/org/apache/shardingsphere/cdc/distsql/statement/ShowStreamingRuleStatement.java
@@ -15,14 +15,12 @@
* limitations under the License.
*/
-grammar CDCDistSQLStatement;
+package org.apache.shardingsphere.cdc.distsql.statement;
-import Symbol, RALStatement;
+import
org.apache.shardingsphere.distsql.statement.ral.pipeline.cdc.QueryableCDCRALStatement;
-execute
- : (showStreamingList
- | showStreamingStatus
- | dropStreaming
- | alterStreamingRule
- ) SEMI_? EOF
- ;
+/**
+ * Show streaming rule statement.
+ */
+public final class ShowStreamingRuleStatement extends QueryableCDCRALStatement
{
+}
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
index e5b0a12d08d..4bc5536b2bc 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
@@ -98,8 +98,9 @@ class CDCE2EIT {
void assertCDCDataImportSuccess(final PipelineTestParameter testParam)
throws SQLException, InterruptedException {
TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
try (PipelineContainerComposer containerComposer = new
PipelineContainerComposer(testParam, new CDCJobType())) {
- containerComposer.proxyExecuteWithLog("ALTER STREAMING RULE
(READ(WORKER_THREAD=64,BATCH_SIZE=1000,SHARDING_SIZE=10000000,RATE_LIMITER
(TYPE(NAME='QPS',PROPERTIES('qps'='10000')))),"
- + "STREAM_CHANNEL
(TYPE(NAME='MEMORY',PROPERTIES('block-queue-size'='2000'))));", 0);
+ String alterStreamingRule = "ALTER STREAMING RULE
(READ(WORKER_THREAD=20,BATCH_SIZE=1000,SHARDING_SIZE=10000000,RATE_LIMITER(TYPE(NAME='QPS',PROPERTIES('qps'='10000')))),"
+ + "WRITE(WORKER_THREAD=20,BATCH_SIZE=1000,
RATE_LIMITER(TYPE(NAME='TPS',PROPERTIES('tps'='10000')))),STREAM_CHANNEL(TYPE(NAME='MEMORY',PROPERTIES('block-queue-size'='2000'))))";
+ containerComposer.proxyExecuteWithLog(alterStreamingRule, 0);
for (String each : Arrays.asList(PipelineContainerComposer.DS_0,
PipelineContainerComposer.DS_1)) {
containerComposer.registerStorageUnit(each);
}
diff --git
a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/pipeline/AlterInventoryIncrementalRuleStatementAssert.java
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/pipeline/AlterInventoryIncrementalRuleStatementAssert.java
new file mode 100644
index 00000000000..7b05ef298a7
--- /dev/null
+++
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/pipeline/AlterInventoryIncrementalRuleStatementAssert.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ral.impl.pipeline;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import org.apache.shardingsphere.distsql.segment.AlgorithmSegment;
+import org.apache.shardingsphere.distsql.segment.ReadOrWriteSegment;
+import
org.apache.shardingsphere.distsql.statement.ral.updatable.AlterInventoryIncrementalRuleStatement;
+import
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.SQLCaseAssertContext;
+import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.domain.statement.ral.AlterInventoryIncrementalRuleStatementTestCase;
+import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.segment.impl.distsql.ExpectedAlgorithm;
+import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.segment.impl.distsql.ral.ExpectedRead;
+import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.segment.impl.distsql.ral.ExpectedWrite;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+/**
+ * Alter inventory incremental rule statement assert.
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class AlterInventoryIncrementalRuleStatementAssert {
+
+ /**
+ * Assert statement is correct with expected parser result.
+ *
+ * @param assertContext assert context
+ * @param actual actual statement
+ * @param expected expected statement test case
+ */
+ public static void assertIs(final SQLCaseAssertContext assertContext,
final AlterInventoryIncrementalRuleStatement actual, final
AlterInventoryIncrementalRuleStatementTestCase expected) {
+ if (null == expected) {
+ assertNull(actual, assertContext.getText("Actual statement should
not exist."));
+ } else {
+ assertThat(actual.getJobTypeName(), is(expected.getJobTypeName()));
+ assertRead(assertContext,
actual.getProcessConfigSegment().getReadSegment(),
expected.getRule().getRead());
+ assertWrite(assertContext,
actual.getProcessConfigSegment().getWriteSegment(),
expected.getRule().getWrite());
+ assertTypeStrategy(assertContext,
actual.getProcessConfigSegment().getStreamChannel(),
expected.getRule().getStreamChannel());
+ }
+ }
+
+ private static void assertRead(final SQLCaseAssertContext assertContext,
final ReadOrWriteSegment actual, final ExpectedRead expected) {
+ if (null == expected) {
+ assertNull(actual, assertContext.getText("Actual read or write
should not exist."));
+ return;
+ }
+ assertThat(actual.getWorkerThread(), is(expected.getWorkerThread()));
+ assertThat(actual.getBatchSize(), is(expected.getBatchSize()));
+ assertThat(actual.getShardingSize(), is(expected.getShardingSize()));
+ assertThat(actual.getRateLimiter().getName(),
is(expected.getRateLimiter().getName()));
+ }
+
+ private static void assertWrite(final SQLCaseAssertContext assertContext,
final ReadOrWriteSegment actual, final ExpectedWrite expected) {
+ if (null == expected) {
+ assertNull(actual, assertContext.getText("Actual read or write
should not exist."));
+ return;
+ }
+ assertThat(actual.getWorkerThread(), is(expected.getWorkerThread()));
+ assertThat(actual.getBatchSize(), is(expected.getBatchSize()));
+ assertThat(actual.getRateLimiter().getName(),
is(expected.getRateLimiter().getName()));
+ }
+
+ private static void assertTypeStrategy(final SQLCaseAssertContext
assertContext, final AlgorithmSegment actual, final ExpectedAlgorithm expected)
{
+ if (null == expected) {
+ assertNull(actual, assertContext.getText("Actual strategy should
not exist."));
+ } else {
+ assertNotNull(actual, assertContext.getText("Actual strategy
should exist."));
+ assertThat(assertContext.getText("Type assertion error"),
actual.getName(), is(expected.getName()));
+ }
+ }
+}
diff --git
a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/pipeline/QueryablePipelineRALStatementAssert.java
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/pipeline/QueryablePipelineRALStatementAssert.java
index 987af395a11..ba61258c4d9 100644
---
a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/pipeline/QueryablePipelineRALStatementAssert.java
+++
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/pipeline/QueryablePipelineRALStatementAssert.java
@@ -20,6 +20,7 @@ package
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import
org.apache.shardingsphere.cdc.distsql.statement.ShowStreamingListStatement;
+import
org.apache.shardingsphere.cdc.distsql.statement.ShowStreamingRuleStatement;
import
org.apache.shardingsphere.cdc.distsql.statement.ShowStreamingStatusStatement;
import
org.apache.shardingsphere.distsql.statement.ral.pipeline.QueryablePipelineRALStatement;
import
org.apache.shardingsphere.migration.distsql.statement.ShowMigrationCheckAlgorithmsStatement;
@@ -28,10 +29,10 @@ import
org.apache.shardingsphere.migration.distsql.statement.ShowMigrationListSt
import
org.apache.shardingsphere.migration.distsql.statement.ShowMigrationSourceStorageUnitsStatement;
import
org.apache.shardingsphere.migration.distsql.statement.ShowMigrationStatusStatement;
import
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.SQLCaseAssertContext;
+import
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ExistingAssert;
import
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ral.impl.pipeline.cdc.ShowStreamingStatusStatementAssert;
import
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ral.impl.pipeline.migration.query.ShowMigrationCheckStatusStatementAssert;
import
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ral.impl.pipeline.migration.query.ShowMigrationStatusStatementAssert;
-import
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ExistingAssert;
import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.SQLParserTestCase;
import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.cdc.ShowStreamingStatusStatementTestCase;
import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.migration.ShowMigrationCheckStatusStatementTestCase;
@@ -66,6 +67,8 @@ public final class QueryablePipelineRALStatementAssert {
ExistingAssert.assertIs(assertContext, actual, expected);
} else if (actual instanceof ShowStreamingStatusStatement) {
ShowStreamingStatusStatementAssert.assertIs(assertContext,
(ShowStreamingStatusStatement) actual, (ShowStreamingStatusStatementTestCase)
expected);
+ } else if (actual instanceof ShowStreamingRuleStatement) {
+ ExistingAssert.assertIs(assertContext, actual, expected);
}
}
}
diff --git
a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/pipeline/UpdatablePipelineRALStatementAssert.java
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/pipeline/UpdatablePipelineRALStatementAssert.java
index e6cf8576651..d6dfa98ceb6 100644
---
a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/pipeline/UpdatablePipelineRALStatementAssert.java
+++
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/pipeline/UpdatablePipelineRALStatementAssert.java
@@ -21,6 +21,7 @@ import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.apache.shardingsphere.cdc.distsql.statement.DropStreamingStatement;
import
org.apache.shardingsphere.distsql.statement.ral.pipeline.UpdatablePipelineRALStatement;
+import
org.apache.shardingsphere.distsql.statement.ral.updatable.AlterInventoryIncrementalRuleStatement;
import
org.apache.shardingsphere.migration.distsql.statement.CheckMigrationStatement;
import
org.apache.shardingsphere.migration.distsql.statement.CommitMigrationStatement;
import
org.apache.shardingsphere.migration.distsql.statement.MigrateTableStatement;
@@ -43,6 +44,7 @@ import
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.r
import
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ral.impl.pipeline.migration.update.StopMigrationCheckStatementAssert;
import
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ral.impl.pipeline.migration.update.StopMigrationStatementAssert;
import
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ral.impl.pipeline.migration.update.UnregisterMigrationSourceStorageUnitStatementAssert;
+import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.domain.statement.ral.AlterInventoryIncrementalRuleStatementTestCase;
import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.SQLParserTestCase;
import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.cdc.DropStreamingStatementTestCase;
import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.migration.CheckMigrationStatementTestCase;
@@ -93,6 +95,8 @@ public final class UpdatablePipelineRALStatementAssert {
StartMigrationCheckStatementAssert.assertIs(assertContext,
(StartMigrationCheckStatement) actual, (StartMigrationCheckStatementTestCase)
expected);
} else if (actual instanceof StopMigrationCheckStatement) {
StopMigrationCheckStatementAssert.assertIs(assertContext,
(StopMigrationCheckStatement) actual, (StopMigrationCheckStatementTestCase)
expected);
+ } else if (actual instanceof AlterInventoryIncrementalRuleStatement) {
+
AlterInventoryIncrementalRuleStatementAssert.assertIs(assertContext,
(AlterInventoryIncrementalRuleStatement) actual,
(AlterInventoryIncrementalRuleStatementTestCase) expected);
} else if (actual instanceof DropStreamingStatement) {
DropStreamingStatementAssert.assertIs(assertContext,
(DropStreamingStatement) actual, (DropStreamingStatementTestCase) expected);
}
diff --git
a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/domain/statement/ral/AlterInventoryIncrementalRuleStatementTestCase.java
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/domain/statement/ral/AlterInventoryIncrementalRuleStatementTestCase.java
new file mode 100644
index 00000000000..2506f0f11e9
--- /dev/null
+++
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/domain/statement/ral/AlterInventoryIncrementalRuleStatementTestCase.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.domain.statement.ral;
+
+import lombok.Getter;
+import lombok.Setter;
+import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.SQLParserTestCase;
+import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.segment.impl.distsql.ral.ExpectedInventoryIncrementalRule;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+
+/**
+ * Alter inventory incremental rule statement test case.
+ */
+@Getter
+@Setter
+@XmlAccessorType(XmlAccessType.FIELD)
+public final class AlterInventoryIncrementalRuleStatementTestCase extends
SQLParserTestCase {
+
+ @XmlElement(name = "job-type-name")
+ private String jobTypeName;
+
+ @XmlElement(name = "rule")
+ private ExpectedInventoryIncrementalRule rule;
+}
diff --git
a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/RootSQLParserTestCases.java
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/RootSQLParserTestCases.java
index 2374134bcde..cecb97b0cdf 100644
---
a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/RootSQLParserTestCases.java
+++
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/RootSQLParserTestCases.java
@@ -20,6 +20,7 @@ package
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb;
import com.google.common.base.Preconditions;
import lombok.Getter;
import lombok.SneakyThrows;
+import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.domain.statement.ral.AlterInventoryIncrementalRuleStatementTestCase;
import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.CommonStatementTestCase;
import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.dal.AlterResourceGroupStatementTestCase;
import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.dal.BinlogStatementTestCase;
@@ -132,10 +133,10 @@ import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.s
import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ddl.AlterPluggableDatabaseStatementTestCase;
import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ddl.AlterPolicyStatementTestCase;
import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ddl.AlterProcedureStatementTestCase;
-import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ddl.AlterPublicationStatementTestCase;
import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ddl.AlterProfileStatementTestCase;
-import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ddl.AlterRoutineStatementTestCase;
+import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ddl.AlterPublicationStatementTestCase;
import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ddl.AlterRollbackSegmentStatementTestCase;
+import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ddl.AlterRoutineStatementTestCase;
import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ddl.AlterRuleStatementTestCase;
import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ddl.AlterSchemaStatementTestCase;
import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ddl.AlterSequenceStatementTestCase;
@@ -271,8 +272,8 @@ import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.s
import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ddl.DropTriggerStatementTestCase;
import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ddl.DropTypeStatementTestCase;
import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ddl.DropViewStatementTestCase;
-import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ddl.FetchStatementTestCase;
import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ddl.ExecuteStatementTestCase;
+import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ddl.FetchStatementTestCase;
import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ddl.FlashbackDatabaseStatementTestCase;
import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ddl.FlashbackTableStatementTestCase;
import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ddl.ListenStatementTestCase;
@@ -295,8 +296,8 @@ import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.s
import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.dml.DeleteStatementTestCase;
import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.dml.DoStatementTestCase;
import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.dml.HandlerStatementTestCase;
-import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.dml.InsertStatementTestCase;
import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.dml.ImportStatementTestCase;
+import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.dml.InsertStatementTestCase;
import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.dml.LoadDataStatementTestCase;
import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.dml.LoadXMLStatementTestCase;
import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.dml.LockTableStatementTestCase;
@@ -339,6 +340,7 @@ import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.s
import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.UnlabelComputeNodeStatementTestCase;
import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.cdc.DropStreamingStatementTestCase;
import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.cdc.ShowStreamingListStatementTestCase;
+import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.cdc.ShowStreamingRuleStatementTestCase;
import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.cdc.ShowStreamingStatusStatementTestCase;
import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.migration.CheckMigrationStatementTestCase;
import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.migration.CommitMigrationStatementTestCase;
@@ -392,6 +394,7 @@ import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.s
import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.rdl.rule.sharding.DropShardingTableReferenceRuleStatementTestCase;
import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.rdl.rule.sharding.DropShardingTableRuleStatementTestCase;
import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.rdl.rule.single.SetDefaultSingleTableStorageUnitStatementTestCase;
+import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.rl.ChangeReplicationSourceToStatementTestCase;
import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.rql.rule.encrypt.CountEncryptRuleStatementTestCase;
import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.rql.rule.encrypt.ShowEncryptRulesStatementTestCase;
import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.rql.rule.mask.CountMaskRuleStatementTestCase;
@@ -438,7 +441,6 @@ import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.s
import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.tcl.SetTransactionStatementTestCase;
import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.tcl.UnlockStatementTestCase;
import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.tcl.XATestCase;
-import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.rl.ChangeReplicationSourceToStatementTestCase;
import org.mockito.internal.configuration.plugins.Plugins;
import javax.xml.bind.annotation.XmlElement;
@@ -1047,6 +1049,12 @@ public final class RootSQLParserTestCases {
@XmlElement(name = "migrate-table")
private final List<MigrateTableStatementTestCase> migrateTableTestCases =
new LinkedList<>();
+ @XmlElement(name = "show-streaming-rule")
+ private final List<ShowStreamingRuleStatementTestCase>
showStreamingRuleTestCases = new LinkedList<>();
+
+ @XmlElement(name = "alter-streaming-rule")
+ private final List<AlterInventoryIncrementalRuleStatementTestCase>
alterStreamingRuleTestCases = new LinkedList<>();
+
@XmlElement(name = "show-streaming-list")
private final List<ShowStreamingListStatementTestCase>
showStreamingListTestCases = new LinkedList<>();
diff --git
a/kernel/data-pipeline/distsql/parser/src/main/antlr4/cdc/org/apache/shardingsphere/distsql/parser/autogen/CDCDistSQLStatement.g4
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/segment/impl/distsql/ral/ExpectedInventoryIncrementalRule.java
similarity index 50%
copy from
kernel/data-pipeline/distsql/parser/src/main/antlr4/cdc/org/apache/shardingsphere/distsql/parser/autogen/CDCDistSQLStatement.g4
copy to
test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/segment/impl/distsql/ral/ExpectedInventoryIncrementalRule.java
index d45e8e86cf5..7c33deccd80 100644
---
a/kernel/data-pipeline/distsql/parser/src/main/antlr4/cdc/org/apache/shardingsphere/distsql/parser/autogen/CDCDistSQLStatement.g4
+++
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/segment/impl/distsql/ral/ExpectedInventoryIncrementalRule.java
@@ -15,14 +15,30 @@
* limitations under the License.
*/
-grammar CDCDistSQLStatement;
+package
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.segment.impl.distsql.ral;
-import Symbol, RALStatement;
+import lombok.Getter;
+import lombok.Setter;
+import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.segment.impl.distsql.ExpectedAlgorithm;
-execute
- : (showStreamingList
- | showStreamingStatus
- | dropStreaming
- | alterStreamingRule
- ) SEMI_? EOF
- ;
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+
+/**
+ * Expected inventory incremental rule.
+ */
+@Getter
+@Setter
+@XmlAccessorType(XmlAccessType.FIELD)
+public final class ExpectedInventoryIncrementalRule {
+
+ @XmlElement(name = "read")
+ private ExpectedRead read;
+
+ @XmlElement(name = "write")
+ private ExpectedWrite write;
+
+ @XmlElement(name = "stream-channel")
+ private ExpectedAlgorithm streamChannel;
+}
diff --git
a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/segment/impl/distsql/ral/ExpectedRead.java
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/segment/impl/distsql/ral/ExpectedRead.java
new file mode 100644
index 00000000000..c9966b66fb9
--- /dev/null
+++
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/segment/impl/distsql/ral/ExpectedRead.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.segment.impl.distsql.ral;
+
+import lombok.Getter;
+import lombok.Setter;
+import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.segment.impl.distsql.ExpectedAlgorithm;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+
+/**
+ * Expected read.
+ */
+@Getter
+@Setter
+@XmlAccessorType(XmlAccessType.FIELD)
+public final class ExpectedRead {
+
+ @XmlElement(name = "worker-thread")
+ private Integer workerThread;
+
+ @XmlElement(name = "batch-size")
+ private Integer batchSize;
+
+ @XmlElement(name = "sharding-size")
+ private Integer shardingSize;
+
+ @XmlElement(name = "rate-limiter")
+ private ExpectedAlgorithm rateLimiter;
+}
diff --git
a/kernel/data-pipeline/distsql/parser/src/main/antlr4/cdc/org/apache/shardingsphere/distsql/parser/autogen/CDCDistSQLStatement.g4
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/segment/impl/distsql/ral/ExpectedWrite.java
similarity index 51%
copy from
kernel/data-pipeline/distsql/parser/src/main/antlr4/cdc/org/apache/shardingsphere/distsql/parser/autogen/CDCDistSQLStatement.g4
copy to
test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/segment/impl/distsql/ral/ExpectedWrite.java
index d45e8e86cf5..5171ec91e67 100644
---
a/kernel/data-pipeline/distsql/parser/src/main/antlr4/cdc/org/apache/shardingsphere/distsql/parser/autogen/CDCDistSQLStatement.g4
+++
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/segment/impl/distsql/ral/ExpectedWrite.java
@@ -15,14 +15,30 @@
* limitations under the License.
*/
-grammar CDCDistSQLStatement;
+package
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.segment.impl.distsql.ral;
-import Symbol, RALStatement;
+import lombok.Getter;
+import lombok.Setter;
+import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.segment.impl.distsql.ExpectedAlgorithm;
-execute
- : (showStreamingList
- | showStreamingStatus
- | dropStreaming
- | alterStreamingRule
- ) SEMI_? EOF
- ;
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+
+/**
+ * Expected write.
+ */
+@Getter
+@Setter
+@XmlAccessorType(XmlAccessType.FIELD)
+public final class ExpectedWrite {
+
+ @XmlElement(name = "worker-thread")
+ private Integer workerThread;
+
+ @XmlElement(name = "batch-size")
+ private Integer batchSize;
+
+ @XmlElement(name = "rate-limiter")
+ private ExpectedAlgorithm rateLimiter;
+}
diff --git
a/kernel/data-pipeline/distsql/parser/src/main/antlr4/cdc/org/apache/shardingsphere/distsql/parser/autogen/CDCDistSQLStatement.g4
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/statement/ral/cdc/ShowStreamingRuleStatementTestCase.java
similarity index 70%
copy from
kernel/data-pipeline/distsql/parser/src/main/antlr4/cdc/org/apache/shardingsphere/distsql/parser/autogen/CDCDistSQLStatement.g4
copy to
test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/statement/ral/cdc/ShowStreamingRuleStatementTestCase.java
index d45e8e86cf5..d8becccedb7 100644
---
a/kernel/data-pipeline/distsql/parser/src/main/antlr4/cdc/org/apache/shardingsphere/distsql/parser/autogen/CDCDistSQLStatement.g4
+++
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/statement/ral/cdc/ShowStreamingRuleStatementTestCase.java
@@ -15,14 +15,12 @@
* limitations under the License.
*/
-grammar CDCDistSQLStatement;
+package
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.cdc;
-import Symbol, RALStatement;
+import
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.SQLParserTestCase;
-execute
- : (showStreamingList
- | showStreamingStatus
- | dropStreaming
- | alterStreamingRule
- ) SEMI_? EOF
- ;
+/**
+ * Show streaming rule statement test case.
+ */
+public final class ShowStreamingRuleStatementTestCase extends
SQLParserTestCase {
+}
diff --git a/test/it/parser/src/main/resources/case/ral/cdc.xml
b/test/it/parser/src/main/resources/case/ral/cdc.xml
index 534077055b8..35df1528745 100644
--- a/test/it/parser/src/main/resources/case/ral/cdc.xml
+++ b/test/it/parser/src/main/resources/case/ral/cdc.xml
@@ -17,6 +17,34 @@
-->
<sql-parser-test-cases>
+ <show-streaming-rule sql-case-id="show-streaming-rule" />
+
+ <alter-streaming-rule sql-case-id="alter-streaming-rule">
+ <job-type-name>STREAMING</job-type-name>
+ <rule>
+ <read>
+ <worker-thread>20</worker-thread>
+ <batch-size>1000</batch-size>
+ <sharding-size>10000000</sharding-size>
+ <rate-limiter algorithm-name="QPS">
+ <properties>
+ <property key="qps" value="500" />
+ </properties>
+ </rate-limiter>
+ </read>
+ <write>
+ <worker-thread>20</worker-thread>
+ <batch-size>2000</batch-size>
+ <rate-limiter algorithm-name="TPS">
+ <property key="tps" value="2000" />
+ </rate-limiter>
+ </write>
+ <stream-channel algorithm-name="MEMORY">
+ <property key="block-queue-size" value="100" />
+ </stream-channel>
+ </rule>
+ </alter-streaming-rule>
+
<show-streaming-list sql-case-id="show-streaming-list" />
<show-streaming-status sql-case-id="show-streaming-status">
diff --git a/test/it/parser/src/main/resources/sql/supported/ral/cdc.xml
b/test/it/parser/src/main/resources/sql/supported/ral/cdc.xml
index 282f03b2629..91a7680418e 100644
--- a/test/it/parser/src/main/resources/sql/supported/ral/cdc.xml
+++ b/test/it/parser/src/main/resources/sql/supported/ral/cdc.xml
@@ -17,6 +17,8 @@
-->
<sql-cases>
+ <sql-case id="show-streaming-rule" value="SHOW STREAMING RULE;"
db-types="ShardingSphere"/>
+ <sql-case id="alter-streaming-rule" value="ALTER STREAMING RULE
(READ(WORKER_THREAD=20,BATCH_SIZE=1000,SHARDING_SIZE=10000000,RATE_LIMITER
(TYPE(NAME='QPS',PROPERTIES('qps'='500')))),WRITE(WORKER_THREAD=20,BATCH_SIZE=2000,RATE_LIMITER
(TYPE(NAME='TPS',PROPERTIES('tps'='2000')))),STREAM_CHANNEL
(TYPE(NAME='MEMORY',PROPERTIES('block-queue-size'='100'))));"
db-types="ShardingSphere" />
<sql-case id="show-streaming-list" value="SHOW STREAMING LIST;"
db-types="ShardingSphere"/>
<sql-case id="show-streaming-status" value="SHOW STREAMING STATUS 123;"
db-types="ShardingSphere"/>
<sql-case id="drop-streaming" value="DROP STREAMING 123;"
db-types="ShardingSphere"/>