This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch release-1.16 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.16 by this push: new ecede7ab8ed [FLINK-28695][network] Fix the bug of old netty client isn't closed when netty server closes channel and no input channel ecede7ab8ed is described below commit ecede7ab8ed5fa5be667a9801a8a2ee5ead4a043 Author: 1996fanrui <1996fan...@gmail.com> AuthorDate: Sun Nov 20 15:57:58 2022 +0800 [FLINK-28695][network] Fix the bug of old netty client isn't closed when netty server closes channel and no input channel --- .../CreditBasedPartitionRequestClientHandler.java | 22 +++---- .../netty/PartitionRequestClientFactoryTest.java | 76 ++++++++++++++++++++++ 2 files changed, 85 insertions(+), 13 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java index 00d8e6e03f8..fe008c1afed 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java @@ -119,19 +119,15 @@ class CreditBasedPartitionRequestClientHandler extends ChannelInboundHandlerAdap @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { - // Unexpected close. In normal operation, the client closes the connection after all input - // channels have been removed. This indicates a problem with the remote task manager. - if (!inputChannels.isEmpty()) { - final SocketAddress remoteAddr = ctx.channel().remoteAddress(); - - notifyAllChannelsOfErrorAndClose( - new RemoteTransportException( - "Connection unexpectedly closed by remote task manager '" - + remoteAddr - + "'. 
" - + "This might indicate that the remote task manager was lost.", - remoteAddr)); - } + final SocketAddress remoteAddr = ctx.channel().remoteAddress(); + + notifyAllChannelsOfErrorAndClose( + new RemoteTransportException( + "Connection unexpectedly closed by remote task manager '" + + remoteAddr + + "'. " + + "This might indicate that the remote task manager was lost.", + remoteAddr)); super.channelInactive(ctx); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java index d2b57c5409d..968b824d541 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java @@ -23,9 +23,12 @@ import org.apache.flink.runtime.io.network.NetworkClientHandler; import org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException; import org.apache.flink.util.TestLogger; +import org.apache.flink.shaded.netty4.io.netty.channel.Channel; import org.apache.flink.shaded.netty4.io.netty.channel.ChannelException; import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture; import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter; import org.junit.Test; import org.junit.runner.RunWith; @@ -43,6 +46,9 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.initServerAndClient; +import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.shutdown; +import static 
org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -158,6 +164,56 @@ public class PartitionRequestClientFactoryTest extends TestLogger { assertTrue(set.size() <= maxNumberOfConnections); } + /** + * Verify that the factory creates a new netty client, instead of reusing the stale one, when the + * netty server closes the channel and there is no input channel. + */ + @Test + public void testConnectionReuseWhenRemoteCloseAndNoInputChannel() throws Exception { + CompletableFuture<Void> inactiveFuture = new CompletableFuture<>(); + CompletableFuture<Channel> serverChannelFuture = new CompletableFuture<>(); + NettyProtocol protocol = + new NettyProtocol(null, null) { + @Override + public ChannelHandler[] getServerChannelHandlers() { + return new ChannelHandler[] { + // Capture the server-side channel so the test can close it + new ChannelInboundHandlerAdapter() { + @Override + public void channelRegistered(ChannelHandlerContext ctx) + throws Exception { + super.channelRegistered(ctx); + serverChannelFuture.complete(ctx.channel()); + } + } + }; + } + + @Override + public ChannelHandler[] getClientChannelHandlers() { + return new ChannelHandler[] { + new ChannelInactiveFutureHandler(inactiveFuture) + }; + } + }; + NettyTestUtil.NettyServerAndClient serverAndClient = initServerAndClient(protocol); + + PartitionRequestClientFactory factory = + new PartitionRequestClientFactory( + serverAndClient.client(), 2, 1, connectionReuseEnabled); + + ConnectionID connectionID = serverAndClient.getConnectionID(0); + NettyPartitionRequestClient oldClient = factory.createPartitionRequestClient(connectionID); + + // close server channel + Channel channel = serverChannelFuture.get(); + channel.close(); + inactiveFuture.get(); + NettyPartitionRequestClient newClient = factory.createPartitionRequestClient(connectionID); + assertThat(newClient).as("Factory should create a new client.").isNotSameAs(oldClient); + shutdown(serverAndClient); + } + @Test public void
testNettyClientConnectRetry() throws Exception { NettyTestUtil.NettyServerAndClient serverAndClient = createNettyServerAndClient(); @@ -329,4 +385,24 @@ public class PartitionRequestClientFactoryTest extends TestLogger { } } } + + private static class ChannelInactiveFutureHandler + extends CreditBasedPartitionRequestClientHandler { + + private final CompletableFuture<Void> inactiveFuture; + + private ChannelInactiveFutureHandler(CompletableFuture<Void> inactiveFuture) { + this.inactiveFuture = inactiveFuture; + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + super.channelInactive(ctx); + inactiveFuture.complete(null); + } + + public CompletableFuture<Void> getInactiveFuture() { + return inactiveFuture; + } + } }