This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 56f8ee0f2e1 Improve CDCE2EIT and CreateTableSQLGeneratorIT for 
openGauss nightly E2E (#37822)
56f8ee0f2e1 is described below

commit 56f8ee0f2e11ee8bc2c22eedabaecb280a2ec898
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Fri Jan 23 18:54:49 2026 +0800

    Improve CDCE2EIT and CreateTableSQLGeneratorIT for openGauss nightly E2E 
(#37822)
    
    * Remove schema name in openGauss create-table-sql-generator.xml
    
    * Add log to verify whether the `date` type is returned as `datetime` 
by pg_get_tabledef
    
    * Add logs for CDC; increase streamData timeout on client
---
 .../pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java    | 3 +++
 .../org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java | 2 ++
 .../data/pipeline/cdc/client/parameter/StartStreamingParameter.java   | 2 ++
 .../shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java   | 2 ++
 .../shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java | 4 ++++
 .../test/e2e/operation/pipeline/cases/cdc/CDCE2EIT.java               | 2 +-
 .../create-table-generator/opengauss/create-table-sql-generator.xml   | 4 ++--
 7 files changed, 16 insertions(+), 3 deletions(-)

diff --git 
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
 
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
index c06d11f5d95..bb6a72ec797 100644
--- 
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
+++ 
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.data.pipeline.opengauss.sqlbuilder;
 
+import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.CreateTableSQLGenerateException;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
 import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.dialect.DialectPipelineSQLBuilder;
@@ -35,6 +36,7 @@ import java.util.stream.Collectors;
 /**
  * Pipeline SQL builder of openGauss.
  */
+@Slf4j
 public final class OpenGaussPipelineSQLBuilder implements 
DialectPipelineSQLBuilder {
     
     @Override
@@ -78,6 +80,7 @@ public final class OpenGaussPipelineSQLBuilder implements 
DialectPipelineSQLBuil
             if (resultSet.next()) {
                 // TODO use ";" to split is not always correct if return 
value's comments contains ";"
                 String tableDefinition = 
resultSet.getString("pg_get_tabledef");
+                log.info("Generate create table definition for {}.{}: {}", 
schemaName, tableName, tableDefinition);
                 return Arrays.asList(tableDefinition.split(";"));
             }
         }
diff --git 
a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java
 
b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java
index b18fce5aa22..9aa819e48ac 100644
--- 
a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java
+++ 
b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java
@@ -56,6 +56,7 @@ import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse
 import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record;
 
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 
 /**
@@ -145,6 +146,7 @@ public final class CDCClient implements AutoCloseable {
         ResponseFuture responseFuture = new ResponseFuture(requestId, 
Type.STREAM_DATA);
         connectionContext.getResponseFutureMap().put(requestId, 
responseFuture);
         channel.writeAndFlush(request);
+        log.info("Sending start streaming request, param: {}, timeout: {} s", 
parameter, TimeUnit.MILLISECONDS.toSeconds(config.getTimeoutMillis()));
         String result = 
responseFuture.waitResponseResult(config.getTimeoutMillis(), 
connectionContext).toString();
         log.info("Start streaming success, streaming id: {}", result);
         return result;
diff --git 
a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/parameter/StartStreamingParameter.java
 
b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/parameter/StartStreamingParameter.java
index 1a7abd78a3f..0ff2d4a2cd3 100644
--- 
a/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/parameter/StartStreamingParameter.java
+++ 
b/kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/parameter/StartStreamingParameter.java
@@ -19,6 +19,7 @@ package 
org.apache.shardingsphere.data.pipeline.cdc.client.parameter;
 
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
+import lombok.ToString;
 import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody.SchemaTable;
 
 import java.util.Set;
@@ -28,6 +29,7 @@ import java.util.Set;
  */
 @RequiredArgsConstructor
 @Getter
+@ToString
 public final class StartStreamingParameter {
     
     private final String database;
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
index 9f7ca7a8788..49bfd12beb3 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
@@ -121,7 +121,9 @@ public final class CDCBackendHandler {
         StreamDataParameter parameter = new 
StreamDataParameter(requestBody.getDatabase(), new 
ArrayList<>(schemaTableNames), requestBody.getFull(), tableAndDataNodesMap, 
isDecodeWithTransaction);
         String jobId = jobAPI.create(parameter, CDCSinkType.SOCKET, new 
Properties());
         connectionContext.setJobId(jobId);
+        log.info("Stream data, jobId={}, database={}, schemaTableNames={}, 
full={}", jobId, requestBody.getDatabase(), schemaTableNames, 
requestBody.getFull());
         startStreaming(jobId, connectionContext, channel);
+        log.info("Stream data, started, jobId={}", jobId);
         return CDCResponseUtils.succeed(requestId, 
ResponseCase.STREAM_DATA_RESULT, 
StreamDataResult.newBuilder().setStreamingId(jobId).build());
     }
     
diff --git 
a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
 
b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
index f6f933f8eee..206e1a587c4 100644
--- 
a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
+++ 
b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/CDCChannelInboundHandler.java
@@ -73,6 +73,7 @@ public final class CDCChannelInboundHandler extends 
ChannelInboundHandlerAdapter
     
     @Override
     public void channelActive(final ChannelHandlerContext ctx) {
+        log.info("channel active: {}", ctx.channel().remoteAddress());
         CDCResponse response = 
CDCResponse.newBuilder().setServerGreetingResult(ServerGreetingResult.newBuilder().setServerVersion(ShardingSphereVersion.VERSION).setProtocolVersion("1").build())
                 .setStatus(Status.SUCCEED).build();
         ctx.writeAndFlush(response);
@@ -80,6 +81,7 @@ public final class CDCChannelInboundHandler extends 
ChannelInboundHandlerAdapter
     
     @Override
     public void channelInactive(final ChannelHandlerContext ctx) {
+        log.info("channel inactive: {}", ctx.channel().remoteAddress());
         CDCConnectionContext connectionContext = 
ctx.channel().attr(CONNECTION_CONTEXT_KEY).get();
         if (null != connectionContext && null != connectionContext.getJobId()) 
{
             backendHandler.stopStreaming(connectionContext.getJobId(), 
ctx.channel().id());
@@ -109,6 +111,7 @@ public final class CDCChannelInboundHandler extends 
ChannelInboundHandlerAdapter
     public void channelRead(final ChannelHandlerContext ctx, final Object msg) 
{
         CDCConnectionContext connectionContext = 
ctx.channel().attr(CONNECTION_CONTEXT_KEY).get();
         CDCRequest request = (CDCRequest) msg;
+        log.info("channel read: {}, request type: {}, request id: {}", 
ctx.channel().remoteAddress(), request.getType(), request.getRequestId());
         if (null == connectionContext || request.hasLoginRequestBody()) {
             processLogin(ctx, request);
             return;
@@ -146,6 +149,7 @@ public final class CDCChannelInboundHandler extends 
ChannelInboundHandlerAdapter
                 () -> new CDCExceptionWrapper(request.getRequestId(), new 
CDCLoginFailedException()));
         ctx.channel().attr(CONNECTION_CONTEXT_KEY).set(new 
CDCConnectionContext(user));
         ctx.writeAndFlush(CDCResponseUtils.succeed(request.getRequestId()));
+        log.info("Process login success, request id: {}", 
request.getRequestId());
     }
     
     private void checkPrivileges(final String requestId, final Grantee 
grantee, final String currentDatabase) {
diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/CDCE2EIT.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/CDCE2EIT.java
index e8840ad589a..ee280ce29d9 100644
--- 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/CDCE2EIT.java
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/CDCE2EIT.java
@@ -174,7 +174,7 @@ class CDCE2EIT {
     
     private CDCClient buildCDCClientAndStart(final PipelineDataSource 
dataSource, final PipelineContainerComposer containerComposer) {
         DataSourceRecordConsumer recordConsumer = new 
DataSourceRecordConsumer(dataSource, containerComposer.getDatabaseType());
-        CDCClient result = new CDCClient(new 
CDCClientConfiguration("localhost", 
containerComposer.getContainerComposer().getProxyCDCPort(), 5000));
+        CDCClient result = new CDCClient(new 
CDCClientConfiguration("localhost", 
containerComposer.getContainerComposer().getProxyCDCPort(), 10000));
         result.connect(recordConsumer, new 
RetryStreamingExceptionHandler(result, 5, 5000), (ctx, serverErrorResult) -> 
log.error("Server error: {}", serverErrorResult.getErrorMessage()));
         result.login(new CDCLoginParameter(ProxyContainerConstants.USER, 
ProxyContainerConstants.PASSWORD));
         // TODO add full=false test case later
diff --git 
a/test/e2e/operation/pipeline/src/test/resources/env/scenario/create-table-generator/opengauss/create-table-sql-generator.xml
 
b/test/e2e/operation/pipeline/src/test/resources/env/scenario/create-table-generator/opengauss/create-table-sql-generator.xml
index 4259a9fea1b..44e5826baa1 100644
--- 
a/test/e2e/operation/pipeline/src/test/resources/env/scenario/create-table-generator/opengauss/create-table-sql-generator.xml
+++ 
b/test/e2e/operation/pipeline/src/test/resources/env/scenario/create-table-generator/opengauss/create-table-sql-generator.xml
@@ -35,7 +35,7 @@
                 WITH (orientation=row, compression=no)
             </sql>
             <sql>
-                ALTER TABLE public.t_order ADD CONSTRAINT t_order_pkey PRIMARY 
KEY (order_id)
+                ALTER TABLE t_order ADD CONSTRAINT t_order_pkey PRIMARY KEY 
(order_id)
             </sql>
         </output>
     </create-table-generator-assertion>
@@ -61,7 +61,7 @@
                 WITH (orientation=row, compression=no)
             </sql>
             <sql>
-                ALTER TABLE public.t_order_item ADD CONSTRAINT 
t_order_item_pkey PRIMARY KEY (item_id)
+                ALTER TABLE t_order_item ADD CONSTRAINT t_order_item_pkey 
PRIMARY KEY (item_id)
             </sql>
         </output>
     </create-table-generator-assertion>

Reply via email to