sandynz commented on code in PR #28953:
URL: https://github.com/apache/shardingsphere/pull/28953#discussion_r1385860739
##########
kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/RetryStreamingExceptionHandler.java:
##########
@@ -32,53 +30,45 @@
/**
* Retry streaming exception handler.
*/
-@RequiredArgsConstructor
@Slf4j
public class RetryStreamingExceptionHandler implements ExceptionHandler {
- private final AtomicInteger retryCount = new AtomicInteger(0);
+ private final CDCClient cdcClient;
+
+ private final AtomicInteger maxRetryTimes = new AtomicInteger(0);
private final int retryIntervalMills;
- private CDCClient cdcClient;
+ private final int retryTimes;
- public RetryStreamingExceptionHandler(final int retryCount, final int
retryIntervalMills) {
- this.retryCount.set(retryCount);
- this.retryIntervalMills = retryIntervalMills;
- }
-
- @Override
- public void setCDCClient(final CDCClient cdcClient) {
+ public RetryStreamingExceptionHandler(final CDCClient cdcClient, final int
maxRetryTimes, final int retryIntervalMills) {
this.cdcClient = cdcClient;
+ this.maxRetryTimes.set(maxRetryTimes);
+ this.retryIntervalMills = retryIntervalMills;
+ retryTimes = 0;
}
@Override
- public void handleServerException(final ChannelHandlerContext ctx, final
ServerErrorResult result) {
- log.error("Server error, code: {}, message: {}",
result.getErrorCode(), result.getErrorMessage());
- reconnect(ctx);
- }
-
- @Override
- public void handleSocketException(final ChannelHandlerContext ctx, final
Throwable throwable) {
+ public void handleException(final ChannelHandlerContext ctx, final
Throwable throwable) {
log.error("Socket error: {}", throwable.getMessage());
Review Comment:
logging could be improved
##########
kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/handler/RetryStreamingExceptionHandler.java:
##########
@@ -32,53 +30,45 @@
/**
* Retry streaming exception handler.
*/
-@RequiredArgsConstructor
@Slf4j
public class RetryStreamingExceptionHandler implements ExceptionHandler {
- private final AtomicInteger retryCount = new AtomicInteger(0);
+ private final CDCClient cdcClient;
+
+ private final AtomicInteger maxRetryTimes = new AtomicInteger(0);
private final int retryIntervalMills;
- private CDCClient cdcClient;
+ private final int retryTimes;
- public RetryStreamingExceptionHandler(final int retryCount, final int
retryIntervalMills) {
- this.retryCount.set(retryCount);
- this.retryIntervalMills = retryIntervalMills;
- }
-
- @Override
- public void setCDCClient(final CDCClient cdcClient) {
+ public RetryStreamingExceptionHandler(final CDCClient cdcClient, final int
maxRetryTimes, final int retryIntervalMills) {
this.cdcClient = cdcClient;
+ this.maxRetryTimes.set(maxRetryTimes);
+ this.retryIntervalMills = retryIntervalMills;
+ retryTimes = 0;
}
Review Comment:
Verify `retryTimes` logic, it's final and intiial value is `0`
##########
test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java:
##########
@@ -169,10 +169,10 @@ private DataSource createStandardDataSource(final
PipelineContainerComposer cont
private void startCDCClient(final PipelineContainerComposer
containerComposer, final DialectDatabaseMetaData dialectDatabaseMetaData) {
DataSource dataSource = createStandardDataSource(containerComposer,
PipelineContainerComposer.DS_4);
DataSourceRecordConsumer recordConsumer = new
DataSourceRecordConsumer(dataSource, containerComposer.getDatabaseType());
- CDCClientConfiguration cdcConfig = new
CDCClientConfiguration("localhost",
containerComposer.getContainerComposer().getProxyCDCPort(), recordConsumer, new
LoggerExceptionHandler());
String schema = dialectDatabaseMetaData.isSchemaAvailable() ? "test" :
"";
- CDCClient cdcClient = new CDCClient(cdcConfig);
- cdcClient.connect();
+ CDCClient cdcClient = new CDCClient(new
CDCClientConfiguration("localhost",
containerComposer.getContainerComposer().getProxyCDCPort(), 5000));
+ LoggerExceptionErrorHandler handler = new
LoggerExceptionErrorHandler();
+ cdcClient.connect(recordConsumer, handler, handler);
Review Comment:
Could we improve `handler, handler` parameter with closure?
##########
kernel/data-pipeline/scenario/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/example/Bootstrap.java:
##########
@@ -41,14 +42,14 @@ public static void main(final String[] args) {
// Pay attention to the time zone, to avoid the problem of incorrect
time zone, it is best to ensure that the time zone of the program is consistent
with the time zone of the database server
// TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
String address = "127.0.0.1";
- CDCClientConfiguration clientConfig = new
CDCClientConfiguration(address, 33071, records -> log.info("records: {}",
records), new RetryStreamingExceptionHandler(3, 5000));
int reconnectCount = 0;
int maxReconnectCount = 5;
while (reconnectCount < maxReconnectCount) {
- try (CDCClient cdcClient = new CDCClient(clientConfig)) {
- cdcClient.connect();
+ try (CDCClient cdcClient = new CDCClient(new
CDCClientConfiguration(address, 33071, 5000))) {
+ LoggerExceptionErrorHandler loggerExceptionErrorHandler = new
LoggerExceptionErrorHandler();
+ cdcClient.connect(records -> log.info("records: {}", records),
new RetryStreamingExceptionHandler(cdcClient, 5, 5000),
loggerExceptionErrorHandler);
Review Comment:
`reconnectCount` is different with `Retry...Handler`, `catch (final
Exception ex)` could be improved
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]