This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 997b14653 [client] Fix exception occurs when initializing server connection. (#1740)
997b14653 is described below
commit 997b1465342683b25fe14ab2ebec0aa0daa2cb02
Author: Hongshun Wang <[email protected]>
AuthorDate: Sat Sep 27 13:47:38 2025 +0800
[client] Fix exception occurs when initializing server connection. (#1740)
---
.../apache/fluss/rpc/netty/client/NettyClient.java | 16 +++++++---------
.../fluss/rpc/netty/client/ServerConnection.java | 15 ++++++++++-----
.../fluss/rpc/netty/client/NettyClientTest.java | 19 +++++++++++++++++++
.../fluss/rpc/netty/client/ServerConnectionTest.java | 1 +
4 files changed, 37 insertions(+), 14 deletions(-)
diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/NettyClient.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/NettyClient.java
index 3334e392a..f567b8584 100644
--- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/NettyClient.java
+++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/NettyClient.java
@@ -192,15 +192,13 @@ public final class NettyClient implements RpcClient {
serverId,
ignored -> {
LOG.debug("Creating connection to server {}.", node);
- ServerConnection connection =
- new ServerConnection(
- bootstrap,
- node,
- clientMetricGroup,
- authenticatorSupplier.get(),
- isInnerClient);
- connection.whenClose(ignore -> connections.remove(serverId, connection));
- return connection;
+ return new ServerConnection(
+ bootstrap,
+ node,
+ clientMetricGroup,
+ authenticatorSupplier.get(),
+ (con, ignore) -> connections.remove(serverId, con),
+ isInnerClient);
});
}
diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java
index 26ceb6bc0..3aa85e24f 100644
--- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java
+++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java
@@ -58,7 +58,7 @@ import java.util.ArrayDeque;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
+import java.util.function.BiConsumer;
import static org.apache.fluss.utils.IOUtils.closeQuietly;
@@ -104,15 +104,20 @@ final class ServerConnection {
ServerNode node,
ClientMetricGroup clientMetricGroup,
ClientAuthenticator authenticator,
+ BiConsumer<ServerConnection, Throwable> closeCallback,
boolean isInnerClient) {
this.node = node;
this.state = ConnectionState.CONNECTING;
this.connectionMetricGroup =
clientMetricGroup.createConnectionMetricGroup(node.uid());
+ this.authenticator = authenticator;
+ this.backoff = new ExponentialBackoff(100L, 2, 5000L, 0.2);
+ whenClose(closeCallback);
+
+ // connect and handle should be last in case of other variables are nullable and close
+ // callback is not registered when connection established.
bootstrap
.connect(node.host(), node.port())
.addListener(future -> establishConnection((ChannelFuture) future, isInnerClient));
- this.authenticator = authenticator;
- this.backoff = new ExponentialBackoff(100L, 2, 5000L, 0.2);
}
public ServerNode getServerNode() {
@@ -131,8 +136,8 @@ final class ServerConnection {
}
/** Register a callback to be called when the connection is closed. */
- public void whenClose(Consumer<Throwable> closeCallback) {
- closeFuture.whenComplete((v, throwable) -> closeCallback.accept(throwable));
+ private void whenClose(BiConsumer<ServerConnection, Throwable> closeCallback) {
+ closeFuture.whenComplete((v, throwable) -> closeCallback.accept(this, throwable));
}
/** Close the connection. */
diff --git a/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/NettyClientTest.java b/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/NettyClientTest.java
index e7f7584f6..94b79b556 100644
--- a/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/NettyClientTest.java
+++ b/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/NettyClientTest.java
@@ -234,6 +234,25 @@ final class NettyClientTest {
}
}
+ @Test
+ void testExceptionWhenInitializeServerConnection() throws Exception {
+ ApiVersionsRequest request =
+ new ApiVersionsRequest()
+ .setClientSoftwareName("testing_client_100")
+ .setClientSoftwareVersion("1.0");
+ // close the netty server.
+ nettyServer.close();
+
+ // send request and create server connection.
+ assertThatThrownBy(
+ () ->
+ nettyClient
+ .sendRequest(serverNode, ApiKeys.API_VERSIONS, request)
+ .get())
+ .hasMessageContaining("Disconnected from node");
+ assertThat(nettyClient.connections()).isEmpty();
+ }
+
private void buildNettyServer(int serverId) throws Exception {
try (NetUtils.Port availablePort = getAvailablePort()) {
serverNode =
diff --git a/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/ServerConnectionTest.java b/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/ServerConnectionTest.java
index 86328f6da..6b2363c88 100644
--- a/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/ServerConnectionTest.java
+++ b/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/ServerConnectionTest.java
@@ -96,6 +96,7 @@ public class ServerConnectionTest {
serverNode,
TestingClientMetricGroup.newInstance(),
clientAuthenticator,
+ (con, ignore) -> {},
false);
ConnectionState connectionState = connection.getConnectionState();
assertThat(connectionState).isEqualTo(ConnectionState.CONNECTING);