LB-Yu opened a new issue, #1545: URL: https://github.com/apache/fluss/issues/1545
### Search before asking

- [x] I searched in the [issues](https://github.com/alibaba/fluss/issues) and found nothing similar.

### Fluss version

main (development)

### Please describe the bug 🐞

When using Flink to write to Fluss, I’ve found that the sink node occasionally hangs, which delays the entire job. After analyzing the thread stacks, I found that the hang occurs in `NettyClient`. The sender thread is not blocked on a lock: it is `RUNNABLE` but never leaves `ConcurrentHashMap.replaceNode`. The relevant thread stack trace is as follows:

```
"fluss-write-sender-thread-1" Id=108 RUNNABLE
	at java.util.concurrent.ConcurrentHashMap.replaceNode(ConcurrentHashMap.java:1172)
	at java.util.concurrent.ConcurrentHashMap.remove(ConcurrentHashMap.java:1546)
	at com.alibaba.fluss.rpc.netty.client.NettyClient.lambda$null$0(NettyClient.java:202)
	at com.alibaba.fluss.rpc.netty.client.NettyClient$$Lambda$1186/843977378.accept(Unknown Source)
	at com.alibaba.fluss.rpc.netty.client.ServerConnection.lambda$whenClose$1(ServerConnection.java:134)
	at com.alibaba.fluss.rpc.netty.client.ServerConnection$$Lambda$1192/2067581287.accept(Unknown Source)
	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
	at java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:792)
	at java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2153)
	at com.alibaba.fluss.rpc.netty.client.ServerConnection.whenClose(ServerConnection.java:134)
	at com.alibaba.fluss.rpc.netty.client.NettyClient.lambda$getOrCreateConnection$1(NettyClient.java:202)
	at com.alibaba.fluss.rpc.netty.client.NettyClient$$Lambda$1178/414968141.apply(Unknown Source)
	at java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1660)
	-  locked java.util.concurrent.ConcurrentHashMap$ReservationNode@53549eed
	at com.alibaba.fluss.utils.MapUtils$ConcurrentHashMapForJDK8.computeIfAbsent(MapUtils.java:52)
	at com.alibaba.fluss.rpc.netty.client.NettyClient.getOrCreateConnection(NettyClient.java:191)
	at com.alibaba.fluss.rpc.netty.client.NettyClient.sendRequest(NettyClient.java:167)
	at com.alibaba.fluss.rpc.GatewayClientProxy.invokeRpc(GatewayClientProxy.java:80)
	at com.alibaba.fluss.rpc.GatewayClientProxy.invoke(GatewayClientProxy.java:68)
	at com.sun.proxy.$Proxy34.produceLog(Unknown Source)
	at com.alibaba.fluss.client.write.Sender.sendProduceLogRequestAndHandleResponse(Sender.java:406)
	at com.alibaba.fluss.client.write.Sender.lambda$sendWriteRequest$5(Sender.java:389)
	at com.alibaba.fluss.client.write.Sender$$Lambda$1535/1808342548.accept(Unknown Source)
	at java.util.HashMap.forEach(HashMap.java:1290)
	at com.alibaba.fluss.client.write.Sender.sendWriteRequest(Sender.java:378)
	at com.alibaba.fluss.client.write.Sender.lambda$sendWriteRequests$2(Sender.java:346)
	at com.alibaba.fluss.client.write.Sender$$Lambda$1531/508256363.accept(Unknown Source)
	at java.util.HashMap.forEach(HashMap.java:1290)
	at com.alibaba.fluss.client.write.Sender.sendWriteRequests(Sender.java:346)
	at com.alibaba.fluss.client.write.Sender.sendWriteData(Sender.java:250)
	at com.alibaba.fluss.client.write.Sender.runOnce(Sender.java:194)
	at com.alibaba.fluss.client.write.Sender.run(Sender.java:150)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	...

	Number of locked synchronizers = 1
	- java.util.concurrent.ThreadPoolExecutor$Worker@67dcdeef
```

This appears to be a known [bug](https://bugs.openjdk.org/browse/JDK-8062841) in JDK 8’s `ConcurrentHashMap.computeIfAbsent()`. The `locked ...ConcurrentHashMap$ReservationNode` frame shows that `remove` is called while `computeIfAbsent` still holds the placeholder node for the same key, and on JDK 8 `replaceNode` then loops forever instead of returning. We should therefore avoid modifying the map from within `computeIfAbsent`.
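The failure mode can be reproduced without Fluss. The sketch below mimics `getOrCreateConnection` with a plain `ConcurrentHashMap<String, String>` and an already-completed `CompletableFuture` standing in for the close future of a connection whose server died during setup; `RecursiveRemoveDemo` and `tryRecursiveRemove` are illustrative names, not Fluss APIs. On JDK 8 the thread is expected to spin forever, as in the stack trace above. On newer JDKs `remove` is expected to throw an `IllegalStateException` instead, which `CompletableFuture` captures in the stage returned by `whenComplete`, so the removal would silently not happen. The worker is a daemon thread joined with a timeout, so the demo terminates either way.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

class RecursiveRemoveDemo {
    /**
     * Mimics NettyClient.getOrCreateConnection: the close callback of a "connection" that is
     * already closed removes the key while computeIfAbsent is still computing it.
     * Returns "hung", "callback failed" or "completed".
     */
    static String tryRecursiveRemove(long timeoutMillis) {
        ConcurrentHashMap<String, String> connections = new ConcurrentHashMap<>();
        AtomicReference<String> outcome = new AtomicReference<>("hung");
        Thread sender = new Thread(() -> {
            AtomicReference<CompletableFuture<Void>> callbackStage = new AtomicReference<>();
            connections.computeIfAbsent("server-1", serverId -> {
                String connection = "connection-to-" + serverId;
                CompletableFuture<Void> closeFuture = new CompletableFuture<>();
                closeFuture.complete(null); // the server died during connection setup
                // whenComplete on an already-completed future runs the action immediately on
                // this thread, i.e. while computeIfAbsent still holds the key's ReservationNode.
                callbackStage.set(closeFuture.whenComplete(
                        (ignored, error) -> connections.remove(serverId, connection)));
                return connection;
            });
            // Reached only if remove() returned. An exception thrown by the callback is
            // captured in the stage returned by whenComplete rather than propagated.
            outcome.set(callbackStage.get().isCompletedExceptionally() ? "callback failed" : "completed");
        });
        sender.setDaemon(true); // a thread spinning forever must not keep the JVM alive
        sender.start();
        try {
            sender.join(timeoutMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return outcome.get();
    }

    public static void main(String[] args) {
        System.out.println("Java " + System.getProperty("java.version") + ": " + tryRecursiveRemove(2_000));
    }
}
```

Neither outcome is what `NettyClient` intends: the connection is either never returned or never removed after it closes.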
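But in `NettyClient` we have code like this:

```java
private ServerConnection getOrCreateConnection(ServerNode node) {
    String serverId = node.uid();
    return connections.computeIfAbsent(
            serverId,
            ignored -> {
                LOG.debug("Creating connection to server {}.", node);
                ServerConnection connection =
                        new ServerConnection(
                                bootstrap,
                                node,
                                clientMetricGroup,
                                authenticatorSupplier.get(),
                                isInnerClient);
                // If the connection is already closed, this callback runs synchronously,
                // i.e. while computeIfAbsent is still in progress.
                connection.whenClose(ignore -> connections.remove(serverId, connection));
                return connection;
            });
}
```

The above code works fine in most cases. However, if the TabletServer happens to crash while the `ServerConnection` is being created, the connection's close future is already complete by the time `whenClose` registers the callback. `CompletableFuture.whenComplete` then runs the callback immediately on the calling thread, so `connections.remove` is invoked while `computeIfAbsent` is still running (see the `whenClose` → `whenComplete` → `uniWhenComplete` frames above), which triggers the `ConcurrentHashMap` bug. Although this bug was fixed in versions after JDK 8, `NettyClient` is also used in the client SDK, and many clients may still be running on JDK 8. Therefore, we should address this issue.

### Solution

To avoid this problem, we must not invoke `remove` inside `computeIfAbsent`. To that end, we can create the connection outside `computeIfAbsent` and introduce a second map, `connectionFutures`, holding one in-flight future per server, so that concurrent callers for the same server still end up sharing a single connection.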
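A minimal, self-contained sketch of this idea follows, assuming a hypothetical `FakeConnection` in place of `ServerConnection` (only its close future is modeled) and plain strings as server ids; `ConnectionCache`, `getOrCreate` and `isCached` are illustrative names, not Fluss APIs. The connection is created outside any `ConcurrentHashMap` compute method, published with `put`, and only then given its close hook, so a connection that is already closed is removed right away instead of hanging the caller or lingering in the map.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;
import java.util.function.Function;

/** Hypothetical stand-in for ServerConnection: only the close future is modeled. */
final class FakeConnection {
    private final CompletableFuture<Void> closeFuture = new CompletableFuture<>();

    void close() {
        closeFuture.complete(null);
    }

    /** Runs the action immediately if the connection is already closed. */
    void whenClose(Consumer<Throwable> action) {
        closeFuture.whenComplete((ignored, error) -> action.accept(error));
    }
}

/** Creates connections outside computeIfAbsent and deduplicates them via in-flight futures. */
final class ConnectionCache {
    private final ConcurrentMap<String, FakeConnection> connections = new ConcurrentHashMap<>();
    private final ConcurrentMap<String, CompletableFuture<FakeConnection>> inFlight =
            new ConcurrentHashMap<>();
    private final Function<String, FakeConnection> factory;

    ConnectionCache(Function<String, FakeConnection> factory) {
        this.factory = factory;
    }

    FakeConnection getOrCreate(String serverId) {
        FakeConnection existing = connections.get(serverId);
        if (existing != null) {
            return existing;
        }
        CompletableFuture<FakeConnection> mine = new CompletableFuture<>();
        CompletableFuture<FakeConnection> winner = inFlight.putIfAbsent(serverId, mine);
        if (winner == null) {
            winner = mine;
            try {
                // Re-check: another thread may have finished just before we won the race.
                FakeConnection conn = connections.get(serverId);
                if (conn == null) {
                    FakeConnection created = factory.apply(serverId);
                    connections.put(serverId, created);
                    // No map compute is running here, so an immediate callback is safe.
                    created.whenClose(error -> connections.remove(serverId, created));
                    conn = created;
                }
                mine.complete(conn);
            } catch (Throwable t) {
                mine.completeExceptionally(t); // waiting threads must not block forever
            } finally {
                inFlight.remove(serverId, mine);
            }
        }
        return winner.join(); // throws an unchecked CompletionException if creation failed
    }

    boolean isCached(String serverId) {
        return connections.containsKey(serverId);
    }
}

class ConnectionCacheDemo {
    public static void main(String[] args) {
        // Servers whose id starts with "dead" close the connection during setup.
        ConnectionCache cache = new ConnectionCache(serverId -> {
            FakeConnection connection = new FakeConnection();
            if (serverId.startsWith("dead")) {
                connection.close();
            }
            return connection;
        });
        FakeConnection first = cache.getOrCreate("server-1");
        System.out.println(first == cache.getOrCreate("server-1")); // true: reused
        cache.getOrCreate("dead-server"); // returns instead of hanging
        System.out.println(cache.isCached("dead-server")); // false: closed entry removed
    }
}
```

Registering the close hook after `put` matters: if it were registered first, an already-closed connection would be "removed" before it was inserted and would then stay in the map.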
```java
// New field next to `connections`: in-flight connection attempts, keyed by server id.
private final ConcurrentHashMap<String, CompletableFuture<ServerConnection>> connectionFutures =
        new ConcurrentHashMap<>();

private ServerConnection getOrCreateConnection(ServerNode node) {
    String serverId = node.uid();
    // Fast path: reuse an existing connection.
    ServerConnection existing = connections.get(serverId);
    if (existing != null) {
        return existing;
    }

    // Only one thread per server creates the connection; the others wait on its future.
    CompletableFuture<ServerConnection> newFuture = new CompletableFuture<>();
    CompletableFuture<ServerConnection> f = connectionFutures.putIfAbsent(serverId, newFuture);
    if (f == null) {
        f = newFuture;
        try {
            // Re-check: another thread may have published a connection just before we won.
            ServerConnection conn = connections.get(serverId);
            if (conn == null) {
                LOG.debug("Creating connection to server {}.", node);
                ServerConnection created =
                        new ServerConnection(
                                bootstrap,
                                node,
                                clientMetricGroup,
                                authenticatorSupplier.get(),
                                isInnerClient);
                // Publish first, then register the close hook: if the connection is already
                // closed, the hook runs right here (outside any map compute) and removes it.
                connections.put(serverId, created);
                created.whenClose(ignored -> connections.remove(serverId, created));
                conn = created;
            }
            newFuture.complete(conn);
        } catch (Throwable t) {
            // Never leave waiting threads blocked if creation fails.
            newFuture.completeExceptionally(t);
        } finally {
            connectionFutures.remove(serverId, newFuture);
        }
    }

    try {
        return f.get();
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
    } catch (ExecutionException e) {
        Throwable cause = e.getCause();
        if (cause instanceof RuntimeException) {
            throw (RuntimeException) cause;
        }
        if (cause instanceof Error) {
            throw (Error) cause;
        }
        throw new RuntimeException(cause);
    }
}
```

### Are you willing to submit a PR?

- [x] I'm willing to submit a PR!
