This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new e75d18a0ab4 MySQLBinlogClient compatible with async exception (#37631)
e75d18a0ab4 is described below
commit e75d18a0ab407f6869cd3af2225f07c368210b31
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Sun Jan 4 13:30:19 2026 +0800
MySQLBinlogClient compatible with async exception (#37631)
* MySQLBinlogClient compatible with async exception
* Update RELEASE-NOTES.md
---
RELEASE-NOTES.md | 1 +
.../incremental/client/MySQLBinlogClient.java | 54 +++++++++++--
.../incremental/dumper/MySQLIncrementalDumper.java | 2 +-
.../incremental/client/MySQLBinlogClientTest.java | 92 +++++++---------------
.../dumper/MySQLIncrementalDumperTest.java | 2 +-
5 files changed, 78 insertions(+), 73 deletions(-)
diff --git a/RELEASE-NOTES.md b/RELEASE-NOTES.md
index 7c6407f919c..044e0587291 100644
--- a/RELEASE-NOTES.md
+++ b/RELEASE-NOTES.md
@@ -120,6 +120,7 @@
1. Pipeline: Fix migration might skip some records on big table after job
restarting - [#36878](https://github.com/apache/shardingsphere/pull/36878)
1. Pipeline: Fix unsigned number column value type inconsistent in inventory
and incremental - [#37280](https://github.com/apache/shardingsphere/pull/37280)
1. Pipeline: Fix PostgreSQL migration create table SQL generation failure
caused by locale-formatted sequence values -
[#28360](https://github.com/apache/shardingsphere/issues/28360)
+1. Pipeline: MySQLBinlogClient compatible with async exception -
[#37631](https://github.com/apache/shardingsphere/issues/37631)
1. DistSQL: Fix load single table with specific schema -
[#37535](https://github.com/apache/shardingsphere/pull/37535)
1. Transaction: Fix XA data source enlist failure caused connection leaks -
[37593](https://github.com/apache/shardingsphere/pull/37593)
diff --git
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/MySQLBinlogClient.java
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/MySQLBinlogClient.java
index 08a5b30aed8..980f4b44cad 100644
---
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/MySQLBinlogClient.java
+++
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/MySQLBinlogClient.java
@@ -64,6 +64,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
/**
@@ -89,6 +90,10 @@ public final class MySQLBinlogClient {
private volatile boolean running = true;
+ private volatile boolean ready;
+
+ private final AtomicInteger continuousFailureCount = new AtomicInteger(0);
+
/**
* Connect to MySQL.
*/
@@ -114,7 +119,6 @@ public final class MySQLBinlogClient {
}
}).connect(connectInfo.getHost(),
connectInfo.getPort()).channel();
serverVersion =
waitExpectedResponse(MySQLServerVersion.class).orElse(null);
- running = true;
}
/**
@@ -176,6 +180,7 @@ public final class MySQLBinlogClient {
configureHeartbeat();
registerSlave();
dumpBinlog(binlogFileName, binlogPosition, queryChecksumLength());
+ ready = true;
log.info("subscribe binlog file: {}, position: {}", binlogFileName,
binlogPosition);
}
@@ -234,9 +239,11 @@ public final class MySQLBinlogClient {
* Poll binlog event.
*
* @return binlog event
+ * @throws RuntimeException if MySQL binlog client is not running
*/
public synchronized List<MySQLBaseBinlogEvent> poll() {
- if (!running) {
+ ShardingSpherePreconditions.checkState(running, () -> new
RuntimeException("MySQL binlog client is not running"));
+ if (!ready) {
return Collections.emptyList();
}
try {
@@ -273,13 +280,17 @@ public final class MySQLBinlogClient {
/**
* Close netty channel.
*
+ * @param terminate whether to terminate or not
* @return channel future
*/
- public Optional<ChannelFuture> closeChannel() {
+ public Optional<ChannelFuture> closeChannel(final boolean terminate) {
+ ready = false;
+ if (terminate) {
+ running = false;
+ }
if (null == channel || !channel.isOpen()) {
return Optional.empty();
}
- running = false;
ChannelFuture future = channel.close();
if (null != eventLoopGroup) {
eventLoopGroup.shutdownGracefully();
@@ -321,6 +332,10 @@ public final class MySQLBinlogClient {
if (!running) {
return;
}
+ if (continuousFailureCount.get() > 0) {
+ log.info("Failure count reset to 0");
+ continuousFailureCount.set(0);
+ }
if (msg instanceof List) {
List<MySQLBaseBinlogEvent> records =
(List<MySQLBaseBinlogEvent>) msg;
if (records.isEmpty()) {
@@ -349,6 +364,8 @@ public final class MySQLBinlogClient {
@Override
public void exceptionCaught(final ChannelHandlerContext ctx, final
Throwable cause) {
log.error("MySQLBinlogEventHandler protocol resolution error,
channel: {}, lastBinlogEvent: {}", ctx.channel(),
JsonUtils.toJsonString(lastBinlogEvent.get()), cause);
+ continuousFailureCount.incrementAndGet();
+ closeChannel(false);
}
private void tryReconnect() {
@@ -359,16 +376,37 @@ public final class MySQLBinlogClient {
@SneakyThrows(InterruptedException.class)
private synchronized void reconnect() {
- for (int reconnectTimes = 0; reconnectTimes < 3; reconnectTimes++)
{
+ log.info("Reconnect failure count: {}",
continuousFailureCount.get());
+ if (continuousFailureCount.get() >= 5) {
+ log.error("MySQL binlog client failed permanently,
lastBinlogEvent: {}", JsonUtils.toJsonString(lastBinlogEvent.get()));
+ closeChannel(true);
+ return;
+ }
+ wait(2000L * (continuousFailureCount.get() + 1));
+ try {
+ reconnectWithRetry();
+ // CHECKSTYLE:OFF
+ } catch (final RuntimeException ex) {
+ // CHECKSTYLE:ON
+ log.error("Reconnect failed permanently, lastBinlogEvent: {}",
JsonUtils.toJsonString(lastBinlogEvent.get()), ex);
+ closeChannel(true);
+ return;
+ }
+ subscribe(lastBinlogEvent.get().getFileName(),
lastBinlogEvent.get().getPosition());
+ }
+
+ private void reconnectWithRetry() throws InterruptedException {
+ for (int reconnectTimes = 1; true; reconnectTimes++) {
try {
connect();
- log.info("Reconnect times {}", reconnectTimes);
- subscribe(lastBinlogEvent.get().getFileName(),
lastBinlogEvent.get().getPosition());
break;
// CHECKSTYLE:OFF
} catch (final RuntimeException ex) {
// CHECKSTYLE:ON
- log.error("Reconnect failed, reconnect times: {},
lastBinlogEvent: {}", reconnectTimes,
JsonUtils.toJsonString(lastBinlogEvent.get()), ex);
+ closeChannel(false);
+ if (reconnectTimes > 3) {
+ throw ex;
+ }
wait(1000L << reconnectTimes);
}
}
diff --git
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumper.java
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumper.java
index 525de8fc997..de5f76d79b1 100644
---
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumper.java
+++
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumper.java
@@ -200,6 +200,6 @@ public final class MySQLIncrementalDumper extends
AbstractPipelineLifecycleRunna
@Override
protected void doStop() {
- client.closeChannel();
+ client.closeChannel(true);
}
}
diff --git
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/MySQLBinlogClientTest.java
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/MySQLBinlogClientTest.java
index ac76e6eeb8a..a4752d3acb1 100644
---
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/MySQLBinlogClientTest.java
+++
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/MySQLBinlogClientTest.java
@@ -32,9 +32,9 @@ import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.Promise;
import lombok.SneakyThrows;
import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
+import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.MySQLBinlogContext;
import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.MySQLBaseBinlogEvent;
import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.PlaceholderBinlogEvent;
-import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.MySQLBinlogContext;
import
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.client.netty.MySQLBinlogEventPacketDecoder;
import
org.apache.shardingsphere.database.protocol.mysql.constant.MySQLConstants;
import
org.apache.shardingsphere.database.protocol.mysql.packet.command.binlog.MySQLComBinlogDumpCommandPacket;
@@ -63,10 +63,8 @@ import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
@@ -79,7 +77,6 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
-import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.CALLS_REAL_METHODS;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
@@ -154,7 +151,7 @@ class MySQLBinlogClientTest {
initializer.get().channelRegistered(context);
MySQLServerVersion actual = (MySQLServerVersion)
Plugins.getMemberAccessor().get(MySQLBinlogClient.class.getDeclaredField("serverVersion"),
client);
assertThat(actual, is(expected));
- client.closeChannel();
+ client.closeChannel(true);
}
@Test
@@ -306,30 +303,46 @@ class MySQLBinlogClientTest {
assertThat(getChecksumLength(decoder), is(0));
}
- @SuppressWarnings("unchecked")
@Test
- void assertPollBranches() throws InterruptedException,
ReflectiveOperationException {
+ void assertPollOnNotRunning() throws ReflectiveOperationException {
+
Plugins.getMemberAccessor().set(MySQLBinlogClient.class.getDeclaredField("channel"),
client, channel);
setRunning(false);
+ assertThrows(RuntimeException.class, () -> client.poll());
+ }
+
+ @Test
+ void assertPollOnNotReady() throws ReflectiveOperationException {
+
Plugins.getMemberAccessor().set(MySQLBinlogClient.class.getDeclaredField("channel"),
client, channel);
+ setRunning(true);
+ setReady(false);
assertThat(client.poll(), is(Collections.emptyList()));
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ void assertPollBranches() throws InterruptedException,
ReflectiveOperationException {
setRunning(true);
+ setReady(false);
+ assertThat(client.poll(), is(Collections.emptyList()));
+ setReady(true);
assertThat(client.poll(), is(Collections.emptyList()));
List<MySQLBaseBinlogEvent> events = Collections.singletonList(new
PlaceholderBinlogEvent("binlog", 4L, 1L));
((ArrayBlockingQueue<List<MySQLBaseBinlogEvent>>)
Plugins.getMemberAccessor().get(MySQLBinlogClient.class.getDeclaredField("blockingEventQueue"),
client)).put(events);
assertThat(client.poll(), is(events));
- setRunning(true);
+ setReady(true);
Thread.currentThread().interrupt();
assertThat(client.poll(), is(Collections.emptyList()));
}
@Test
void assertCloseChannelWhenChannelUnavailable() {
- assertFalse(client.closeChannel().isPresent());
+ assertFalse(client.closeChannel(true).isPresent());
}
@Test
void assertCloseChannelWithoutEventLoopGroup() throws Exception {
Plugins.getMemberAccessor().set(MySQLBinlogClient.class.getDeclaredField("channel"),
client, channel);
- Optional<ChannelFuture> actual = client.closeChannel();
+ Optional<ChannelFuture> actual = client.closeChannel(true);
assertTrue(actual.isPresent());
verify(channel).close();
}
@@ -374,59 +387,7 @@ class MySQLBinlogClientTest {
setResponseCallback(null);
handler.channelRead(mock(ChannelHandlerContext.class), new Object());
handler.exceptionCaught(mock(ChannelHandlerContext.class), new
RuntimeException("ex"));
- client.closeChannel();
- }
-
- @Test
- void assertMySQLBinlogEventHandlerBranches() throws Exception {
- client = createClientMock();
- prepareClientChannel();
- setServerVersion("5.6.0");
- doReturn(true).when(client).execute(anyString());
-
doReturn(createResultSet("CRC32")).when(client).executeQuery(anyString());
-
when(channel.writeAndFlush(any(MySQLComRegisterSlaveCommandPacket.class))).thenAnswer(invocation
-> {
- Promise<Object> callback = new
DefaultPromise<>(eventLoopGroup.next());
- callback.setSuccess(new MySQLOKPacket(0));
- setResponseCallback(callback);
- return null;
- });
- doAnswer(invocation ->
null).when(channel).writeAndFlush(any(MySQLComBinlogDumpCommandPacket.class));
- client.subscribe("binlog-000003", 12L);
- ArgumentCaptor<ChannelHandler> captor =
ArgumentCaptor.forClass(ChannelHandler.class);
- verify(pipeline, times(2)).addLast(captor.capture());
- ChannelInboundHandlerAdapter handler = captor.getAllValues().stream()
- .filter(each ->
"MySQLBinlogEventHandler".equals(each.getClass().getSimpleName()))
- .map(each -> (ChannelInboundHandlerAdapter)
each).findFirst().orElseThrow(IllegalStateException::new);
- setRunning(false);
- handler.channelRead(mock(ChannelHandlerContext.class), new Object());
- setRunning(true);
- handler.channelRead(mock(ChannelHandlerContext.class),
Collections.emptyList());
- PlaceholderBinlogEvent firstEvent = new
PlaceholderBinlogEvent("binlog-000003", 12L, 1L);
- PlaceholderBinlogEvent secondEvent = new
PlaceholderBinlogEvent("binlog-000003", 20L, 2L);
- handler.channelRead(mock(ChannelHandlerContext.class),
Arrays.asList(firstEvent, secondEvent));
- handler.channelRead(mock(ChannelHandlerContext.class), new
PlaceholderBinlogEvent("binlog-000003", 30L, 3L));
- handler.exceptionCaught(mock(ChannelHandlerContext.class), new
RuntimeException("binlog"));
- setRunning(false);
- handler.channelInactive(mock(ChannelHandlerContext.class));
- setRunning(true);
- CountDownLatch latch = new CountDownLatch(1);
- AtomicInteger connectAttempts = new AtomicInteger();
- doAnswer(invocation -> {
- if (connectAttempts.getAndIncrement() == 0) {
- throw new RuntimeException("fail");
- }
- return null;
- }).when(client).connect();
- doAnswer(invocation -> {
- latch.countDown();
- return null;
- }).when(client).subscribe(anyString(), anyLong());
- handler.channelInactive(mock(ChannelHandlerContext.class));
- handler.channelInactive(mock(ChannelHandlerContext.class));
- assertTrue(latch.await(3L, TimeUnit.SECONDS));
- AtomicBoolean reconnectRequested = (AtomicBoolean)
Plugins.getMemberAccessor().get(handler.getClass().getDeclaredField("reconnectRequested"),
handler);
- reconnectRequested.set(true);
- handler.channelInactive(mock(ChannelHandlerContext.class));
+ client.closeChannel(true);
}
private InternalResultSet createResultSet(final String checksum) {
@@ -460,6 +421,11 @@ class MySQLBinlogClientTest {
Plugins.getMemberAccessor().set(MySQLBinlogClient.class.getDeclaredField("running"),
client, value);
}
+ @SneakyThrows(ReflectiveOperationException.class)
+ private void setReady(final boolean value) {
+
Plugins.getMemberAccessor().set(MySQLBinlogClient.class.getDeclaredField("ready"),
client, value);
+ }
+
@SneakyThrows(ReflectiveOperationException.class)
private int getChecksumLength(final MySQLBinlogEventPacketDecoder decoder)
{
MySQLBinlogContext binlogContext = (MySQLBinlogContext)
Plugins.getMemberAccessor().get(MySQLBinlogEventPacketDecoder.class.getDeclaredField("binlogContext"),
decoder);
diff --git
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumperTest.java
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumperTest.java
index 19968fe9874..249a2ad2978 100644
---
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumperTest.java
+++
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumperTest.java
@@ -164,7 +164,7 @@ class MySQLIncrementalDumperTest {
dumperThread.join(1000L);
verify(client).connect();
verify(client).subscribe("binlog-000001", 4L);
- verify(client, timeout(1000L)).closeChannel();
+ verify(client, timeout(1000L)).closeChannel(true);
ArgumentCaptor<List<Record>> captor =
ArgumentCaptor.forClass(List.class);
verify(channel, timeout(1000L)).push(captor.capture());
List<Record> pushed = captor.getValue();