This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 68a5d5f064d Add show cdc streaming rule DistSQL and SQL parse IT 
(#29053)
68a5d5f064d is described below

commit 68a5d5f064dc8fae9230a5ccaa2edcdb2a5f5d15
Author: Xinze Guo <[email protected]>
AuthorDate: Thu Nov 16 20:20:54 2023 +0800

    Add show cdc streaming rule DistSQL and SQL parse IT (#29053)
    
    * Add show streaming rule
    
    * Add alter/show streaming rule SQL parse IT
---
 .../handler/query/ShowStreamingRuleExecutor.java   | 62 +++++++++++++++
 ....distsql.handler.ral.query.QueryableRALExecutor |  1 +
 .../distsql/parser/autogen/CDCDistSQLStatement.g4  |  1 +
 .../src/main/antlr4/imports/cdc/RALStatement.g4    |  6 +-
 .../parser/core/CDCDistSQLStatementVisitor.java    | 15 ++++
 .../statement/ShowStreamingRuleStatement.java}     | 16 ++--
 .../test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java |  5 +-
 ...terInventoryIncrementalRuleStatementAssert.java | 89 ++++++++++++++++++++++
 .../QueryablePipelineRALStatementAssert.java       |  5 +-
 .../UpdatablePipelineRALStatementAssert.java       |  4 +
 ...rInventoryIncrementalRuleStatementTestCase.java | 42 ++++++++++
 .../cases/parser/jaxb/RootSQLParserTestCases.java  | 18 +++--
 .../ral/ExpectedInventoryIncrementalRule.java      | 34 ++++++---
 .../segment/impl/distsql/ral/ExpectedRead.java     | 47 ++++++++++++
 .../segment/impl/distsql/ral/ExpectedWrite.java    | 34 ++++++---
 .../cdc/ShowStreamingRuleStatementTestCase.java    | 16 ++--
 test/it/parser/src/main/resources/case/ral/cdc.xml | 28 +++++++
 .../src/main/resources/sql/supported/ral/cdc.xml   |  2 +
 18 files changed, 380 insertions(+), 45 deletions(-)

diff --git 
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingRuleExecutor.java
 
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingRuleExecutor.java
new file mode 100644
index 00000000000..f646e82611b
--- /dev/null
+++ 
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingRuleExecutor.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.cdc.distsql.handler.query;
+
+import 
org.apache.shardingsphere.cdc.distsql.statement.ShowStreamingRuleStatement;
+import 
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
+import 
org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI;
+import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
+import 
org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
+import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
+import 
org.apache.shardingsphere.infra.merge.result.impl.local.LocalDataQueryResultRow;
+import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
+import org.apache.shardingsphere.infra.util.json.JsonUtils;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.LinkedList;
+
+/**
+ * Show streaming rule executor.
+ */
+public final class ShowStreamingRuleExecutor implements 
QueryableRALExecutor<ShowStreamingRuleStatement> {
+    
+    @Override
+    public Collection<LocalDataQueryResultRow> getRows(final 
ShowStreamingRuleStatement sqlStatement) {
+        PipelineProcessConfiguration processConfig = 
((InventoryIncrementalJobAPI) TypedSPILoader.getService(PipelineJobAPI.class, 
"STREAMING"))
+                .showProcessConfiguration(new 
PipelineContextKey(InstanceType.PROXY));
+        Collection<LocalDataQueryResultRow> result = new LinkedList<>();
+        result.add(new 
LocalDataQueryResultRow(getString(processConfig.getRead()), 
getString(processConfig.getWrite()), 
getString(processConfig.getStreamChannel())));
+        return result;
+    }
+    
+    private String getString(final Object obj) {
+        return null == obj ? "" : JsonUtils.toJsonString(obj);
+    }
+    
+    @Override
+    public Collection<String> getColumnNames() {
+        return Arrays.asList("read", "write", "stream_channel");
+    }
+    
+    @Override
+    public Class<ShowStreamingRuleStatement> getType() {
+        return ShowStreamingRuleStatement.class;
+    }
+}
diff --git 
a/kernel/data-pipeline/distsql/handler/src/main/resources/META-INF/services/org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor
 
b/kernel/data-pipeline/distsql/handler/src/main/resources/META-INF/services/org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor
index 684c1dcc4cb..50be75db7b5 100644
--- 
a/kernel/data-pipeline/distsql/handler/src/main/resources/META-INF/services/org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor
+++ 
b/kernel/data-pipeline/distsql/handler/src/main/resources/META-INF/services/org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor
@@ -22,3 +22,4 @@ 
org.apache.shardingsphere.migration.distsql.handler.query.ShowMigrationSourceSto
 
org.apache.shardingsphere.migration.distsql.handler.query.ShowMigrationCheckAlgorithmsExecutor
 org.apache.shardingsphere.cdc.distsql.handler.query.ShowStreamingListExecutor
 
org.apache.shardingsphere.cdc.distsql.handler.query.ShowStreamingJobStatusExecutor
+org.apache.shardingsphere.cdc.distsql.handler.query.ShowStreamingRuleExecutor
diff --git 
a/kernel/data-pipeline/distsql/parser/src/main/antlr4/cdc/org/apache/shardingsphere/distsql/parser/autogen/CDCDistSQLStatement.g4
 
b/kernel/data-pipeline/distsql/parser/src/main/antlr4/cdc/org/apache/shardingsphere/distsql/parser/autogen/CDCDistSQLStatement.g4
index d45e8e86cf5..aabf400687f 100644
--- 
a/kernel/data-pipeline/distsql/parser/src/main/antlr4/cdc/org/apache/shardingsphere/distsql/parser/autogen/CDCDistSQLStatement.g4
+++ 
b/kernel/data-pipeline/distsql/parser/src/main/antlr4/cdc/org/apache/shardingsphere/distsql/parser/autogen/CDCDistSQLStatement.g4
@@ -23,6 +23,7 @@ execute
     : (showStreamingList
     | showStreamingStatus
     | dropStreaming
+    | showStreamingRule
     | alterStreamingRule
     ) SEMI_? EOF
     ;
diff --git 
a/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/RALStatement.g4
 
b/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/RALStatement.g4
index 39a1a28c3b6..aed0a03e86c 100644
--- 
a/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/RALStatement.g4
+++ 
b/kernel/data-pipeline/distsql/parser/src/main/antlr4/imports/cdc/RALStatement.g4
@@ -19,12 +19,16 @@ grammar RALStatement;
 
 import BaseRule;
 
+showStreamingRule
+    : SHOW STREAMING RULE
+    ;
+
 alterStreamingRule
     : ALTER STREAMING RULE inventoryIncrementalRule?
     ;
 
 inventoryIncrementalRule
-    : LP_ readDefinition? (COMMA_? streamChannel)? RP_
+    : LP_ readDefinition? (COMMA_? writeDefinition)? (COMMA_? streamChannel)? 
RP_
     ;
 
 readDefinition
diff --git 
a/kernel/data-pipeline/distsql/parser/src/main/java/org/apache/shardingsphere/cdc/distsql/parser/core/CDCDistSQLStatementVisitor.java
 
b/kernel/data-pipeline/distsql/parser/src/main/java/org/apache/shardingsphere/cdc/distsql/parser/core/CDCDistSQLStatementVisitor.java
index c79d9db596e..fdb4ca35b33 100644
--- 
a/kernel/data-pipeline/distsql/parser/src/main/java/org/apache/shardingsphere/cdc/distsql/parser/core/CDCDistSQLStatementVisitor.java
+++ 
b/kernel/data-pipeline/distsql/parser/src/main/java/org/apache/shardingsphere/cdc/distsql/parser/core/CDCDistSQLStatementVisitor.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.cdc.distsql.parser.core;
 import org.antlr.v4.runtime.tree.ParseTree;
 import org.apache.shardingsphere.cdc.distsql.statement.DropStreamingStatement;
 import 
org.apache.shardingsphere.cdc.distsql.statement.ShowStreamingListStatement;
+import 
org.apache.shardingsphere.cdc.distsql.statement.ShowStreamingRuleStatement;
 import 
org.apache.shardingsphere.cdc.distsql.statement.ShowStreamingStatusStatement;
 import 
org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementBaseVisitor;
 import 
org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.AlgorithmDefinitionContext;
@@ -33,6 +34,7 @@ import 
org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParse
 import 
org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.ReadDefinitionContext;
 import 
org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.ShardingSizeContext;
 import 
org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.ShowStreamingListContext;
+import 
org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.ShowStreamingRuleContext;
 import 
org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.ShowStreamingStatusContext;
 import 
org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.StreamChannelContext;
 import 
org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.WorkerThreadContext;
@@ -71,6 +73,11 @@ public final class CDCDistSQLStatementVisitor extends 
CDCDistSQLStatementBaseVis
         return null == ctx ? null : new 
IdentifierValue(ctx.getText()).getValue();
     }
     
+    @Override
+    public ASTNode visitShowStreamingRule(final ShowStreamingRuleContext ctx) {
+        return new ShowStreamingRuleStatement();
+    }
+    
     @Override
     public ASTNode visitAlterStreamingRule(final AlterStreamingRuleContext 
ctx) {
         InventoryIncrementalRuleSegment segment = null == 
ctx.inventoryIncrementalRule() ? null
@@ -84,6 +91,9 @@ public final class CDCDistSQLStatementVisitor extends 
CDCDistSQLStatementBaseVis
         if (null != ctx.readDefinition()) {
             result.setReadSegment((ReadOrWriteSegment) 
visit(ctx.readDefinition()));
         }
+        if (null != ctx.writeDefinition()) {
+            result.setWriteSegment((ReadOrWriteSegment) 
visit(ctx.writeDefinition()));
+        }
         if (null != ctx.streamChannel()) {
             result.setStreamChannel((AlgorithmSegment) 
visit(ctx.streamChannel()));
         }
@@ -116,6 +126,11 @@ public final class CDCDistSQLStatementVisitor extends 
CDCDistSQLStatementBaseVis
         return new ReadOrWriteSegment(getWorkerThread(ctx.workerThread()), 
getBatchSize(ctx.batchSize()), getAlgorithmSegment(ctx.rateLimiter()));
     }
     
+    @Override
+    public ASTNode visitRateLimiter(final RateLimiterContext ctx) {
+        return visit(ctx.algorithmDefinition());
+    }
+    
     @Override
     public ASTNode visitStreamChannel(final StreamChannelContext ctx) {
         return visit(ctx.algorithmDefinition());
diff --git 
a/kernel/data-pipeline/distsql/parser/src/main/antlr4/cdc/org/apache/shardingsphere/distsql/parser/autogen/CDCDistSQLStatement.g4
 
b/kernel/data-pipeline/distsql/statement/src/main/java/org/apache/shardingsphere/cdc/distsql/statement/ShowStreamingRuleStatement.java
similarity index 74%
copy from 
kernel/data-pipeline/distsql/parser/src/main/antlr4/cdc/org/apache/shardingsphere/distsql/parser/autogen/CDCDistSQLStatement.g4
copy to 
kernel/data-pipeline/distsql/statement/src/main/java/org/apache/shardingsphere/cdc/distsql/statement/ShowStreamingRuleStatement.java
index d45e8e86cf5..f05f8c08818 100644
--- 
a/kernel/data-pipeline/distsql/parser/src/main/antlr4/cdc/org/apache/shardingsphere/distsql/parser/autogen/CDCDistSQLStatement.g4
+++ 
b/kernel/data-pipeline/distsql/statement/src/main/java/org/apache/shardingsphere/cdc/distsql/statement/ShowStreamingRuleStatement.java
@@ -15,14 +15,12 @@
  * limitations under the License.
  */
 
-grammar CDCDistSQLStatement;
+package org.apache.shardingsphere.cdc.distsql.statement;
 
-import Symbol, RALStatement;
+import 
org.apache.shardingsphere.distsql.statement.ral.pipeline.cdc.QueryableCDCRALStatement;
 
-execute
-    : (showStreamingList
-    | showStreamingStatus
-    | dropStreaming
-    | alterStreamingRule
-    ) SEMI_? EOF
-    ;
+/**
+ * Show streaming rule statement.
+ */
+public final class ShowStreamingRuleStatement extends QueryableCDCRALStatement 
{
+}
diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
index e5b0a12d08d..4bc5536b2bc 100644
--- 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
@@ -98,8 +98,9 @@ class CDCE2EIT {
     void assertCDCDataImportSuccess(final PipelineTestParameter testParam) 
throws SQLException, InterruptedException {
         TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
         try (PipelineContainerComposer containerComposer = new 
PipelineContainerComposer(testParam, new CDCJobType())) {
-            containerComposer.proxyExecuteWithLog("ALTER STREAMING RULE 
(READ(WORKER_THREAD=64,BATCH_SIZE=1000,SHARDING_SIZE=10000000,RATE_LIMITER 
(TYPE(NAME='QPS',PROPERTIES('qps'='10000')))),"
-                    + "STREAM_CHANNEL 
(TYPE(NAME='MEMORY',PROPERTIES('block-queue-size'='2000'))));", 0);
+            String alterStreamingRule = "ALTER STREAMING RULE 
(READ(WORKER_THREAD=20,BATCH_SIZE=1000,SHARDING_SIZE=10000000,RATE_LIMITER(TYPE(NAME='QPS',PROPERTIES('qps'='10000')))),"
+                    + "WRITE(WORKER_THREAD=20,BATCH_SIZE=1000, 
RATE_LIMITER(TYPE(NAME='TPS',PROPERTIES('tps'='10000')))),STREAM_CHANNEL(TYPE(NAME='MEMORY',PROPERTIES('block-queue-size'='2000'))))";
+            containerComposer.proxyExecuteWithLog(alterStreamingRule, 0);
             for (String each : Arrays.asList(PipelineContainerComposer.DS_0, 
PipelineContainerComposer.DS_1)) {
                 containerComposer.registerStorageUnit(each);
             }
diff --git 
a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/pipeline/AlterInventoryIncrementalRuleStatementAssert.java
 
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/pipeline/AlterInventoryIncrementalRuleStatementAssert.java
new file mode 100644
index 00000000000..7b05ef298a7
--- /dev/null
+++ 
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/pipeline/AlterInventoryIncrementalRuleStatementAssert.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ral.impl.pipeline;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import org.apache.shardingsphere.distsql.segment.AlgorithmSegment;
+import org.apache.shardingsphere.distsql.segment.ReadOrWriteSegment;
+import 
org.apache.shardingsphere.distsql.statement.ral.updatable.AlterInventoryIncrementalRuleStatement;
+import 
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.SQLCaseAssertContext;
+import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.domain.statement.ral.AlterInventoryIncrementalRuleStatementTestCase;
+import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.segment.impl.distsql.ExpectedAlgorithm;
+import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.segment.impl.distsql.ral.ExpectedRead;
+import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.segment.impl.distsql.ral.ExpectedWrite;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+/**
+ * Alter inventory incremental rule statement assert.
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class AlterInventoryIncrementalRuleStatementAssert {
+    
+    /**
+     * Assert statement is correct with expected parser result.
+     *
+     * @param assertContext assert context
+     * @param actual actual statement
+     * @param expected expected statement test case
+     */
+    public static void assertIs(final SQLCaseAssertContext assertContext, 
final AlterInventoryIncrementalRuleStatement actual, final 
AlterInventoryIncrementalRuleStatementTestCase expected) {
+        if (null == expected) {
+            assertNull(actual, assertContext.getText("Actual statement should 
not exist."));
+        } else {
+            assertThat(actual.getJobTypeName(), is(expected.getJobTypeName()));
+            assertRead(assertContext, 
actual.getProcessConfigSegment().getReadSegment(), 
expected.getRule().getRead());
+            assertWrite(assertContext, 
actual.getProcessConfigSegment().getWriteSegment(), 
expected.getRule().getWrite());
+            assertTypeStrategy(assertContext, 
actual.getProcessConfigSegment().getStreamChannel(), 
expected.getRule().getStreamChannel());
+        }
+    }
+    
+    private static void assertRead(final SQLCaseAssertContext assertContext, 
final ReadOrWriteSegment actual, final ExpectedRead expected) {
+        if (null == expected) {
+            assertNull(actual, assertContext.getText("Actual read or write 
should not exist."));
+            return;
+        }
+        assertThat(actual.getWorkerThread(), is(expected.getWorkerThread()));
+        assertThat(actual.getBatchSize(), is(expected.getBatchSize()));
+        assertThat(actual.getShardingSize(), is(expected.getShardingSize()));
+        assertThat(actual.getRateLimiter().getName(), 
is(expected.getRateLimiter().getName()));
+    }
+    
+    private static void assertWrite(final SQLCaseAssertContext assertContext, 
final ReadOrWriteSegment actual, final ExpectedWrite expected) {
+        if (null == expected) {
+            assertNull(actual, assertContext.getText("Actual read or write 
should not exist."));
+            return;
+        }
+        assertThat(actual.getWorkerThread(), is(expected.getWorkerThread()));
+        assertThat(actual.getBatchSize(), is(expected.getBatchSize()));
+        assertThat(actual.getRateLimiter().getName(), 
is(expected.getRateLimiter().getName()));
+    }
+    
+    private static void assertTypeStrategy(final SQLCaseAssertContext 
assertContext, final AlgorithmSegment actual, final ExpectedAlgorithm expected) 
{
+        if (null == expected) {
+            assertNull(actual, assertContext.getText("Actual strategy should 
not exist."));
+        } else {
+            assertNotNull(actual, assertContext.getText("Actual strategy 
should exist."));
+            assertThat(assertContext.getText("Type assertion error"), 
actual.getName(), is(expected.getName()));
+        }
+    }
+}
diff --git 
a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/pipeline/QueryablePipelineRALStatementAssert.java
 
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/pipeline/QueryablePipelineRALStatementAssert.java
index 987af395a11..ba61258c4d9 100644
--- 
a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/pipeline/QueryablePipelineRALStatementAssert.java
+++ 
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/pipeline/QueryablePipelineRALStatementAssert.java
@@ -20,6 +20,7 @@ package 
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
 import 
org.apache.shardingsphere.cdc.distsql.statement.ShowStreamingListStatement;
+import 
org.apache.shardingsphere.cdc.distsql.statement.ShowStreamingRuleStatement;
 import 
org.apache.shardingsphere.cdc.distsql.statement.ShowStreamingStatusStatement;
 import 
org.apache.shardingsphere.distsql.statement.ral.pipeline.QueryablePipelineRALStatement;
 import 
org.apache.shardingsphere.migration.distsql.statement.ShowMigrationCheckAlgorithmsStatement;
@@ -28,10 +29,10 @@ import 
org.apache.shardingsphere.migration.distsql.statement.ShowMigrationListSt
 import 
org.apache.shardingsphere.migration.distsql.statement.ShowMigrationSourceStorageUnitsStatement;
 import 
org.apache.shardingsphere.migration.distsql.statement.ShowMigrationStatusStatement;
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.SQLCaseAssertContext;
+import 
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ExistingAssert;
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ral.impl.pipeline.cdc.ShowStreamingStatusStatementAssert;
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ral.impl.pipeline.migration.query.ShowMigrationCheckStatusStatementAssert;
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ral.impl.pipeline.migration.query.ShowMigrationStatusStatementAssert;
-import 
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ExistingAssert;
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.SQLParserTestCase;
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.cdc.ShowStreamingStatusStatementTestCase;
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.migration.ShowMigrationCheckStatusStatementTestCase;
@@ -66,6 +67,8 @@ public final class QueryablePipelineRALStatementAssert {
             ExistingAssert.assertIs(assertContext, actual, expected);
         } else if (actual instanceof ShowStreamingStatusStatement) {
             ShowStreamingStatusStatementAssert.assertIs(assertContext, 
(ShowStreamingStatusStatement) actual, (ShowStreamingStatusStatementTestCase) 
expected);
+        } else if (actual instanceof ShowStreamingRuleStatement) {
+            ExistingAssert.assertIs(assertContext, actual, expected);
         }
     }
 }
diff --git 
a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/pipeline/UpdatablePipelineRALStatementAssert.java
 
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/pipeline/UpdatablePipelineRALStatementAssert.java
index e6cf8576651..d6dfa98ceb6 100644
--- 
a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/pipeline/UpdatablePipelineRALStatementAssert.java
+++ 
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/asserts/statement/ral/impl/pipeline/UpdatablePipelineRALStatementAssert.java
@@ -21,6 +21,7 @@ import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
 import org.apache.shardingsphere.cdc.distsql.statement.DropStreamingStatement;
 import 
org.apache.shardingsphere.distsql.statement.ral.pipeline.UpdatablePipelineRALStatement;
+import 
org.apache.shardingsphere.distsql.statement.ral.updatable.AlterInventoryIncrementalRuleStatement;
 import 
org.apache.shardingsphere.migration.distsql.statement.CheckMigrationStatement;
 import 
org.apache.shardingsphere.migration.distsql.statement.CommitMigrationStatement;
 import 
org.apache.shardingsphere.migration.distsql.statement.MigrateTableStatement;
@@ -43,6 +44,7 @@ import 
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.r
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ral.impl.pipeline.migration.update.StopMigrationCheckStatementAssert;
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ral.impl.pipeline.migration.update.StopMigrationStatementAssert;
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.asserts.statement.ral.impl.pipeline.migration.update.UnregisterMigrationSourceStorageUnitStatementAssert;
+import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.domain.statement.ral.AlterInventoryIncrementalRuleStatementTestCase;
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.SQLParserTestCase;
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.cdc.DropStreamingStatementTestCase;
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.migration.CheckMigrationStatementTestCase;
@@ -93,6 +95,8 @@ public final class UpdatablePipelineRALStatementAssert {
             StartMigrationCheckStatementAssert.assertIs(assertContext, 
(StartMigrationCheckStatement) actual, (StartMigrationCheckStatementTestCase) 
expected);
         } else if (actual instanceof StopMigrationCheckStatement) {
             StopMigrationCheckStatementAssert.assertIs(assertContext, 
(StopMigrationCheckStatement) actual, (StopMigrationCheckStatementTestCase) 
expected);
+        } else if (actual instanceof AlterInventoryIncrementalRuleStatement) {
+            
AlterInventoryIncrementalRuleStatementAssert.assertIs(assertContext, 
(AlterInventoryIncrementalRuleStatement) actual, 
(AlterInventoryIncrementalRuleStatementTestCase) expected);
         } else if (actual instanceof DropStreamingStatement) {
             DropStreamingStatementAssert.assertIs(assertContext, 
(DropStreamingStatement) actual, (DropStreamingStatementTestCase) expected);
         }
diff --git 
a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/domain/statement/ral/AlterInventoryIncrementalRuleStatementTestCase.java
 
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/domain/statement/ral/AlterInventoryIncrementalRuleStatementTestCase.java
new file mode 100644
index 00000000000..2506f0f11e9
--- /dev/null
+++ 
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/domain/statement/ral/AlterInventoryIncrementalRuleStatementTestCase.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.domain.statement.ral;
+
+import lombok.Getter;
+import lombok.Setter;
+import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.SQLParserTestCase;
+import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.segment.impl.distsql.ral.ExpectedInventoryIncrementalRule;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+
+/**
+ * Alter inventory incremental rule statement test case.
+ */
+@Getter
+@Setter
+@XmlAccessorType(XmlAccessType.FIELD)
+public final class AlterInventoryIncrementalRuleStatementTestCase extends 
SQLParserTestCase {
+    
+    @XmlElement(name = "job-type-name")
+    private String jobTypeName;
+    
+    @XmlElement(name = "rule")
+    private ExpectedInventoryIncrementalRule rule;
+}
diff --git 
a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/RootSQLParserTestCases.java
 
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/RootSQLParserTestCases.java
index 2374134bcde..cecb97b0cdf 100644
--- 
a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/RootSQLParserTestCases.java
+++ 
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/RootSQLParserTestCases.java
@@ -20,6 +20,7 @@ package 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb;
 import com.google.common.base.Preconditions;
 import lombok.Getter;
 import lombok.SneakyThrows;
+import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.domain.statement.ral.AlterInventoryIncrementalRuleStatementTestCase;
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.CommonStatementTestCase;
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.dal.AlterResourceGroupStatementTestCase;
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.dal.BinlogStatementTestCase;
@@ -132,10 +133,10 @@ import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.s
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ddl.AlterPluggableDatabaseStatementTestCase;
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ddl.AlterPolicyStatementTestCase;
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ddl.AlterProcedureStatementTestCase;
-import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ddl.AlterPublicationStatementTestCase;
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ddl.AlterProfileStatementTestCase;
-import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ddl.AlterRoutineStatementTestCase;
+import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ddl.AlterPublicationStatementTestCase;
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ddl.AlterRollbackSegmentStatementTestCase;
+import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ddl.AlterRoutineStatementTestCase;
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ddl.AlterRuleStatementTestCase;
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ddl.AlterSchemaStatementTestCase;
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ddl.AlterSequenceStatementTestCase;
@@ -271,8 +272,8 @@ import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.s
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ddl.DropTriggerStatementTestCase;
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ddl.DropTypeStatementTestCase;
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ddl.DropViewStatementTestCase;
-import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ddl.FetchStatementTestCase;
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ddl.ExecuteStatementTestCase;
+import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ddl.FetchStatementTestCase;
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ddl.FlashbackDatabaseStatementTestCase;
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ddl.FlashbackTableStatementTestCase;
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ddl.ListenStatementTestCase;
@@ -295,8 +296,8 @@ import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.s
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.dml.DeleteStatementTestCase;
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.dml.DoStatementTestCase;
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.dml.HandlerStatementTestCase;
-import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.dml.InsertStatementTestCase;
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.dml.ImportStatementTestCase;
+import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.dml.InsertStatementTestCase;
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.dml.LoadDataStatementTestCase;
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.dml.LoadXMLStatementTestCase;
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.dml.LockTableStatementTestCase;
@@ -339,6 +340,7 @@ import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.s
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.UnlabelComputeNodeStatementTestCase;
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.cdc.DropStreamingStatementTestCase;
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.cdc.ShowStreamingListStatementTestCase;
+import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.cdc.ShowStreamingRuleStatementTestCase;
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.cdc.ShowStreamingStatusStatementTestCase;
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.migration.CheckMigrationStatementTestCase;
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.migration.CommitMigrationStatementTestCase;
@@ -392,6 +394,7 @@ import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.s
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.rdl.rule.sharding.DropShardingTableReferenceRuleStatementTestCase;
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.rdl.rule.sharding.DropShardingTableRuleStatementTestCase;
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.rdl.rule.single.SetDefaultSingleTableStorageUnitStatementTestCase;
+import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.rl.ChangeReplicationSourceToStatementTestCase;
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.rql.rule.encrypt.CountEncryptRuleStatementTestCase;
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.rql.rule.encrypt.ShowEncryptRulesStatementTestCase;
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.rql.rule.mask.CountMaskRuleStatementTestCase;
@@ -438,7 +441,6 @@ import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.s
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.tcl.SetTransactionStatementTestCase;
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.tcl.UnlockStatementTestCase;
 import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.tcl.XATestCase;
-import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.rl.ChangeReplicationSourceToStatementTestCase;
 import org.mockito.internal.configuration.plugins.Plugins;
 
 import javax.xml.bind.annotation.XmlElement;
@@ -1047,6 +1049,12 @@ public final class RootSQLParserTestCases {
     @XmlElement(name = "migrate-table")
     private final List<MigrateTableStatementTestCase> migrateTableTestCases = 
new LinkedList<>();
     
+    @XmlElement(name = "show-streaming-rule")
+    private final List<ShowStreamingRuleStatementTestCase> 
showStreamingRuleTestCases = new LinkedList<>();
+    
+    @XmlElement(name = "alter-streaming-rule")
+    private final List<AlterInventoryIncrementalRuleStatementTestCase> 
alterStreamingRuleTestCases = new LinkedList<>();
+    
     @XmlElement(name = "show-streaming-list")
     private final List<ShowStreamingListStatementTestCase> 
showStreamingListTestCases = new LinkedList<>();
     
diff --git 
a/kernel/data-pipeline/distsql/parser/src/main/antlr4/cdc/org/apache/shardingsphere/distsql/parser/autogen/CDCDistSQLStatement.g4
 
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/segment/impl/distsql/ral/ExpectedInventoryIncrementalRule.java
similarity index 50%
copy from 
kernel/data-pipeline/distsql/parser/src/main/antlr4/cdc/org/apache/shardingsphere/distsql/parser/autogen/CDCDistSQLStatement.g4
copy to 
test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/segment/impl/distsql/ral/ExpectedInventoryIncrementalRule.java
index d45e8e86cf5..7c33deccd80 100644
--- 
a/kernel/data-pipeline/distsql/parser/src/main/antlr4/cdc/org/apache/shardingsphere/distsql/parser/autogen/CDCDistSQLStatement.g4
+++ 
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/segment/impl/distsql/ral/ExpectedInventoryIncrementalRule.java
@@ -15,14 +15,30 @@
  * limitations under the License.
  */
 
-grammar CDCDistSQLStatement;
+package 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.segment.impl.distsql.ral;
 
-import Symbol, RALStatement;
+import lombok.Getter;
+import lombok.Setter;
+import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.segment.impl.distsql.ExpectedAlgorithm;
 
-execute
-    : (showStreamingList
-    | showStreamingStatus
-    | dropStreaming
-    | alterStreamingRule
-    ) SEMI_? EOF
-    ;
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+
+/**
+ * Expected inventory incremental rule, bound by field access from the {@code read}, {@code write} and {@code stream-channel} XML elements.
+ */
+@Getter
+@Setter
+@XmlAccessorType(XmlAccessType.FIELD)
+public final class ExpectedInventoryIncrementalRule {
+    
+    @XmlElement(name = "read")
+    private ExpectedRead read;
+    
+    @XmlElement(name = "write")
+    private ExpectedWrite write;
+    
+    @XmlElement(name = "stream-channel")
+    private ExpectedAlgorithm streamChannel;
+}
diff --git 
a/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/segment/impl/distsql/ral/ExpectedRead.java
 
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/segment/impl/distsql/ral/ExpectedRead.java
new file mode 100644
index 00000000000..c9966b66fb9
--- /dev/null
+++ 
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/segment/impl/distsql/ral/ExpectedRead.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.segment.impl.distsql.ral;
+
+import lombok.Getter;
+import lombok.Setter;
+import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.segment.impl.distsql.ExpectedAlgorithm;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+
+/**
+ * Expected read settings, bound by field access from the {@code worker-thread}, {@code batch-size}, {@code sharding-size} and {@code rate-limiter} XML elements.
+ */
+@Getter
+@Setter
+@XmlAccessorType(XmlAccessType.FIELD)
+public final class ExpectedRead {
+    
+    @XmlElement(name = "worker-thread")
+    private Integer workerThread;
+    
+    @XmlElement(name = "batch-size")
+    private Integer batchSize;
+    
+    @XmlElement(name = "sharding-size")
+    private Integer shardingSize;
+    
+    @XmlElement(name = "rate-limiter")
+    private ExpectedAlgorithm rateLimiter;
+}
diff --git 
a/kernel/data-pipeline/distsql/parser/src/main/antlr4/cdc/org/apache/shardingsphere/distsql/parser/autogen/CDCDistSQLStatement.g4
 
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/segment/impl/distsql/ral/ExpectedWrite.java
similarity index 51%
copy from 
kernel/data-pipeline/distsql/parser/src/main/antlr4/cdc/org/apache/shardingsphere/distsql/parser/autogen/CDCDistSQLStatement.g4
copy to 
test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/segment/impl/distsql/ral/ExpectedWrite.java
index d45e8e86cf5..5171ec91e67 100644
--- 
a/kernel/data-pipeline/distsql/parser/src/main/antlr4/cdc/org/apache/shardingsphere/distsql/parser/autogen/CDCDistSQLStatement.g4
+++ 
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/segment/impl/distsql/ral/ExpectedWrite.java
@@ -15,14 +15,30 @@
  * limitations under the License.
  */
 
-grammar CDCDistSQLStatement;
+package 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.segment.impl.distsql.ral;
 
-import Symbol, RALStatement;
+import lombok.Getter;
+import lombok.Setter;
+import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.segment.impl.distsql.ExpectedAlgorithm;
 
-execute
-    : (showStreamingList
-    | showStreamingStatus
-    | dropStreaming
-    | alterStreamingRule
-    ) SEMI_? EOF
-    ;
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+
+/**
+ * Expected write settings, bound by field access from the {@code worker-thread}, {@code batch-size} and {@code rate-limiter} XML elements.
+ */
+@Getter
+@Setter
+@XmlAccessorType(XmlAccessType.FIELD)
+public final class ExpectedWrite {
+    
+    @XmlElement(name = "worker-thread")
+    private Integer workerThread;
+    
+    @XmlElement(name = "batch-size")
+    private Integer batchSize;
+    
+    @XmlElement(name = "rate-limiter")
+    private ExpectedAlgorithm rateLimiter;
+}
diff --git 
a/kernel/data-pipeline/distsql/parser/src/main/antlr4/cdc/org/apache/shardingsphere/distsql/parser/autogen/CDCDistSQLStatement.g4
 
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/statement/ral/cdc/ShowStreamingRuleStatementTestCase.java
similarity index 70%
copy from 
kernel/data-pipeline/distsql/parser/src/main/antlr4/cdc/org/apache/shardingsphere/distsql/parser/autogen/CDCDistSQLStatement.g4
copy to 
test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/statement/ral/cdc/ShowStreamingRuleStatementTestCase.java
index d45e8e86cf5..d8becccedb7 100644
--- 
a/kernel/data-pipeline/distsql/parser/src/main/antlr4/cdc/org/apache/shardingsphere/distsql/parser/autogen/CDCDistSQLStatement.g4
+++ 
b/test/it/parser/src/main/java/org/apache/shardingsphere/test/it/sql/parser/internal/cases/parser/jaxb/statement/ral/cdc/ShowStreamingRuleStatementTestCase.java
@@ -15,14 +15,12 @@
  * limitations under the License.
  */
 
-grammar CDCDistSQLStatement;
+package 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.statement.ral.cdc;
 
-import Symbol, RALStatement;
+import 
org.apache.shardingsphere.test.it.sql.parser.internal.cases.parser.jaxb.SQLParserTestCase;
 
-execute
-    : (showStreamingList
-    | showStreamingStatus
-    | dropStreaming
-    | alterStreamingRule
-    ) SEMI_? EOF
-    ;
+/**
+ * Show streaming rule statement test case; declares no members of its own beyond what {@link SQLParserTestCase} provides.
+ */
+public final class ShowStreamingRuleStatementTestCase extends 
SQLParserTestCase {
+}
diff --git a/test/it/parser/src/main/resources/case/ral/cdc.xml 
b/test/it/parser/src/main/resources/case/ral/cdc.xml
index 534077055b8..35df1528745 100644
--- a/test/it/parser/src/main/resources/case/ral/cdc.xml
+++ b/test/it/parser/src/main/resources/case/ral/cdc.xml
@@ -17,6 +17,38 @@
   -->
 
 <sql-parser-test-cases>
+    <show-streaming-rule sql-case-id="show-streaming-rule" />
+
+    <alter-streaming-rule sql-case-id="alter-streaming-rule">
+        <job-type-name>STREAMING</job-type-name>
+        <rule>
+            <read>
+                <worker-thread>20</worker-thread>
+                <batch-size>1000</batch-size>
+                <sharding-size>10000000</sharding-size>
+                <rate-limiter algorithm-name="QPS">
+                    <properties>
+                        <property key="qps" value="500" />
+                    </properties>
+                </rate-limiter>
+            </read>
+            <write>
+                <worker-thread>20</worker-thread>
+                <batch-size>2000</batch-size>
+                <rate-limiter algorithm-name="TPS">
+                    <properties>
+                        <property key="tps" value="2000" />
+                    </properties>
+                </rate-limiter>
+            </write>
+            <stream-channel algorithm-name="MEMORY">
+                <properties>
+                    <property key="block-queue-size" value="100" />
+                </properties>
+            </stream-channel>
+        </rule>
+    </alter-streaming-rule>
+
     <show-streaming-list sql-case-id="show-streaming-list" />
     
     <show-streaming-status sql-case-id="show-streaming-status">
diff --git a/test/it/parser/src/main/resources/sql/supported/ral/cdc.xml 
b/test/it/parser/src/main/resources/sql/supported/ral/cdc.xml
index 282f03b2629..91a7680418e 100644
--- a/test/it/parser/src/main/resources/sql/supported/ral/cdc.xml
+++ b/test/it/parser/src/main/resources/sql/supported/ral/cdc.xml
@@ -17,6 +17,8 @@
   -->
 
 <sql-cases>
+    <sql-case id="show-streaming-rule" value="SHOW STREAMING RULE;" 
db-types="ShardingSphere"/>
+    <sql-case id="alter-streaming-rule" value="ALTER STREAMING RULE 
(READ(WORKER_THREAD=20,BATCH_SIZE=1000,SHARDING_SIZE=10000000,RATE_LIMITER 
(TYPE(NAME='QPS',PROPERTIES('qps'='500')))),WRITE(WORKER_THREAD=20,BATCH_SIZE=2000,RATE_LIMITER
 (TYPE(NAME='TPS',PROPERTIES('tps'='2000')))),STREAM_CHANNEL 
(TYPE(NAME='MEMORY',PROPERTIES('block-queue-size'='100'))));" 
db-types="ShardingSphere" />
     <sql-case id="show-streaming-list" value="SHOW STREAMING LIST;" 
db-types="ShardingSphere"/>
     <sql-case id="show-streaming-status" value="SHOW STREAMING STATUS 123;" 
db-types="ShardingSphere"/>
     <sql-case id="drop-streaming" value="DROP STREAMING 123;" 
db-types="ShardingSphere"/>


Reply via email to