sandynz commented on code in PR #28953:
URL: https://github.com/apache/shardingsphere/pull/28953#discussion_r1385860739


##########
kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/RetryStreamingExceptionHandler.java:
##########
@@ -32,53 +30,45 @@
 /**
  * Retry streaming exception handler.
  */
-@RequiredArgsConstructor
 @Slf4j
 public class RetryStreamingExceptionHandler implements ExceptionHandler {
     
-    private final AtomicInteger retryCount = new AtomicInteger(0);
+    private final CDCClient cdcClient;
+    
+    private final AtomicInteger maxRetryTimes = new AtomicInteger(0);
     
     private final int retryIntervalMills;
     
-    private CDCClient cdcClient;
+    private final int retryTimes;
     
-    public RetryStreamingExceptionHandler(final int retryCount, final int 
retryIntervalMills) {
-        this.retryCount.set(retryCount);
-        this.retryIntervalMills = retryIntervalMills;
-    }
-    
-    @Override
-    public void setCDCClient(final CDCClient cdcClient) {
+    public RetryStreamingExceptionHandler(final CDCClient cdcClient, final int 
maxRetryTimes, final int retryIntervalMills) {
         this.cdcClient = cdcClient;
+        this.maxRetryTimes.set(maxRetryTimes);
+        this.retryIntervalMills = retryIntervalMills;
+        retryTimes = 0;
     }
     
     @Override
-    public void handleServerException(final ChannelHandlerContext ctx, final 
ServerErrorResult result) {
-        log.error("Server error, code: {}, message: {}", 
result.getErrorCode(), result.getErrorMessage());
-        reconnect(ctx);
-    }
-    
-    @Override
-    public void handleSocketException(final ChannelHandlerContext ctx, final 
Throwable throwable) {
+    public void handleException(final ChannelHandlerContext ctx, final 
Throwable throwable) {
         log.error("Socket error: {}", throwable.getMessage());

Review Comment:
   logging could be improved



##########
kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/RetryStreamingExceptionHandler.java:
##########
@@ -32,53 +30,45 @@
 /**
  * Retry streaming exception handler.
  */
-@RequiredArgsConstructor
 @Slf4j
 public class RetryStreamingExceptionHandler implements ExceptionHandler {
     
-    private final AtomicInteger retryCount = new AtomicInteger(0);
+    private final CDCClient cdcClient;
+    
+    private final AtomicInteger maxRetryTimes = new AtomicInteger(0);
     
     private final int retryIntervalMills;
     
-    private CDCClient cdcClient;
+    private final int retryTimes;
     
-    public RetryStreamingExceptionHandler(final int retryCount, final int 
retryIntervalMills) {
-        this.retryCount.set(retryCount);
-        this.retryIntervalMills = retryIntervalMills;
-    }
-    
-    @Override
-    public void setCDCClient(final CDCClient cdcClient) {
+    public RetryStreamingExceptionHandler(final CDCClient cdcClient, final int 
maxRetryTimes, final int retryIntervalMills) {
         this.cdcClient = cdcClient;
+        this.maxRetryTimes.set(maxRetryTimes);
+        this.retryIntervalMills = retryIntervalMills;
+        retryTimes = 0;
     }

Review Comment:
   Verify `retryTimes` logic, it's final and intiial value is `0`



##########
test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java:
##########
@@ -169,10 +169,10 @@ private DataSource createStandardDataSource(final 
PipelineContainerComposer cont
     private void startCDCClient(final PipelineContainerComposer 
containerComposer, final DialectDatabaseMetaData dialectDatabaseMetaData) {
         DataSource dataSource = createStandardDataSource(containerComposer, 
PipelineContainerComposer.DS_4);
         DataSourceRecordConsumer recordConsumer = new 
DataSourceRecordConsumer(dataSource, containerComposer.getDatabaseType());
-        CDCClientConfiguration cdcConfig = new 
CDCClientConfiguration("localhost", 
containerComposer.getContainerComposer().getProxyCDCPort(), recordConsumer, new 
LoggerExceptionHandler());
         String schema = dialectDatabaseMetaData.isSchemaAvailable() ? "test" : 
"";
-        CDCClient cdcClient = new CDCClient(cdcConfig);
-        cdcClient.connect();
+        CDCClient cdcClient = new CDCClient(new 
CDCClientConfiguration("localhost", 
containerComposer.getContainerComposer().getProxyCDCPort(), 5000));
+        LoggerExceptionErrorHandler handler = new 
LoggerExceptionErrorHandler();
+        cdcClient.connect(recordConsumer, handler, handler);

Review Comment:
   Could we improve `handler, handler` parameter with closure?



##########
kernel/data-pipeline/scenario/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/example/Bootstrap.java:
##########
@@ -41,14 +42,14 @@ public static void main(final String[] args) {
         // Pay attention to the time zone, to avoid the problem of incorrect 
time zone, it is best to ensure that the time zone of the program is consistent 
with the time zone of the database server
         // TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
         String address = "127.0.0.1";
-        CDCClientConfiguration clientConfig = new 
CDCClientConfiguration(address, 33071, records -> log.info("records: {}", 
records), new RetryStreamingExceptionHandler(3, 5000));
         int reconnectCount = 0;
         int maxReconnectCount = 5;
         while (reconnectCount < maxReconnectCount) {
-            try (CDCClient cdcClient = new CDCClient(clientConfig)) {
-                cdcClient.connect();
+            try (CDCClient cdcClient = new CDCClient(new 
CDCClientConfiguration(address, 33071, 5000))) {
+                LoggerExceptionErrorHandler loggerExceptionErrorHandler = new 
LoggerExceptionErrorHandler();
+                cdcClient.connect(records -> log.info("records: {}", records), 
new RetryStreamingExceptionHandler(cdcClient, 5, 5000), 
loggerExceptionErrorHandler);

Review Comment:
   `reconnectCount` is different with `Retry...Handler`, `catch (final 
Exception ex)` could be improved



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to