This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 67eefc4ba [common] Fix serverConnection that has already been set to disconnected is still processing requests error (#1722)
67eefc4ba is described below
commit 67eefc4bae5f91fa105aeca7a8f3174772d5ed75
Author: yunhong <[email protected]>
AuthorDate: Fri Sep 19 22:28:01 2025 +0800
[common] Fix serverConnection that has already been set to disconnected is still processing requests error (#1722)
---
.../fluss/rpc/netty/client/ServerConnection.java | 52 +++-----
.../fluss/rpc/netty/client/NettyClientTest.java | 7 +-
.../rpc/netty/client/ServerConnectionTest.java | 141 +++++++++++++++++++++
3 files changed, 164 insertions(+), 36 deletions(-)
diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java
index f07afdf9e..26ceb6bc0 100644
--- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java
+++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java
@@ -17,6 +17,7 @@
package org.apache.fluss.rpc.netty.client;
+import org.apache.fluss.annotation.VisibleForTesting;
import org.apache.fluss.cluster.ServerNode;
import org.apache.fluss.exception.DisconnectException;
import org.apache.fluss.exception.FlussRuntimeException;
@@ -170,39 +171,18 @@ final class ServerConnection {
}
if (channel != null) {
- channel.close()
- .addListener(
- (ChannelFutureListener)
- future -> {
-
- // when finishing, if netty successfully closes the
- // channel, then the provided exception is used as
- // the reason for the closing. If there was something
- // wrong at the netty side, then that exception is
- // prioritized over the provided one.
- if (future.isSuccess()) {
- if (cause instanceof
ClosedChannelException) {
- // the ClosedChannelException is expected
- closeFuture.complete(null);
- } else {
-
closeFuture.completeExceptionally(cause);
- }
- } else {
- LOG.warn(
- "Something went wrong
when trying to close connection due to : ",
- cause);
-
closeFuture.completeExceptionally(future.cause());
- }
- });
+ // Close the channel directly, without waiting for the channel to close properly.
+ channel.close();
+ }
+
+ // TODO all return completeExceptionally will let some test cases blocked, so we
+ // need to find why the test cases are blocked and remove the if statement.
+ if (cause instanceof ClosedChannelException
+ || cause.getCause() instanceof ConnectException) {
+ // the ClosedChannelException and ConnectException is expected.
+ closeFuture.complete(null);
} else {
- // TODO all return completeExceptionally will let some test cases blocked, so we
- // need to find why the test cases are blocked and remove the if statement.
- if (cause.getCause() instanceof ConnectException) {
- // the ConnectException is expected
- closeFuture.complete(null);
- } else {
- closeFuture.completeExceptionally(cause);
- }
+ closeFuture.completeExceptionally(cause);
}
connectionMetricGroup.close();
@@ -491,7 +471,8 @@ final class ServerConnection {
* <li>READY: connection is ready to send requests.
* <li>DISCONNECTED: connection is failed to establish.
*/
- private enum ConnectionState {
+ @VisibleForTesting
+ enum ConnectionState {
CONNECTING,
CHECKING_API_VERSIONS,
AUTHENTICATING,
@@ -565,4 +546,9 @@ final class ServerConnection {
return ((InetSocketAddress)
channel.remoteAddress()).getAddress().getHostAddress();
}
}
+
+ @VisibleForTesting
+ ConnectionState getConnectionState() {
+ return state;
+ }
}
diff --git a/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/NettyClientTest.java b/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/NettyClientTest.java
index 72699f2d3..e7f7584f6 100644
--- a/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/NettyClientTest.java
+++ b/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/NettyClientTest.java
@@ -42,6 +42,7 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import java.net.ConnectException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -156,9 +157,9 @@ final class NettyClientTest {
nettyClient
.sendRequest(serverNode,
ApiKeys.API_VERSIONS, request)
.get())
- .isInstanceOf(ExecutionException.class)
- .hasMessageContaining("Disconnected from node")
- .hasRootCauseMessage("finishConnect(..) failed: Connection
refused");
+ .rootCause()
+ .isInstanceOf(ConnectException.class)
+ .hasMessageContaining("Connection refused");
assertThat(nettyClient.connections().size()).isEqualTo(0);
// restart the netty server.
diff --git a/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/ServerConnectionTest.java b/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/ServerConnectionTest.java
new file mode 100644
index 000000000..86328f6da
--- /dev/null
+++ b/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/ServerConnectionTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.rpc.netty.client;
+
+import org.apache.fluss.cluster.Endpoint;
+import org.apache.fluss.cluster.ServerNode;
+import org.apache.fluss.cluster.ServerType;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.exception.DisconnectException;
+import org.apache.fluss.metrics.groups.MetricGroup;
+import org.apache.fluss.metrics.util.NOPMetricsGroup;
+import org.apache.fluss.rpc.TestingGatewayService;
+import org.apache.fluss.rpc.messages.GetTableSchemaRequest;
+import org.apache.fluss.rpc.messages.PbTablePath;
+import org.apache.fluss.rpc.metrics.TestingClientMetricGroup;
+import org.apache.fluss.rpc.netty.client.ServerConnection.ConnectionState;
+import org.apache.fluss.rpc.netty.server.NettyServer;
+import org.apache.fluss.rpc.netty.server.RequestsMetrics;
+import org.apache.fluss.rpc.protocol.ApiKeys;
+import org.apache.fluss.security.auth.AuthenticationFactory;
+import org.apache.fluss.security.auth.ClientAuthenticator;
+import org.apache.fluss.shaded.netty4.io.netty.bootstrap.Bootstrap;
+import org.apache.fluss.shaded.netty4.io.netty.channel.EventLoopGroup;
+import org.apache.fluss.utils.NetUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+
+import static
org.apache.fluss.rpc.netty.NettyUtils.getClientSocketChannelClass;
+import static org.apache.fluss.rpc.netty.NettyUtils.newEventLoopGroup;
+import static org.apache.fluss.utils.NetUtils.getAvailablePort;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link ServerConnection}. */
+public class ServerConnectionTest {
+
+ private EventLoopGroup eventLoopGroup;
+ private Bootstrap bootstrap;
+ private ClientAuthenticator clientAuthenticator;
+ private Configuration conf;
+ private NettyServer nettyServer;
+ private ServerNode serverNode;
+ private TestingGatewayService service;
+
+ @BeforeEach
+ void setUp() throws Exception {
+ conf = new Configuration();
+ buildNettyServer(0);
+
+ eventLoopGroup = newEventLoopGroup(1, "fluss-netty-client-test");
+ bootstrap =
+ new Bootstrap()
+ .group(eventLoopGroup)
+ .channel(getClientSocketChannelClass(eventLoopGroup))
+ .handler(new ClientChannelInitializer(5000));
+ clientAuthenticator =
+ AuthenticationFactory.loadClientAuthenticatorSupplier(new
Configuration()).get();
+ }
+
+ @AfterEach
+ void tearDown() throws Exception {
+ if (nettyServer != null) {
+ nettyServer.close();
+ }
+
+ if (eventLoopGroup != null) {
+ eventLoopGroup.shutdownGracefully();
+ }
+ }
+
+ @Test
+ void testConnectionClose() {
+ ServerConnection connection =
+ new ServerConnection(
+ bootstrap,
+ serverNode,
+ TestingClientMetricGroup.newInstance(),
+ clientAuthenticator,
+ false);
+ ConnectionState connectionState = connection.getConnectionState();
+ assertThat(connectionState).isEqualTo(ConnectionState.CONNECTING);
+
+ GetTableSchemaRequest request =
+ new GetTableSchemaRequest()
+ .setTablePath(
+ new
PbTablePath().setDatabaseName("test").setTableName("test"))
+ .setSchemaId(0);
+ connection.send(ApiKeys.GET_TABLE_SCHEMA, request);
+
+ CompletableFuture<Void> future = connection.close();
+ connectionState = connection.getConnectionState();
+ assertThat(connectionState).isEqualTo(ConnectionState.DISCONNECTED);
+ assertThat(future.isDone()).isTrue();
+
+ assertThatThrownBy(() -> connection.send(ApiKeys.GET_TABLE_SCHEMA,
request).get())
+ .rootCause()
+ .isInstanceOf(DisconnectException.class)
+ .hasMessageContaining("Cannot send request to server");
+ future = connection.close();
+ assertThat(future.isDone()).isTrue();
+ }
+
+ private void buildNettyServer(int serverId) throws Exception {
+ try (NetUtils.Port availablePort = getAvailablePort()) {
+ serverNode =
+ new ServerNode(
+ serverId, "localhost", availablePort.getPort(),
ServerType.COORDINATOR);
+ service = new TestingGatewayService();
+ MetricGroup metricGroup = NOPMetricsGroup.newInstance();
+ nettyServer =
+ new NettyServer(
+ conf,
+ Collections.singleton(
+ new Endpoint(serverNode.host(),
serverNode.port(), "INTERNAL")),
+ service,
+ metricGroup,
+
RequestsMetrics.createCoordinatorServerRequestMetrics(metricGroup));
+ nettyServer.start();
+ }
+ }
+}