LB-Yu opened a new issue, #1545:
URL: https://github.com/apache/fluss/issues/1545

   ### Search before asking
   
   - [x] I searched in the [issues](https://github.com/alibaba/fluss/issues) and found nothing similar.
   
   
   ### Fluss version
   
   main (development)
   
   ### Please describe the bug 🐞
   
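   When using Flink to write to Fluss, I’ve found that the sink node occasionally hangs, delaying the entire job. After analyzing the thread stacks, I found that the hang occurs in `NettyClient`: the `fluss-write-sender-thread` stays `RUNNABLE` but never returns from `ConcurrentHashMap.remove`, so it is spinning rather than waiting on another thread's lock. The relevant thread stack trace is as follows: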
   ```
   "fluss-write-sender-thread-1" Id=108 RUNNABLE
        at java.util.concurrent.ConcurrentHashMap.replaceNode(ConcurrentHashMap.java:1172)
        at java.util.concurrent.ConcurrentHashMap.remove(ConcurrentHashMap.java:1546)
        at com.alibaba.fluss.rpc.netty.client.NettyClient.lambda$null$0(NettyClient.java:202)
        at com.alibaba.fluss.rpc.netty.client.NettyClient$$Lambda$1186/843977378.accept(Unknown Source)
        at com.alibaba.fluss.rpc.netty.client.ServerConnection.lambda$whenClose$1(ServerConnection.java:134)
        at com.alibaba.fluss.rpc.netty.client.ServerConnection$$Lambda$1192/2067581287.accept(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
        at java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:792)
        at java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2153)
        at com.alibaba.fluss.rpc.netty.client.ServerConnection.whenClose(ServerConnection.java:134)
        at com.alibaba.fluss.rpc.netty.client.NettyClient.lambda$getOrCreateConnection$1(NettyClient.java:202)
        at com.alibaba.fluss.rpc.netty.client.NettyClient$$Lambda$1178/414968141.apply(Unknown Source)
        at java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1660)
        -  locked java.util.concurrent.ConcurrentHashMap$ReservationNode@53549eed
        at com.alibaba.fluss.utils.MapUtils$ConcurrentHashMapForJDK8.computeIfAbsent(MapUtils.java:52)
        at com.alibaba.fluss.rpc.netty.client.NettyClient.getOrCreateConnection(NettyClient.java:191)
        at com.alibaba.fluss.rpc.netty.client.NettyClient.sendRequest(NettyClient.java:167)
        at com.alibaba.fluss.rpc.GatewayClientProxy.invokeRpc(GatewayClientProxy.java:80)
        at com.alibaba.fluss.rpc.GatewayClientProxy.invoke(GatewayClientProxy.java:68)
        at com.sun.proxy.$Proxy34.produceLog(Unknown Source)
        at com.alibaba.fluss.client.write.Sender.sendProduceLogRequestAndHandleResponse(Sender.java:406)
        at com.alibaba.fluss.client.write.Sender.lambda$sendWriteRequest$5(Sender.java:389)
        at com.alibaba.fluss.client.write.Sender$$Lambda$1535/1808342548.accept(Unknown Source)
        at java.util.HashMap.forEach(HashMap.java:1290)
        at com.alibaba.fluss.client.write.Sender.sendWriteRequest(Sender.java:378)
        at com.alibaba.fluss.client.write.Sender.lambda$sendWriteRequests$2(Sender.java:346)
        at com.alibaba.fluss.client.write.Sender$$Lambda$1531/508256363.accept(Unknown Source)
        at java.util.HashMap.forEach(HashMap.java:1290)
        at com.alibaba.fluss.client.write.Sender.sendWriteRequests(Sender.java:346)
        at com.alibaba.fluss.client.write.Sender.sendWriteData(Sender.java:250)
        at com.alibaba.fluss.client.write.Sender.runOnce(Sender.java:194)
        at com.alibaba.fluss.client.write.Sender.run(Sender.java:150)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        ...
   
        Number of locked synchronizers = 1
        - java.util.concurrent.ThreadPoolExecutor$Worker@67dcdeef
   ```
   
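   This appears to be a known [bug](https://bugs.openjdk.org/browse/JDK-8062841) in JDK 8’s `ConcurrentHashMap.computeIfAbsent()`: while the mapping function runs, the bin for the key holds a temporary `ReservationNode` (the lock visible in the trace above), and a `remove` of the same key issued from inside the mapping function loops forever on that node. We should therefore never modify the map from within `computeIfAbsent`. However, `NettyClient` has code like: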
   ```java
       private ServerConnection getOrCreateConnection(ServerNode node) {
           String serverId = node.uid();
           return connections.computeIfAbsent(
                   serverId,
                   ignored -> {
                       LOG.debug("Creating connection to server {}.", node);
                       ServerConnection connection =
                               new ServerConnection(
                                       bootstrap,
                                       node,
                                       clientMetricGroup,
                                       authenticatorSupplier.get(),
                                       isInnerClient);
                       // If the connection is already closed here, this callback runs
                       // synchronously, i.e. while computeIfAbsent is still in progress.
                       connection.whenClose(ignore -> connections.remove(serverId, connection));
                       return connection;
                   });
       }
   ```
   
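   The above code works fine in most cases. However, if the TabletServer crashes while the `ServerConnection` is being created, the connection's close future is already complete by the time `whenClose` registers its callback. `CompletableFuture.whenComplete` then runs the callback immediately on the calling thread, so `connections.remove(serverId, connection)` executes inside `computeIfAbsent` (exactly the path in the stack trace above) and triggers the `ConcurrentHashMap` bug. Although the bug was fixed in JDK 9, `NettyClient` is used by the client SDK, and many clients may still run on JDK 8. Therefore, we should address this issue.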
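
   The trigger can be reproduced with JDK classes alone. The sketch below is a minimal stand-in, not Fluss code: an already-completed `CompletableFuture` plays the role of a connection whose close future completed during construction, so the removal callback runs inside `computeIfAbsent`, as in the trace above. The class name `RecursiveRemoveRepro`, the key `"server-1"`, and the one-second timeout are illustrative choices. On JDK 8 the worker thread is expected to spin forever; on JDK 9+ the recursive update is expected to be rejected with an `IllegalStateException`.

   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.CompletionException;
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.atomic.AtomicReference;
   
   class RecursiveRemoveRepro {
   
       /**
        * Mimics the pattern in getOrCreateConnection: the close callback is registered
        * inside computeIfAbsent on a close future that has already completed.
        * Returns "HANG" if the worker is still stuck after one second, otherwise
        * what the removal callback did and whether the entry is still cached.
        */
       static String run() {
           ConcurrentHashMap<String, Object> connections = new ConcurrentHashMap<>();
           AtomicReference<String> outcome = new AtomicReference<>("HANG");
   
           Thread worker = new Thread(() -> {
               AtomicReference<CompletableFuture<Void>> callback = new AtomicReference<>();
               connections.computeIfAbsent("server-1", id -> {
                   Object connection = new Object();
                   // The server went away while connecting: the close future is already done.
                   CompletableFuture<Void> closeFuture = CompletableFuture.completedFuture(null);
                   // whenComplete on a completed future runs the action right now, on this
                   // thread, while computeIfAbsent still owns the bin for "server-1".
                   callback.set(closeFuture.whenComplete((v, err) -> connections.remove(id, connection)));
                   return connection;
               });
               try {
                   callback.get().join();
                   outcome.set("OK, cached=" + connections.containsKey("server-1"));
               } catch (CompletionException e) {
                   // whenComplete does not rethrow: a failing action only fails the returned future.
                   outcome.set(e.getCause().getClass().getSimpleName()
                           + ", cached=" + connections.containsKey("server-1"));
               }
           });
           worker.setDaemon(true); // lets the JVM exit even if the worker spins forever
           worker.start();
           try {
               worker.join(1000);
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
           }
           return worker.isAlive() ? "HANG" : outcome.get();
       }
   
       public static void main(String[] args) {
           System.out.println(System.getProperty("java.version") + ": " + run());
       }
   }
   ```

   Note that on newer JDKs the failed `remove` is not surfaced to the caller of `whenComplete`: the exception is stored in the future that `whenComplete` returns, so in this sketch the closed entry is expected to stay cached (`cached=true`).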
   
   ### Solution
   
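   To avoid this problem, `remove` must never be invoked from inside `computeIfAbsent`. To that end, we can introduce a second map, `connectionFutures`, that tracks in-flight connection attempts: the thread that installs a future creates the connection and registers the close callback outside of any `computeIfAbsent` call, while concurrent callers wait on the same future.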
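   ```java
       private ServerConnection getOrCreateConnection(ServerNode node) {
           String serverId = node.uid();
   
           // Fast path: reuse an existing connection.
           ServerConnection existing = connections.get(serverId);
           if (existing != null) {
               return existing;
           }
   
           // Only the thread that installs its future creates the connection;
           // concurrent callers wait on the winner's future instead.
           CompletableFuture<ServerConnection> newFuture = new CompletableFuture<>();
           CompletableFuture<ServerConnection> f =
                   connectionFutures.putIfAbsent(serverId, newFuture);
           if (f == null) {
               f = newFuture;
               try {
                   // Re-check: another thread may have finished creating the
                   // connection between our get() and putIfAbsent().
                   ServerConnection conn = connections.get(serverId);
                   if (conn == null) {
                       LOG.debug("Creating connection to server {}.", node);
                       ServerConnection created =
                               new ServerConnection(
                                       bootstrap,
                                       node,
                                       clientMetricGroup,
                                       authenticatorSupplier.get(),
                                       isInnerClient);
                       // Publish first, then register the close callback. If the
                       // connection is already closed, the callback runs right here,
                       // outside any computeIfAbsent, and evicts the entry.
                       connections.put(serverId, created);
                       created.whenClose(ignored -> connections.remove(serverId, created));
                       conn = created;
                   }
                   newFuture.complete(conn);
               } catch (Throwable t) {
                   // Never leave waiting callers blocked on an incomplete future.
                   newFuture.completeExceptionally(t);
               } finally {
                   connectionFutures.remove(serverId, newFuture);
               }
           }
   
           try {
               return f.get();
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
               throw new RuntimeException(e);
           } catch (ExecutionException e) {
               Throwable cause = e.getCause();
               if (cause instanceof RuntimeException) {
                   throw (RuntimeException) cause;
               }
               throw new RuntimeException(cause);
           }
       }
   ```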
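
   To check the future-map design in isolation, here is a self-contained sketch using JDK types only. `FakeConnection` is a hypothetical stand-in for `ServerConnection` (it models only the close future), and the `factory` function replaces the real constructor arguments (`bootstrap`, `clientMetricGroup`, and so on); neither is part of Fluss. The sketch publishes the connection with `put` before registering `whenClose`, so a connection that is already closed is evicted immediately instead of staying cached.

   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.ExecutionException;
   import java.util.function.Function;
   
   class ConnectionCache {
   
       /** Hypothetical stand-in for ServerConnection: it only models the close future. */
       static class FakeConnection {
           private final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
   
           void whenClose(Runnable callback) {
               // Runs the callback immediately if the connection is already closed.
               closeFuture.whenComplete((v, err) -> callback.run());
           }
   
           void close() {
               closeFuture.complete(null);
           }
       }
   
       private final ConcurrentHashMap<String, FakeConnection> connections = new ConcurrentHashMap<>();
       private final ConcurrentHashMap<String, CompletableFuture<FakeConnection>> connectionFutures =
               new ConcurrentHashMap<>();
       private final Function<String, FakeConnection> factory; // replaces the real constructor call
   
       ConnectionCache(Function<String, FakeConnection> factory) {
           this.factory = factory;
       }
   
       FakeConnection getOrCreate(String serverId) {
           FakeConnection existing = connections.get(serverId);
           if (existing != null) {
               return existing;
           }
           CompletableFuture<FakeConnection> newFuture = new CompletableFuture<>();
           CompletableFuture<FakeConnection> f = connectionFutures.putIfAbsent(serverId, newFuture);
           if (f == null) {
               f = newFuture;
               try {
                   FakeConnection conn = connections.get(serverId); // re-check after winning
                   if (conn == null) {
                       FakeConnection created = factory.apply(serverId);
                       connections.put(serverId, created);
                       // Not inside any compute* call, so an immediate callback is harmless.
                       created.whenClose(() -> connections.remove(serverId, created));
                       conn = created;
                   }
                   newFuture.complete(conn);
               } catch (Throwable t) {
                   newFuture.completeExceptionally(t);
               } finally {
                   connectionFutures.remove(serverId, newFuture);
               }
           }
           try {
               return f.get();
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
               throw new RuntimeException(e);
           } catch (ExecutionException e) {
               Throwable cause = e.getCause();
               if (cause instanceof RuntimeException) {
                   throw (RuntimeException) cause;
               }
               throw new RuntimeException(cause);
           }
       }
   
       boolean isCached(String serverId) {
           return connections.containsKey(serverId);
       }
   
       public static void main(String[] args) {
           // "dead" simulates a server that goes away while the connection is created.
           ConnectionCache cache = new ConnectionCache(id -> {
               FakeConnection c = new FakeConnection();
               if (id.equals("dead")) {
                   c.close();
               }
               return c;
           });
           System.out.println(cache.getOrCreate("alive") == cache.getOrCreate("alive")); // true
           cache.getOrCreate("dead"); // returns normally instead of hanging
           System.out.println(cache.isCached("dead")); // false
       }
   }
   ```

   The ordering is the important design choice: if `whenClose` were registered before `put`, an already-closed connection would run `remove` before its entry existed, and the subsequent `put` would then cache a dead connection that nothing ever evicts.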
   
   ### Are you willing to submit a PR?
   
   - [x] I'm willing to submit a PR!

