This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new e75d18a0ab4 MySQLBinlogClient compatible with async exception (#37631)
e75d18a0ab4 is described below

commit e75d18a0ab407f6869cd3af2225f07c368210b31
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Sun Jan 4 13:30:19 2026 +0800

    MySQLBinlogClient compatible with async exception (#37631)
    
    * MySQLBinlogClient compatible with async exception
    
    * Update RELEASE-NOTES.md
---
 RELEASE-NOTES.md                                   |  1 +
 .../incremental/client/MySQLBinlogClient.java      | 54 +++++++++++--
 .../incremental/dumper/MySQLIncrementalDumper.java |  2 +-
 .../incremental/client/MySQLBinlogClientTest.java  | 92 +++++++---------------
 .../dumper/MySQLIncrementalDumperTest.java         |  2 +-
 5 files changed, 78 insertions(+), 73 deletions(-)

diff --git a/RELEASE-NOTES.md b/RELEASE-NOTES.md
index 7c6407f919c..044e0587291 100644
--- a/RELEASE-NOTES.md
+++ b/RELEASE-NOTES.md
@@ -120,6 +120,7 @@
 1. Pipeline: Fix migration might skip some records on big table after job 
restarting - [#36878](https://github.com/apache/shardingsphere/pull/36878)
 1. Pipeline: Fix unsigned number column value type inconsistent in inventory 
and incremental - [#37280](https://github.com/apache/shardingsphere/pull/37280)
 1. Pipeline: Fix PostgreSQL migration create table SQL generation failure 
caused by locale-formatted sequence values - 
[#28360](https://github.com/apache/shardingsphere/issues/28360)
+1. Pipeline: MySQLBinlogClient compatible with async exception - 
[#37631](https://github.com/apache/shardingsphere/pull/37631)
 1. DistSQL: Fix load single table with specific schema - 
[#37535](https://github.com/apache/shardingsphere/pull/37535)
 1. Transaction: Fix XA data source enlist failure caused connection leaks - 
[37593](https://github.com/apache/shardingsphere/pull/37593)
 
diff --git 
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/MySQLBinlogClient.java
 
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/MySQLBinlogClient.java
index 08a5b30aed8..980f4b44cad 100644
--- 
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/MySQLBinlogClient.java
+++ 
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/MySQLBinlogClient.java
@@ -64,6 +64,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
@@ -89,6 +90,10 @@ public final class MySQLBinlogClient {
     
     private volatile boolean running = true;
     
+    private volatile boolean ready;
+    
+    private final AtomicInteger continuousFailureCount = new AtomicInteger(0);
+    
     /**
      * Connect to MySQL.
      */
@@ -114,7 +119,6 @@ public final class MySQLBinlogClient {
                     }
                 }).connect(connectInfo.getHost(), 
connectInfo.getPort()).channel();
         serverVersion = 
waitExpectedResponse(MySQLServerVersion.class).orElse(null);
-        running = true;
     }
     
     /**
@@ -176,6 +180,7 @@ public final class MySQLBinlogClient {
         configureHeartbeat();
         registerSlave();
         dumpBinlog(binlogFileName, binlogPosition, queryChecksumLength());
+        ready = true;
         log.info("subscribe binlog file: {}, position: {}", binlogFileName, 
binlogPosition);
     }
     
@@ -234,9 +239,11 @@ public final class MySQLBinlogClient {
      * Poll binlog event.
      *
      * @return binlog event
+     * @throws RuntimeException if MySQL binlog client is not running
      */
     public synchronized List<MySQLBaseBinlogEvent> poll() {
-        if (!running) {
+        ShardingSpherePreconditions.checkState(running, () -> new 
RuntimeException("MySQL binlog client is not running"));
+        if (!ready) {
             return Collections.emptyList();
         }
         try {
@@ -273,13 +280,17 @@ public final class MySQLBinlogClient {
     /**
      * Close netty channel.
      *
+     * @param terminate {@code true} to also mark the client as not running, {@code false} to only mark it as not ready
      * @return channel future
      */
-    public Optional<ChannelFuture> closeChannel() {
+    public Optional<ChannelFuture> closeChannel(final boolean terminate) {
+        ready = false;
+        if (terminate) {
+            running = false;
+        }
         if (null == channel || !channel.isOpen()) {
             return Optional.empty();
         }
-        running = false;
         ChannelFuture future = channel.close();
         if (null != eventLoopGroup) {
             eventLoopGroup.shutdownGracefully();
@@ -321,6 +332,10 @@ public final class MySQLBinlogClient {
             if (!running) {
                 return;
             }
+            if (continuousFailureCount.get() > 0) {
+                log.info("Failure count reset to 0");
+                continuousFailureCount.set(0);
+            }
             if (msg instanceof List) {
                 List<MySQLBaseBinlogEvent> records = 
(List<MySQLBaseBinlogEvent>) msg;
                 if (records.isEmpty()) {
@@ -349,6 +364,8 @@ public final class MySQLBinlogClient {
         @Override
         public void exceptionCaught(final ChannelHandlerContext ctx, final 
Throwable cause) {
             log.error("MySQLBinlogEventHandler protocol resolution error, 
channel: {}, lastBinlogEvent: {}", ctx.channel(), 
JsonUtils.toJsonString(lastBinlogEvent.get()), cause);
+            continuousFailureCount.incrementAndGet();
+            closeChannel(false);
         }
         
         private void tryReconnect() {
@@ -359,16 +376,37 @@ public final class MySQLBinlogClient {
         
         @SneakyThrows(InterruptedException.class)
         private synchronized void reconnect() {
-            for (int reconnectTimes = 0; reconnectTimes < 3; reconnectTimes++) 
{
+            log.info("Reconnect failure count: {}", 
continuousFailureCount.get());
+            if (continuousFailureCount.get() >= 5) {
+                log.error("MySQL binlog client failed permanently, 
lastBinlogEvent: {}", JsonUtils.toJsonString(lastBinlogEvent.get()));
+                closeChannel(true);
+                return;
+            }
+            wait(2000L * (continuousFailureCount.get() + 1));
+            try {
+                reconnectWithRetry();
+                // CHECKSTYLE:OFF
+            } catch (final RuntimeException ex) {
+                // CHECKSTYLE:ON
+                log.error("Reconnect failed permanently, lastBinlogEvent: {}", 
JsonUtils.toJsonString(lastBinlogEvent.get()), ex);
+                closeChannel(true);
+                return;
+            }
+            subscribe(lastBinlogEvent.get().getFileName(), 
lastBinlogEvent.get().getPosition());
+        }
+        
+        private void reconnectWithRetry() throws InterruptedException {
+            for (int reconnectTimes = 1; true; reconnectTimes++) {
                 try {
                     connect();
-                    log.info("Reconnect times {}", reconnectTimes);
-                    subscribe(lastBinlogEvent.get().getFileName(), 
lastBinlogEvent.get().getPosition());
                     break;
                     // CHECKSTYLE:OFF
                 } catch (final RuntimeException ex) {
                     // CHECKSTYLE:ON
-                    log.error("Reconnect failed, reconnect times: {}, 
lastBinlogEvent: {}", reconnectTimes, 
JsonUtils.toJsonString(lastBinlogEvent.get()), ex);
+                    closeChannel(false);
+                    if (reconnectTimes > 3) {
+                        throw ex;
+                    }
                     wait(1000L << reconnectTimes);
                 }
             }
diff --git 
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumper.java
 
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumper.java
index 525de8fc997..de5f76d79b1 100644
--- 
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumper.java
+++ 
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumper.java
@@ -200,6 +200,6 @@ public final class MySQLIncrementalDumper extends 
AbstractPipelineLifecycleRunna
     
     @Override
     protected void doStop() {
-        client.closeChannel();
+        client.closeChannel(true);
     }
 }
diff --git 
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/MySQLBinlogClientTest.java
 
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/MySQLBinlogClientTest.java
index ac76e6eeb8a..a4752d3acb1 100644
--- 
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/MySQLBinlogClientTest.java
+++ 
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/MySQLBinlogClientTest.java
@@ -32,9 +32,9 @@ import io.netty.util.concurrent.DefaultPromise;
 import io.netty.util.concurrent.Promise;
 import lombok.SneakyThrows;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
+import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.MySQLBinlogContext;
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.MySQLBaseBinlogEvent;
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.PlaceholderBinlogEvent;
-import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.MySQLBinlogContext;
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.client.netty.MySQLBinlogEventPacketDecoder;
 import 
org.apache.shardingsphere.database.protocol.mysql.constant.MySQLConstants;
 import 
org.apache.shardingsphere.database.protocol.mysql.packet.command.binlog.MySQLComBinlogDumpCommandPacket;
@@ -63,10 +63,8 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
@@ -79,7 +77,6 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
-import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.CALLS_REAL_METHODS;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
@@ -154,7 +151,7 @@ class MySQLBinlogClientTest {
         initializer.get().channelRegistered(context);
         MySQLServerVersion actual = (MySQLServerVersion) 
Plugins.getMemberAccessor().get(MySQLBinlogClient.class.getDeclaredField("serverVersion"),
 client);
         assertThat(actual, is(expected));
-        client.closeChannel();
+        client.closeChannel(true);
     }
     
     @Test
@@ -306,30 +303,46 @@ class MySQLBinlogClientTest {
         assertThat(getChecksumLength(decoder), is(0));
     }
     
-    @SuppressWarnings("unchecked")
     @Test
-    void assertPollBranches() throws InterruptedException, 
ReflectiveOperationException {
+    void assertPollOnNotRunning() throws ReflectiveOperationException {
+        
Plugins.getMemberAccessor().set(MySQLBinlogClient.class.getDeclaredField("channel"),
 client, channel);
         setRunning(false);
+        assertThrows(RuntimeException.class, () -> client.poll());
+    }
+    
+    @Test
+    void assertPollOnNotReady() throws ReflectiveOperationException {
+        
Plugins.getMemberAccessor().set(MySQLBinlogClient.class.getDeclaredField("channel"),
 client, channel);
+        setRunning(true);
+        setReady(false);
         assertThat(client.poll(), is(Collections.emptyList()));
+    }
+    
+    @SuppressWarnings("unchecked")
+    @Test
+    void assertPollBranches() throws InterruptedException, 
ReflectiveOperationException {
         setRunning(true);
+        setReady(false);
+        assertThat(client.poll(), is(Collections.emptyList()));
+        setReady(true);
         assertThat(client.poll(), is(Collections.emptyList()));
         List<MySQLBaseBinlogEvent> events = Collections.singletonList(new 
PlaceholderBinlogEvent("binlog", 4L, 1L));
         ((ArrayBlockingQueue<List<MySQLBaseBinlogEvent>>) 
Plugins.getMemberAccessor().get(MySQLBinlogClient.class.getDeclaredField("blockingEventQueue"),
 client)).put(events);
         assertThat(client.poll(), is(events));
-        setRunning(true);
+        setReady(true);
         Thread.currentThread().interrupt();
         assertThat(client.poll(), is(Collections.emptyList()));
     }
     
     @Test
     void assertCloseChannelWhenChannelUnavailable() {
-        assertFalse(client.closeChannel().isPresent());
+        assertFalse(client.closeChannel(true).isPresent());
     }
     
     @Test
     void assertCloseChannelWithoutEventLoopGroup() throws Exception {
         
Plugins.getMemberAccessor().set(MySQLBinlogClient.class.getDeclaredField("channel"),
 client, channel);
-        Optional<ChannelFuture> actual = client.closeChannel();
+        Optional<ChannelFuture> actual = client.closeChannel(true);
         assertTrue(actual.isPresent());
         verify(channel).close();
     }
@@ -374,59 +387,7 @@ class MySQLBinlogClientTest {
         setResponseCallback(null);
         handler.channelRead(mock(ChannelHandlerContext.class), new Object());
         handler.exceptionCaught(mock(ChannelHandlerContext.class), new 
RuntimeException("ex"));
-        client.closeChannel();
-    }
-    
-    @Test
-    void assertMySQLBinlogEventHandlerBranches() throws Exception {
-        client = createClientMock();
-        prepareClientChannel();
-        setServerVersion("5.6.0");
-        doReturn(true).when(client).execute(anyString());
-        
doReturn(createResultSet("CRC32")).when(client).executeQuery(anyString());
-        
when(channel.writeAndFlush(any(MySQLComRegisterSlaveCommandPacket.class))).thenAnswer(invocation
 -> {
-            Promise<Object> callback = new 
DefaultPromise<>(eventLoopGroup.next());
-            callback.setSuccess(new MySQLOKPacket(0));
-            setResponseCallback(callback);
-            return null;
-        });
-        doAnswer(invocation -> 
null).when(channel).writeAndFlush(any(MySQLComBinlogDumpCommandPacket.class));
-        client.subscribe("binlog-000003", 12L);
-        ArgumentCaptor<ChannelHandler> captor = 
ArgumentCaptor.forClass(ChannelHandler.class);
-        verify(pipeline, times(2)).addLast(captor.capture());
-        ChannelInboundHandlerAdapter handler = captor.getAllValues().stream()
-                .filter(each -> 
"MySQLBinlogEventHandler".equals(each.getClass().getSimpleName()))
-                .map(each -> (ChannelInboundHandlerAdapter) 
each).findFirst().orElseThrow(IllegalStateException::new);
-        setRunning(false);
-        handler.channelRead(mock(ChannelHandlerContext.class), new Object());
-        setRunning(true);
-        handler.channelRead(mock(ChannelHandlerContext.class), 
Collections.emptyList());
-        PlaceholderBinlogEvent firstEvent = new 
PlaceholderBinlogEvent("binlog-000003", 12L, 1L);
-        PlaceholderBinlogEvent secondEvent = new 
PlaceholderBinlogEvent("binlog-000003", 20L, 2L);
-        handler.channelRead(mock(ChannelHandlerContext.class), 
Arrays.asList(firstEvent, secondEvent));
-        handler.channelRead(mock(ChannelHandlerContext.class), new 
PlaceholderBinlogEvent("binlog-000003", 30L, 3L));
-        handler.exceptionCaught(mock(ChannelHandlerContext.class), new 
RuntimeException("binlog"));
-        setRunning(false);
-        handler.channelInactive(mock(ChannelHandlerContext.class));
-        setRunning(true);
-        CountDownLatch latch = new CountDownLatch(1);
-        AtomicInteger connectAttempts = new AtomicInteger();
-        doAnswer(invocation -> {
-            if (connectAttempts.getAndIncrement() == 0) {
-                throw new RuntimeException("fail");
-            }
-            return null;
-        }).when(client).connect();
-        doAnswer(invocation -> {
-            latch.countDown();
-            return null;
-        }).when(client).subscribe(anyString(), anyLong());
-        handler.channelInactive(mock(ChannelHandlerContext.class));
-        handler.channelInactive(mock(ChannelHandlerContext.class));
-        assertTrue(latch.await(3L, TimeUnit.SECONDS));
-        AtomicBoolean reconnectRequested = (AtomicBoolean) 
Plugins.getMemberAccessor().get(handler.getClass().getDeclaredField("reconnectRequested"),
 handler);
-        reconnectRequested.set(true);
-        handler.channelInactive(mock(ChannelHandlerContext.class));
+        client.closeChannel(true);
     }
     
     private InternalResultSet createResultSet(final String checksum) {
@@ -460,6 +421,11 @@ class MySQLBinlogClientTest {
         
Plugins.getMemberAccessor().set(MySQLBinlogClient.class.getDeclaredField("running"),
 client, value);
     }
     
+    @SneakyThrows(ReflectiveOperationException.class)
+    private void setReady(final boolean value) {
+        
Plugins.getMemberAccessor().set(MySQLBinlogClient.class.getDeclaredField("ready"),
 client, value);
+    }
+    
     @SneakyThrows(ReflectiveOperationException.class)
     private int getChecksumLength(final MySQLBinlogEventPacketDecoder decoder) 
{
         MySQLBinlogContext binlogContext = (MySQLBinlogContext) 
Plugins.getMemberAccessor().get(MySQLBinlogEventPacketDecoder.class.getDeclaredField("binlogContext"),
 decoder);
diff --git 
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumperTest.java
 
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumperTest.java
index 19968fe9874..249a2ad2978 100644
--- 
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumperTest.java
+++ 
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumperTest.java
@@ -164,7 +164,7 @@ class MySQLIncrementalDumperTest {
         dumperThread.join(1000L);
         verify(client).connect();
         verify(client).subscribe("binlog-000001", 4L);
-        verify(client, timeout(1000L)).closeChannel();
+        verify(client, timeout(1000L)).closeChannel(true);
         ArgumentCaptor<List<Record>> captor = 
ArgumentCaptor.forClass(List.class);
         verify(channel, timeout(1000L)).push(captor.capture());
         List<Record> pushed = captor.getValue();

Reply via email to