[ https://issues.apache.org/jira/browse/HDFS-16853?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17685631#comment-17685631 ]
ASF GitHub Bot commented on HDFS-16853:
---------------------------------------

virajjasani commented on code in PR #5366:
URL: https://github.com/apache/hadoop/pull/5366#discussion_r1099512290


##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java:
##########
@@ -1181,9 +1181,69 @@ public void sendRpcRequest(final Call call)
       final ResponseBuffer buf = new ResponseBuffer();
       header.writeDelimitedTo(buf);
       RpcWritable.wrap(call.rpcRequest).writeTo(buf);
-      rpcRequestQueue.put(Pair.of(call, buf));
+      queueIfActive(call, buf);
+    }
+
+    /**
+     * Queue an operation into the request queue,
+     * waiting if necessary for the queue to have a thread to process it.
+     * If the connection is closed, downgrades to a no-op
+     * @param call call to queue
+     * @param buf buffer for response
+     * @throws InterruptedException interrupted while waiting for a free thread.
+     */
+    private void queueIfActive(
+        final Call call,
+        final ResponseBuffer buf) throws InterruptedException {
+      // Get the request queue.
+      // done in a synchronized block to avoid a race condition where
+      // a call is queued after the connection has been closed
+      final SynchronousQueue<Pair<Call, ResponseBuffer>> queue =
+          acquireActiveRequestQueue();
+      if (queue != null) {
+        try {
+          queue.put(Pair.of(call, buf));
+        } finally {
+          // release the reservation afterwards.
+          releaseQueueReservation();
+        }
+      } else {
+        LOG.debug("Discarding queued call as IPC client is stopped");
+      }
+    }
+
+    /**
+     * Get the active rpc request queue.
+     * If the connection is closed, returns null.
+     * This method is synchronized, as are the operations to set
+     * the {@link #shouldCloseConnection} and {@link #running}
+     * atomic booleans, therefore this entire method will complete in the
+     * same block. However, the returned queue may be used outside of
+     * a synchronous block, where this guarantee no longer holds.
+     * A queue reservation counter is used to track this.
+     * Callers MUST invoke {@link #releaseQueueReservation()} afterwards.
+     * @return the queue or null.
+     */
+    private synchronized SynchronousQueue<Pair<Call, ResponseBuffer>> acquireActiveRequestQueue() {
+      if (shouldCloseConnection.get() || !running.get()) {

Review Comment:
   This would be too much for accessing an atomic boolean.


> The UT TestLeaseRecovery2#testHardLeaseRecoveryAfterNameNodeRestart failed because HADOOP-18324
> -----------------------------------------------------------------------------------------------
>
>                 Key: HDFS-16853
>                 URL: https://issues.apache.org/jira/browse/HDFS-16853
>             Project: Hadoop HDFS
>          Issue Type: Bug
>    Affects Versions: 3.3.5
>            Reporter: ZanderXu
>            Assignee: ZanderXu
>            Priority: Blocker
>              Labels: pull-request-available
>
> The UT TestLeaseRecovery2#testHardLeaseRecoveryAfterNameNodeRestart failed with the error message "Waiting for cluster to become active". The blocking jstack is as follows:
> {code:java}
> "BP-1618793397-192.168.3.4-1669198559828 heartbeating to localhost/127.0.0.1:54673" #260 daemon prio=5 os_prio=31 tid=0x00007fc1108fa000 nid=0x19303 waiting on condition [0x0000700017884000]
>    java.lang.Thread.State: WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for <0x00000007430a9ec0> (a java.util.concurrent.SynchronousQueue$TransferQueue)
>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>     at java.util.concurrent.SynchronousQueue$TransferQueue.awaitFulfill(SynchronousQueue.java:762)
>     at java.util.concurrent.SynchronousQueue$TransferQueue.transfer(SynchronousQueue.java:695)
>     at java.util.concurrent.SynchronousQueue.put(SynchronousQueue.java:877)
>     at org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:1186)
>     at org.apache.hadoop.ipc.Client.call(Client.java:1482)
>     at org.apache.hadoop.ipc.Client.call(Client.java:1429)
>     at org.apache.hadoop.ipc.ProtobufRpcEngine2$Invoker.invoke(ProtobufRpcEngine2.java:258)
>     at org.apache.hadoop.ipc.ProtobufRpcEngine2$Invoker.invoke(ProtobufRpcEngine2.java:139)
>     at com.sun.proxy.$Proxy23.sendHeartbeat(Unknown Source)
>     at org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB.sendHeartbeat(DatanodeProtocolClientSideTranslatorPB.java:168)
>     at org.apache.hadoop.hdfs.server.datanode.BPServiceActor.sendHeartBeat(BPServiceActor.java:570)
>     at org.apache.hadoop.hdfs.server.datanode.BPServiceActor.offerService(BPServiceActor.java:714)
>     at org.apache.hadoop.hdfs.server.datanode.BPServiceActor.run(BPServiceActor.java:915)
>     at java.lang.Thread.run(Thread.java:748)
> {code}
> After looking into the code, I found that this bug was introduced by HADOOP-18324: RpcRequestSender exited without cleaning up the rpcRequestQueue, which left BPServiceActor parked forever in SynchronousQueue.put() while sending a heartbeat.
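The hang in the jstack follows from SynchronousQueue semantics: put() parks until another thread takes the element, so once RpcRequestSender has exited, every later put() on rpcRequestQueue waits forever. The following minimal Java sketch illustrates the guard proposed in the PR (check that the connection is active, take a reservation, put, release) under simplifying assumptions: GuardedRequestQueue, its String requests, reservationCount(), and the polling drain loop in stop() are hypothetical and are not the actual org.apache.hadoop.ipc.Client code.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical, simplified model of the "queue only while active" pattern;
 * not the Hadoop Client implementation. Producers hand requests to a
 * SynchronousQueue only while the sender is running, and hold a reservation
 * for the whole duration of the blocking put().
 */
class GuardedRequestQueue {
  private final SynchronousQueue<String> queue = new SynchronousQueue<>();
  private boolean running = true; // guarded by "this"
  private int reservations = 0;   // guarded by "this"

  /** Returns the queue and takes a reservation, or null once stopped. */
  private synchronized SynchronousQueue<String> acquireActiveQueue() {
    if (!running) {
      return null;
    }
    reservations++;
    return queue;
  }

  /** Every successful acquireActiveQueue() must be paired with this call. */
  private synchronized void releaseReservation() {
    reservations--;
  }

  synchronized int reservationCount() {
    return reservations;
  }

  /**
   * Hands a request to a consumer, or drops it if the sender has stopped.
   * Returns false if dropped; true once some thread (the sender, or the
   * drain loop in stop()) has taken the request.
   */
  boolean queueIfActive(String request) throws InterruptedException {
    SynchronousQueue<String> q = acquireActiveQueue();
    if (q == null) {
      return false; // sender stopped: discard instead of parking forever
    }
    try {
      q.put(request); // parks until another thread takes the request
      return true;
    } finally {
      releaseReservation();
    }
  }

  /** Sender side: blocks until a producer hands over a request. */
  String take() throws InterruptedException {
    return queue.take();
  }

  /**
   * Called when the sender exits: refuse new requests, then take (and
   * discard) any request whose producer already holds a reservation, so
   * that no producer is left parked in put().
   */
  void stop() throws InterruptedException {
    synchronized (this) {
      running = false;
    }
    while (reservationCount() > 0) {
      queue.poll(10, TimeUnit.MILLISECONDS);
    }
  }

  public static void main(String[] args) throws Exception {
    // With no consumer, a SynchronousQueue hand-off never completes:
    // offer() with a timeout gives up, where put() would park forever.
    SynchronousQueue<String> bare = new SynchronousQueue<>();
    System.out.println("bare hand-off: " + bare.offer("hb", 100, TimeUnit.MILLISECONDS));

    GuardedRequestQueue guarded = new GuardedRequestQueue();
    Thread producer = new Thread(() -> {
      try {
        guarded.queueIfActive("heartbeat-1"); // parks: nobody is taking
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    producer.start();
    while (guarded.reservationCount() == 0) {
      Thread.sleep(1); // wait until the producer holds its reservation
    }
    guarded.stop(); // drains the parked producer instead of leaving it stuck
    producer.join();
    System.out.println("after stop, queued: " + guarded.queueIfActive("heartbeat-2"));
  }
}
```

Running it should print "bare hand-off: false" followed by "after stop, queued: false". Checking the flag and incrementing the counter under the same monitor is what lets stop() trust the counter: once running is false no new reservation can appear, and each existing one is released only after its put() completes. The reviewer's concern is the cost of that monitor; a variant built only on AtomicBoolean and AtomicInteger would need extra care to close the window between the check and the increment.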