[ 
https://issues.apache.org/jira/browse/HDFS-16853?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17685631#comment-17685631
 ] 

ASF GitHub Bot commented on HDFS-16853:
---------------------------------------

virajjasani commented on code in PR #5366:
URL: https://github.com/apache/hadoop/pull/5366#discussion_r1099512290


##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java:
##########
@@ -1181,9 +1181,69 @@ public void sendRpcRequest(final Call call)
       final ResponseBuffer buf = new ResponseBuffer();
       header.writeDelimitedTo(buf);
       RpcWritable.wrap(call.rpcRequest).writeTo(buf);
-      rpcRequestQueue.put(Pair.of(call, buf));
+      queueIfActive(call, buf);
+    }
+
+    /**
+     * Queue an operation into the request queue,
+     * waiting if necessary for the queue to have a thread to process it.
+     * If the connection is closed, downgrades to a no-op
+     * @param call call to queue
+     * @param buf buffer for response
+     * @throws InterruptedException interrupted while waiting for a free 
thread.
+     */
+    private void queueIfActive(
+        final Call call,
+        final ResponseBuffer buf) throws InterruptedException {
+      // Get the request queue.
+      // done in a synchronized block to avoid a race condition where
+      // a call is queued after the connection has been closed
+      final SynchronousQueue<Pair<Call, ResponseBuffer>> queue =
+          acquireActiveRequestQueue();
+      if (queue != null) {
+        try {
+          queue.put(Pair.of(call, buf));
+        } finally {
+          // release the reservation afterwards.
+          releaseQueueReservation();
+        }
+      } else {
+        LOG.debug("Discarding queued call as IPC client is stopped");
+      }
+    }
+
+    /**
+     * Get the active rpc request queue.
+     * If the connection is closed, returns null.
+     * This method is synchronized, as are the operations to set
+     * the {@link #shouldCloseConnection} and {@link #running}
+     * atomic booleans, therefore this entire method will complete in the
+     * same block. However, the returned queue may be used outside of
+     * a synchronous block, where this guarantee no longer holds.
+     * A queue reservation counter is used to track this.
+     * Callers MUST invoke {@link #releaseQueueReservation()} afterwards.
+     * @return the queue or null.
+     */
+    private synchronized SynchronousQueue<Pair<Call, ResponseBuffer>> 
acquireActiveRequestQueue() {
+      if (shouldCloseConnection.get() || !running.get()) {

Review Comment:
   This would be too much for accessing atomic boolean





> The UT TestLeaseRecovery2#testHardLeaseRecoveryAfterNameNodeRestart failed 
> because HADOOP-18324
> -----------------------------------------------------------------------------------------------
>
>                 Key: HDFS-16853
>                 URL: https://issues.apache.org/jira/browse/HDFS-16853
>             Project: Hadoop HDFS
>          Issue Type: Bug
>    Affects Versions: 3.3.5
>            Reporter: ZanderXu
>            Assignee: ZanderXu
>            Priority: Blocker
>              Labels: pull-request-available
>
> The UT TestLeaseRecovery2#testHardLeaseRecoveryAfterNameNodeRestart failed 
> with error message: Waiting for cluster to become active. And the blocking 
> jstack as bellows:
> {code:java}
> "BP-1618793397-192.168.3.4-1669198559828 heartbeating to 
> localhost/127.0.0.1:54673" #260 daemon prio=5 os_prio=31 tid=0x
> 00007fc1108fa000 nid=0x19303 waiting on condition [0x0000700017884000]
>    java.lang.Thread.State: WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for  <0x00000007430a9ec0> (a 
> java.util.concurrent.SynchronousQueue$TransferQueue)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>         at 
> java.util.concurrent.SynchronousQueue$TransferQueue.awaitFulfill(SynchronousQueue.java:762)
>         at 
> java.util.concurrent.SynchronousQueue$TransferQueue.transfer(SynchronousQueue.java:695)
>         at 
> java.util.concurrent.SynchronousQueue.put(SynchronousQueue.java:877)
>         at 
> org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:1186)
>         at org.apache.hadoop.ipc.Client.call(Client.java:1482)
>         at org.apache.hadoop.ipc.Client.call(Client.java:1429)
>         at 
> org.apache.hadoop.ipc.ProtobufRpcEngine2$Invoker.invoke(ProtobufRpcEngine2.java:258)
>         at 
> org.apache.hadoop.ipc.ProtobufRpcEngine2$Invoker.invoke(ProtobufRpcEngine2.java:139)
>         at com.sun.proxy.$Proxy23.sendHeartbeat(Unknown Source)
>         at 
> org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB.sendHeartbeat(DatanodeProtocolClient
> SideTranslatorPB.java:168)
>         at 
> org.apache.hadoop.hdfs.server.datanode.BPServiceActor.sendHeartBeat(BPServiceActor.java:570)
>         at 
> org.apache.hadoop.hdfs.server.datanode.BPServiceActor.offerService(BPServiceActor.java:714)
>         at 
> org.apache.hadoop.hdfs.server.datanode.BPServiceActor.run(BPServiceActor.java:915)
>         at java.lang.Thread.run(Thread.java:748)  {code}
> After looking into the code and found that this bug is imported by 
> HADOOP-18324. Because RpcRequestSender exited without cleaning up the 
> rpcRequestQueue, then caused BPServiceActor was blocked in sending request.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: hdfs-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: hdfs-issues-h...@hadoop.apache.org

Reply via email to