This is an automated email from the ASF dual-hosted git repository.

siyao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 885d341dc46 HDDS-14571. Remove synchronized methods from XceiverClientGrpc (#9718)
885d341dc46 is described below

commit 885d341dc463cfd377f766df2be8ec9b28a153b3
Author: Rishabh Patel <[email protected]>
AuthorDate: Mon Feb 23 22:56:54 2026 -0800

    HDDS-14571. Remove synchronized methods from XceiverClientGrpc (#9718)
---
 .../apache/hadoop/hdds/scm/XceiverClientGrpc.java  | 109 ++++++++++++++-------
 1 file changed, 76 insertions(+), 33 deletions(-)

diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
index 6cd258c5ba3..7c639e766eb 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
@@ -34,6 +34,8 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.HddsUtils;
 import org.apache.hadoop.hdds.client.BlockID;
@@ -84,8 +86,9 @@
  * how it works, and how it is integrated with the Ozone client.
  */
 public class XceiverClientGrpc extends XceiverClientSpi {
-  private static final Logger LOG =
-      LoggerFactory.getLogger(XceiverClientGrpc.class);
+  private static final Logger LOG = LoggerFactory.getLogger(XceiverClientGrpc.class);
+  private static final int SHUTDOWN_WAIT_INTERVAL_MILLIS = 100;
+  private static final int SHUTDOWN_WAIT_MAX_SECONDS = 5;
   private final Pipeline pipeline;
   private final ConfigurationSource config;
   private final Map<DatanodeID, XceiverClientProtocolServiceStub> asyncStubs;
@@ -100,7 +103,7 @@ public class XceiverClientGrpc extends XceiverClientSpi {
   // command can be sent to the same DN.
   private final Map<DatanodeBlockID, DatanodeDetails> getBlockDNcache;
 
-  private boolean closed = false;
+  private final AtomicBoolean isClosed = new AtomicBoolean(false);
 
   /**
    * Constructs a client that can communicate with the Container framework on
@@ -115,17 +118,16 @@ public XceiverClientGrpc(Pipeline pipeline, ConfigurationSource config,
     super();
     Objects.requireNonNull(pipeline, "pipeline == null");
     Objects.requireNonNull(config, "config == null");
-    setTimeout(config.getTimeDuration(OzoneConfigKeys.
-        OZONE_CLIENT_READ_TIMEOUT, OzoneConfigKeys
-        .OZONE_CLIENT_READ_TIMEOUT_DEFAULT, TimeUnit.SECONDS));
+    setTimeout(config.getTimeDuration(OzoneConfigKeys.OZONE_CLIENT_READ_TIMEOUT,
+        OzoneConfigKeys.OZONE_CLIENT_READ_TIMEOUT_DEFAULT, TimeUnit.SECONDS));
     this.pipeline = pipeline;
     this.config = config;
     this.secConfig = new SecurityConfig(config);
     this.semaphore =
         new Semaphore(HddsClientUtils.getMaxOutstandingRequests(config));
     this.metrics = XceiverClientManager.getXceiverClientMetrics();
-    this.channels = new HashMap<>();
-    this.asyncStubs = new HashMap<>();
+    this.channels = new ConcurrentHashMap<>();
+    this.asyncStubs = new ConcurrentHashMap<>();
     this.topologyAwareRead = config.getBoolean(
         OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_KEY,
         OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_DEFAULT);
@@ -152,34 +154,56 @@ public XceiverClientGrpc(Pipeline pipeline, ConfigurationSource config) {
   public void connect() throws Exception {
     // connect to the closest node, if closest node doesn't exist, delegate to
     // first node, which is usually the leader in the pipeline.
-    DatanodeDetails dn = topologyAwareRead ? this.pipeline.getClosestNode() :
-        this.pipeline.getFirstNode();
+    DatanodeDetails dn = topologyAwareRead
+        ? this.pipeline.getClosestNode()
+        : this.pipeline.getFirstNode();
     // just make a connection to the picked datanode at the beginning
     connectToDatanode(dn);
   }
 
-  private synchronized void connectToDatanode(DatanodeDetails dn)
+  private void connectToDatanode(DatanodeDetails dn)
       throws IOException {
+    if (isClosed.get()) {
+      throw new IOException("Client is closed.");
+    }
+
     if (isConnected(dn)) {
       return;
     }
-    // read port from the data node, on failure use default configured
-    // port.
+    // read port from the data node, on failure use default configured port
     int port = dn.getStandalonePort().getValue();
     if (port == 0) {
       port = config.getInt(OzoneConfigKeys.HDDS_CONTAINER_IPC_PORT,
           OzoneConfigKeys.HDDS_CONTAINER_IPC_PORT_DEFAULT);
     }
+    final int finalPort = port;
 
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Connecting to server : {}; nodes in pipeline : {}, ",
-          dn, pipeline.getNodes());
+    LOG.debug("Connecting to server : {}; nodes in pipeline : {}, ", dn, pipeline.getNodes());
+
+    channels.computeIfPresent(dn.getID(), (dnId, channel) -> {
+      if (channel.isTerminated() || channel.isShutdown()) {
+        asyncStubs.remove(dnId);
+        return null; // removes from channels map
+      }
+
+      return channel;
+    });
+
+    ManagedChannel channel;
+    try {
+      channel = channels.computeIfAbsent(dn.getID(), dnId -> {
+        try {
+          return createChannel(dn, finalPort).build();
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      });
+    } catch (RuntimeException e) {
+      LOG.error("Failed to create channel to datanode {}", dn, e);
+      throw new IOException(e.getCause());
     }
-    ManagedChannel channel = createChannel(dn, port).build();
-    XceiverClientProtocolServiceStub asyncStub =
-        XceiverClientProtocolServiceGrpc.newStub(channel);
-    asyncStubs.put(dn.getID(), asyncStub);
-    channels.put(dn.getID(), channel);
+
+    asyncStubs.computeIfAbsent(dn.getID(), dnId -> XceiverClientProtocolServiceGrpc.newStub(channel));
   }
 
   protected NettyChannelBuilder createChannel(DatanodeDetails dn, int port)
@@ -235,24 +259,43 @@ private boolean isConnected(ManagedChannel channel) {
    * Closes all the communication channels of the client one-by-one.
    * When a channel is closed, no further requests can be sent via the channel,
    * and the method waits to finish all ongoing communication.
-   *
-   * Note: the method wait 1 hour per channel tops and if that is not enough
-   * to finish ongoing communication, then interrupts the connection anyway.
    */
   @Override
-  public synchronized void close() {
-    closed = true;
+  public void close() {
+    if (!isClosed.compareAndSet(false, true)) {
+      // we should allow only one thread to perform the close operation to make it idempotent
+      return;
+    }
+
     for (ManagedChannel channel : channels.values()) {
-      channel.shutdownNow();
+      channel.shutdown();
+    }
+
+    final long maxWaitNanos = TimeUnit.SECONDS.toNanos(SHUTDOWN_WAIT_MAX_SECONDS);
+    long deadline = System.nanoTime() + maxWaitNanos;
+    List<ManagedChannel> nonTerminatedChannels = new ArrayList<>(channels.values());
+
+    while (!nonTerminatedChannels.isEmpty() && System.nanoTime() < deadline) {
+      nonTerminatedChannels.removeIf(ManagedChannel::isTerminated);
       try {
-        channel.awaitTermination(60, TimeUnit.MINUTES);
+        Thread.sleep(SHUTDOWN_WAIT_INTERVAL_MILLIS);
       } catch (InterruptedException e) {
-        LOG.error("InterruptedException while waiting for channel termination",
-            e);
-        // Re-interrupt the thread while catching InterruptedException
+        LOG.error("Interrupted while waiting for channels to terminate", e);
         Thread.currentThread().interrupt();
+        break;
       }
     }
+
+    List<DatanodeID> failedChannels = channels.entrySet().stream()
+        .filter(e -> !e.getValue().isTerminated())
+        .map(Map.Entry::getKey)
+        .collect(Collectors.toList());
+    if (!failedChannels.isEmpty()) {
+      LOG.warn("Channels {} did not terminate within timeout.", failedChannels);
+    }
+
+    channels.clear();
+    asyncStubs.clear();
   }
 
   @Override
@@ -694,9 +737,9 @@ private void decreasePendingMetricsAndReleaseSemaphore() {
     return new XceiverClientReply(replyFuture);
   }
 
-  private synchronized void checkOpen(DatanodeDetails dn)
+  private void checkOpen(DatanodeDetails dn)
       throws IOException {
-    if (closed) {
+    if (isClosed.get()) {
       throw new IOException("This channel is not connected.");
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to