kerneltime commented on code in PR #10470:
URL: https://github.com/apache/ozone/pull/10470#discussion_r3382377516


##########
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMProxyInfo.java:
##########
@@ -106,10 +124,78 @@ public synchronized void createProxyIfNeeded(CheckedFunction<InetSocketAddress,
   }
 
   /**
-   * A {@link OMProxyInfo} map with a particular order,
+   * Re-resolve {@link #rpcAddrStr} via DNS and, if the resolved IP has
+   * changed, replace the cached {@link #rpcAddr} (and the derived
+   * delegation-token service) and discard the cached proxy so that the
+   * next {@link #createProxyIfNeeded} call dials the new IP. The stale
+   * RPC proxy is closed via {@link RPC#stopProxy} so the underlying
+   * Hadoop {@code Client} connection thread and authenticated SASL
+   * session against the gone-away peer are not leaked.
+   * <p>
+   * Returns true when a swap occurred. Off the failure path this is a
+   * no-op (returns false): unchanged IP, unresolved lookup, or
+   * malformed host string.
+   * <p>
+   * The DNS lookup and the {@code RPC.stopProxy} call are performed
+   * outside the entry monitor so that a slow / dead resolver or a
+   * blocking proxy teardown does not freeze concurrent readers of
+   * {@link #getAddress()} / {@link #getProxy()}.
+   */
+  public boolean refreshAddressIfChanged() {
+    final InetSocketAddress refreshed;
+    try {
+      refreshed = NetUtils.createSocketAddr(rpcAddrStr);
+    } catch (IllegalArgumentException ex) {
+      LOG.warn("Failed to re-resolve OM address {}: {}", rpcAddrStr,
+          ex.getMessage());
+      return false;
+    }
+    if (refreshed.isUnresolved()) {
+      LOG.warn("OM hostname {} re-resolved to an unresolved address; "
+          + "leaving cached entry in place.", rpcAddrStr);
+      return false;
+    }
+    final T staleProxy;
+    final InetSocketAddress old;
+    synchronized (this) {
+      if (refreshed.getAddress() != null
+          && refreshed.getAddress().equals(rpcAddr.getAddress())) {
+        return false;

Review Comment:
   Fixed in `46e7f544`. Replaced the equality check with a null-safe IP 
comparison: `if (cachedIp != null && refreshedIp != null && 
refreshedIp.equals(cachedIp))`. When the cached `rpcAddr` was constructed 
unresolved (the constructor accepts this with a warning at 
OMProxyInfo.java:81), `cachedIp` is null and we now allow the swap. That is 
correct: an address that resolves now is a genuine change, and it is the case 
the refresh path most needs to recover from. `NetUtils.createSocketAddr` is 
invoked outside the synchronized block, before the comparison, so no I/O 
happens under the lock either.



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/EndpointStateMachine.java:
##########
@@ -79,6 +107,52 @@ public EndpointStateMachine(InetSocketAddress address,
             .build());
   }
 
+  /**
+   * The original "host:port" string used to construct {@link #getAddress()}.
+   * @return the host:port string, or null if not preserved at construction
+   */
+  public String getHostAndPort() {
+    return hostAndPort;
+  }
+
+  /**
+   * Re-resolves the configured hostname and reports whether the resolved
+   * IP differs from the cached {@link #address}. Does not mutate any
+   * state on this endpoint -- the caller is responsible for swapping
+   * this endpoint out (via {@link SCMConnectionManager#refreshSCMServer})
+   * if a refresh is desired.
+   * <p>
+   * Returns null when:
+   * <ul>
+   *   <li>{@link #hostAndPort} was not preserved at construction</li>
+   *   <li>the resolved IP is unchanged</li>
+   *   <li>the new resolution is unresolved (DNS lookup failed)</li>
+   * </ul>
+   * Returns the freshly-resolved {@link InetSocketAddress} when the IP
+   * has changed under the same hostname.
+   */
+  public InetSocketAddress resolveLatestAddress() {
+    if (hostAndPort == null) {
+      return null;
+    }
+    InetSocketAddress refreshed;
+    try {
+      refreshed = NetUtils.createSocketAddr(hostAndPort);
+    } catch (IllegalArgumentException ex) {
+      LOG.warn("Failed to re-resolve {}: {}", hostAndPort, ex.getMessage());
+      return null;
+    }
+    if (refreshed.isUnresolved()) {
+      LOG.warn("Re-resolution of {} produced an unresolved address; "
+          + "leaving cached address {} in place.", hostAndPort, address);
+      return null;
+    }
+    if (refreshed.getAddress().equals(address.getAddress())) {
+      return null;
+    }

Review Comment:
   Fixed in `46e7f544`. Same null-safe comparison applied at 
EndpointStateMachine.java:152. If the cached `address` was unresolved (a 
possibility because `addSCMServer` does not reject unresolved addresses at 
construction, and `addReconServer` never preserves a hostname), 
`address.getAddress()` is null. A successful re-resolution is now treated as a 
swap instead of throwing a NullPointerException in the heartbeat refresh path, 
which is exactly the case that path most needs to fix.



##########
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMFailoverProxyProviderBase.java:
##########
@@ -260,6 +289,79 @@ public synchronized void close() throws IOException {
     }
   }
 
+  /**
+   * Re-resolve the configured hostname for the given SCM nodeId. If DNS
+   * now returns a different IP, swap in a fresh {@link SCMProxyInfo}
+   * (with the new resolved address) and discard any cached proxy so the
+   * next {@link #getProxy()} call dials the new IP.
+   *
+   * @return true when a swap occurred; false when the hostname was not
+   *         preserved, the IP is unchanged, the lookup failed, or the
+   *         nodeId is unknown.
+   */
+  @VisibleForTesting
+  boolean refreshProxyAddressIfChanged(String nodeId) {
+    // Read the cached info first so we can do the DNS lookup outside
+    // any monitor. A slow / dead resolver while holding the provider
+    // monitor would freeze every concurrent getProxy() / shouldRetry()
+    // caller.
+    String hostAndPort;
+    InetSocketAddress cachedAddress;
+    String serviceId;
+    synchronized (this) {
+      SCMProxyInfo cached = scmProxyInfoMap.get(nodeId);
+      if (cached == null) {
+        return false;
+      }
+      hostAndPort = cached.getHostAndPort();
+      if (hostAndPort == null) {
+        return false;
+      }
+      cachedAddress = cached.getAddress();
+      serviceId = cached.getServiceId();
+    }
+    InetSocketAddress refreshed;
+    try {
+      refreshed = NetUtils.createSocketAddr(hostAndPort);
+    } catch (IllegalArgumentException ex) {
+      getLogger().warn("Failed to re-resolve SCM address {}: {}",
+          hostAndPort, ex.getMessage());
+      return false;
+    }
+    if (refreshed.isUnresolved()) {
+      getLogger().warn("SCM hostname {} re-resolved to an unresolved "
+          + "address; leaving cached entry in place.", hostAndPort);
+      return false;
+    }
+    if (refreshed.getAddress().equals(cachedAddress.getAddress())) {
+      return false;
+    }

Review Comment:
   Fixed in `46e7f544`. Same pattern at SCMFailoverProxyProviderBase.java:338: 
a null-safe IP comparison, with an unresolved cached address treated as "swap 
allowed" instead of throwing an NPE.



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/SCMConnectionManager.java:
##########
@@ -132,42 +133,153 @@ public void writeUnlock() {
    */
   public void addSCMServer(InetSocketAddress address,
       String threadNamePrefix) throws IOException {
+    addSCMServer(address, null, threadNamePrefix);
+  }
+
+  /**
+   * Adds a new SCM machine and remembers the original host:port string
+   * so the DN can re-resolve DNS on connection failure (e.g. after the
+   * SCM peer is rescheduled to a new pod IP in Kubernetes).
+   *
+   * @param address     resolved RPC address used to build the proxy
+   * @param hostAndPort original "host:port" string, or null to disable
+   *                    DNS re-resolution for this endpoint
+   * @param threadNamePrefix prefix for the endpoint's task thread
+   */
+  public void addSCMServer(InetSocketAddress address, String hostAndPort,
+      String threadNamePrefix) throws IOException {
     writeLock();
     try {
       if (scmMachines.containsKey(address)) {
         LOG.warn("Trying to add an existing SCM Machine to Machines group. " +
             "Ignoring the request.");
         return;
       }
+      EndpointStateMachine endPoint =
+          buildScmEndpoint(address, hostAndPort, threadNamePrefix);
+      scmMachines.put(address, endPoint);
+    } finally {
+      writeUnlock();
+    }
+  }
 
-      Configuration hadoopConfig =
-          LegacyHadoopConfigurationSource.asHadoopConfiguration(this.conf);
-      RPC.setProtocolEngine(
-          hadoopConfig,
-          StorageContainerDatanodeProtocolPB.class,
-          ProtobufRpcEngine.class);
-      long version =
-          RPC.getProtocolVersion(StorageContainerDatanodeProtocolPB.class);
+  /**
+   * Build (but do NOT register) a fresh active-SCM endpoint bound to
+   * {@code address}. The caller is responsible for registering it in
+   * {@code scmMachines} (and tearing down any previous endpoint it
+   * replaces). Factored out of {@link #addSCMServer} so
+   * {@link #refreshSCMServer} can construct the replacement BEFORE
+   * removing the stale entry, preserving the peer in {@code scmMachines}
+   * if proxy construction throws (transient DNS failure, peer not yet
+   * accepting on the new IP, etc.).
+   */
+  @VisibleForTesting
+  EndpointStateMachine buildScmEndpoint(InetSocketAddress address,
+      String hostAndPort, String threadNamePrefix) throws IOException {
+    Configuration hadoopConfig =
+        LegacyHadoopConfigurationSource.asHadoopConfiguration(this.conf);
+    RPC.setProtocolEngine(
+        hadoopConfig,
+        StorageContainerDatanodeProtocolPB.class,
+        ProtobufRpcEngine.class);
+    long version =
+        RPC.getProtocolVersion(StorageContainerDatanodeProtocolPB.class);
 
-      RetryPolicy retryPolicy =
-          RetryPolicies.retryUpToMaximumCountWithFixedSleep(
-              getScmRpcRetryCount(conf), getScmRpcRetryInterval(conf),
-              TimeUnit.MILLISECONDS);
+    RetryPolicy retryPolicy =
+        RetryPolicies.retryUpToMaximumCountWithFixedSleep(
+            getScmRpcRetryCount(conf), getScmRpcRetryInterval(conf),
+            TimeUnit.MILLISECONDS);
 
-      StorageContainerDatanodeProtocolPB rpcProxy = RPC.getProtocolProxy(
-          StorageContainerDatanodeProtocolPB.class, version,
-          address, UserGroupInformation.getCurrentUser(), hadoopConfig,
-          NetUtils.getDefaultSocketFactory(hadoopConfig), getRpcTimeout(),
-          retryPolicy).getProxy();
+    StorageContainerDatanodeProtocolPB rpcProxy = RPC.getProtocolProxy(
+        StorageContainerDatanodeProtocolPB.class, version,
+        address, UserGroupInformation.getCurrentUser(), hadoopConfig,
+        NetUtils.getDefaultSocketFactory(hadoopConfig), getRpcTimeout(),
+        retryPolicy).getProxy();
 
-      StorageContainerDatanodeProtocolClientSideTranslatorPB rpcClient =
-          new StorageContainerDatanodeProtocolClientSideTranslatorPB(
-              rpcProxy);
+    StorageContainerDatanodeProtocolClientSideTranslatorPB rpcClient =
+        new StorageContainerDatanodeProtocolClientSideTranslatorPB(
+            rpcProxy);
 
-      EndpointStateMachine endPoint = new EndpointStateMachine(address,
-          rpcClient, this.conf, threadNamePrefix);
-      endPoint.setPassive(false);
-      scmMachines.put(address, endPoint);
+    EndpointStateMachine endPoint = new EndpointStateMachine(address,
+        hostAndPort, rpcClient, this.conf, threadNamePrefix);
+    endPoint.setPassive(false);
+    return endPoint;
+  }
+
+  /**
+   * Re-resolve the SCM hostname for the endpoint at {@code oldAddress} and,
+   * if the resolved IP has changed, atomically replace the endpoint with a
+   * fresh one bound to the new address.
+   * <p>
+   * Returns the new {@link InetSocketAddress} on a successful swap, or null
+   * if no swap occurred (endpoint not found, hostname not preserved at
+   * construction, IP unchanged, or DNS lookup failed). Callers receive
+   * enough information to update any external maps keyed by the old
+   * address.
+   * <p>
+   * The replacement is built via the existing {@link #addSCMServer} path,
+   * so the new endpoint starts in {@code GETVERSION} state and re-walks
+   * version → register → heartbeat. This is the correct behavior: a peer
+   * that has been rescheduled is effectively a fresh process.
+   */
+  public InetSocketAddress refreshSCMServer(InetSocketAddress oldAddress,
+      String threadNamePrefix) throws IOException {
+    writeLock();
+    try {
+      EndpointStateMachine existing = scmMachines.get(oldAddress);
+      if (existing == null) {
+        return null;
+      }
+      // Recon endpoints (added via addReconServer) speak a different
+      // protocol than active SCM endpoints. The current refresh path
+      // only knows how to rebuild SCM endpoints, so refusing to
+      // refresh a passive endpoint avoids silently downgrading a
+      // Recon endpoint to an SCM-protocol one. Recon's cached IP is
+      // also a much narrower problem in practice (Recon is rarely
+      // pod-rescheduled the way SCM-HA peers are).
+      if (existing.isPassive()) {
+        return null;
+      }
+      InetSocketAddress refreshed = existing.resolveLatestAddress();
+      if (refreshed == null) {
+        return null;
+      }
+      String hostAndPort = existing.getHostAndPort();
+      // Build the replacement BEFORE removing the stale entry so a
+      // failure to construct the new proxy (transient DNS, peer not
+      // yet accepting on the new IP, NetUtils refusing the address)
+      // leaves the existing endpoint registered. Otherwise the peer
+      // would disappear from scmMachines entirely and the next
+      // heartbeat cycle would have nothing to dial -- much worse than
+      // the pre-PR behaviour of dialing the stale IP.
+      EndpointStateMachine replacement;
+      try {
+        replacement = buildScmEndpoint(refreshed, hostAndPort,
+            threadNamePrefix);
+      } catch (IOException buildEx) {
+        LOG.warn("DNS re-resolution: failed to build replacement SCM "
+                + "endpoint for {} -> {} (host {}); leaving stale endpoint "
+                + "in place. Cause: {}", oldAddress, refreshed, hostAndPort,
+            buildEx.getMessage());
+        throw buildEx;
+      }
+      // Replacement is built; commit the swap atomically under the
+      // write lock we already hold, then close the old endpoint
+      // outside the critical section's correctness path (close()
+      // failures only affect cleanup, not registration).
+      scmMachines.put(refreshed, replacement);
+      if (!refreshed.equals(oldAddress)) {
+        scmMachines.remove(oldAddress);
+      }

Review Comment:
   Fixed in the prior push at `774c8eb4` (R3-C3). 
`SCMConnectionManager.refreshSCMServer` now refuses the swap when 
`!refreshed.equals(oldAddress) && scmMachines.containsKey(refreshed)`. The 
freshly-built replacement is discarded (no put, no close on the existing 
endpoint, return null). This avoids the scenario where kube-dns transiently 
maps SCM-A to SCM-B's IP and the put would silently overwrite SCM-B's 
`EndpointStateMachine` and leak its executor.
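
   The collision guard can be sketched as follows. This is a simplified 
stand-in, not the PR's code: `EndpointSwapSketch` and its methods are 
hypothetical, the map values are plain strings instead of 
`EndpointStateMachine` instances, and `synchronized` stands in for the real 
write lock.

```java
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;

/** Illustrative only; names are hypothetical, values are plain labels. */
final class EndpointSwapSketch {
  private final Map<InetSocketAddress, String> machines = new HashMap<>();

  synchronized void add(InetSocketAddress address, String endpoint) {
    machines.put(address, endpoint);
  }

  synchronized String get(InetSocketAddress address) {
    return machines.get(address);
  }

  /** Returns the new key on a swap, or null when the swap is refused. */
  synchronized InetSocketAddress refresh(InetSocketAddress oldAddress,
      InetSocketAddress refreshed) {
    String existing = machines.get(oldAddress);
    if (existing == null) {
      return null; // nothing registered under the old address
    }
    // Refuse to overwrite a different peer that already owns the new key.
    if (!refreshed.equals(oldAddress) && machines.containsKey(refreshed)) {
      return null;
    }
    machines.put(refreshed, existing);
    if (!refreshed.equals(oldAddress)) {
      machines.remove(oldAddress);
    }
    return refreshed;
  }

  /** Builds a resolved 10.0.0.x:9861 address from literal bytes. */
  static InetSocketAddress literal(int lastOctet) {
    try {
      byte[] raw = {10, 0, 0, (byte) lastOctet};
      return new InetSocketAddress(InetAddress.getByAddress(raw), 9861);
    } catch (UnknownHostException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    EndpointSwapSketch m = new EndpointSwapSketch();
    m.add(literal(1), "scm-a");
    m.add(literal(2), "scm-b");
    // DNS transiently maps scm-a to scm-b's IP: the swap is refused.
    System.out.println(m.refresh(literal(1), literal(2)));
  }
}
```

In the refused case both entries stay registered under their original keys, 
so the next refresh attempt can retry once DNS has settled.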



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java:
##########
@@ -185,14 +186,26 @@ public StateContext(ConfigurationSource conf,
     containerReports = new AtomicReference<>();
     nodeReport = new AtomicReference<>();

Review Comment:
   Fixed in `46e7f544`. Promoted `incrementalReportsQueue` to 
`ConcurrentHashMap` (StateContext.java:189). The producer-side 
`synchronized(incrementalReportsQueue)` blocks remain because they guard 
COMPOUND operations on the inner LinkedList values (e.g. `q.add`, `q.addAll`, 
`q.removeIf`) that ConcurrentHashMap does not protect. CHM only fixes the 
map-structure race that addEndpoint/removeEndpoint/migrateEndpoint introduced. 
Metric readers iterating `entrySet()` without a monitor are now safe via CHM's 
weakly-consistent iteration.
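
   A reduced sketch of the resulting locking split, under assumptions: 
`ReportQueueSketch` and its methods are hypothetical, endpoints and reports 
are plain strings, and the real `StateContext` code is more involved. The map 
structure is protected by `ConcurrentHashMap`; the non-thread-safe inner lists 
are still touched only under the shared monitor.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Illustrative only; names are hypothetical, payloads are plain strings. */
final class ReportQueueSketch {
  private final Map<String, List<String>> queues = new ConcurrentHashMap<>();

  // Structural changes rely on ConcurrentHashMap alone.
  void addEndpoint(String endpoint) {
    queues.putIfAbsent(endpoint, new LinkedList<>());
  }

  void removeEndpoint(String endpoint) {
    queues.remove(endpoint);
  }

  // The inner LinkedList is not thread-safe, so producers keep the monitor.
  void addReport(String report) {
    synchronized (queues) {
      for (List<String> q : queues.values()) {
        q.add(report);
      }
    }
  }

  // Consumers take the same monitor before mutating an inner list.
  List<String> drain(String endpoint, int max) {
    List<String> out = new ArrayList<>();
    synchronized (queues) {
      List<String> q = queues.get(endpoint);
      while (q != null && !q.isEmpty() && out.size() < max) {
        out.add(q.remove(0));
      }
    }
    return out;
  }

  // Reader without a monitor: iteration is weakly consistent and does not
  // throw ConcurrentModificationException if endpoints change meanwhile.
  List<String> endpointNames() {
    List<String> names = new ArrayList<>();
    for (Map.Entry<String, List<String>> e : queues.entrySet()) {
      names.add(e.getKey());
    }
    return names;
  }

  public static void main(String[] args) {
    ReportQueueSketch s = new ReportQueueSketch();
    s.addEndpoint("scm1");
    s.addReport("r1");
    System.out.println(s.drain("scm1", 10));
  }
}
```

The reader in this sketch deliberately touches only the keys. Reading the 
inner lists would still need the monitor.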



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java:
##########
@@ -185,14 +186,26 @@ public StateContext(ConfigurationSource conf,
     containerReports = new AtomicReference<>();
     nodeReport = new AtomicReference<>();
     pipelineReports = new AtomicReference<>();
-    endpoints = new HashSet<>();
+    // CopyOnWriteArraySet so producers in addContainerAction /
+    // addIncrementalReport / addPipelineActionIfAbsent can iterate
+    // endpoints lock-free and a concurrent migrateEndpoint() (fired
+    // from the heartbeat thread on DNS-refresh recovery) cannot raise
+    // ConcurrentModificationException. Endpoint membership changes
+    // are infrequent (startup add, reconfig, refresh swap) so the
+    // copy-on-write cost is negligible.
+    endpoints = new CopyOnWriteArraySet<>();
     containerActions = new HashMap<>();
     pipelineActions = new ConcurrentHashMap<>();
     lock = new ReentrantLock();

Review Comment:
   Fixed in `46e7f544`. Same pattern as `incrementalReportsQueue`: promoted 
`containerActions` to `ConcurrentHashMap` (StateContext.java:202). Producer 
`synchronized(containerActions)` blocks remain — they still guard the inner 
Queue operations.



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java:
##########
@@ -984,6 +1169,20 @@ synchronized boolean putIfAbsent(PipelineKey key,
       return map.putIfAbsent(key, pipelineAction) == null;
     }
 
+    /**
+     * Drain entries that are absent in {@code target}'s map into it,
+     * preserving the original insertion order. Used by the
+     * DNS-refresh-on-failure endpoint migration to move queued
+     * pipeline actions from a stale endpoint key to a fresh one
+     * without losing them.
+     */
+    synchronized void drainInto(PipelineActionMap target) {
+      for (Map.Entry<PipelineKey, PipelineAction> entry : map.entrySet()) {
+        target.putIfAbsent(entry.getKey(), entry.getValue());
+      }
+      map.clear();
+    }

Review Comment:
   Fixed in `46e7f544`. `drainInto` was added in R2 when `migrateEndpoint` did 
merge-on-collision, but the post-R2 collision logic drops the old map without 
merging. Removed the dead method and its Javadoc.



##########
hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/TestConnectionFailureUtils.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.utils;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.net.ConnectException;
+import java.net.NoRouteToHostException;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+import java.net.UnknownHostException;
+import java.util.stream.Stream;
+import org.apache.hadoop.security.AccessControlException;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+/**
+ * Verifies the connection-class exception classifier used to gate the
+ * DNS-refresh-on-failure code path on both the OM and SCM failover
+ * proxy providers and the DataNode heartbeat catch block.
+ * <p>
+ * The classifier must:
+ * <ul>
+ *   <li>Match every exception type that signals "the cached IP is no
+ *       longer reachable" -- including the AWS EC2 / EKS silent-drop
+ *       case which surfaces as {@link SocketTimeoutException}, the
+ *       case the PR motivating this helper (HDDS-15514) is sold on. </li>
+ *   <li>Reject application-level errors (NotLeader, AccessControl,
+ *       protocol mismatch) so we don't add DNS load on logical
+ *       failures where the cached IP is fine. </li>
+ *   <li>Walk wrapped cause chains so a {@code RemoteException(...)} or
+ *       {@code IOException(...)} carrying a connection-class cause is
+ *       still classified correctly. </li>
+ *   <li>Defend against pathological cycles in the cause chain. </li>
+ * </ul>
+ */
+public class TestConnectionFailureUtils {
+
+  static Stream<Arguments> connectionClassExceptions() {
+    return Stream.of(
+        Arguments.of(new ConnectException("refused"),         "ConnectException"),
+        Arguments.of(new SocketTimeoutException("EC2 drop"),  "SocketTimeoutException (AWS silent drop)"),
+        Arguments.of(new NoRouteToHostException("gone"),      "NoRouteToHostException"),
+        Arguments.of(new UnknownHostException("dns failed"),  "UnknownHostException"),
+        Arguments.of(new EOFException("LB closed"),           "EOFException"),
+        Arguments.of(new SocketException("Connection reset"), "SocketException")
+    );
+  }
+
+  @ParameterizedTest(name = "isConnectionFailure detects {1}")
+  @MethodSource("connectionClassExceptions")
+  public void testDetectsBareConnectionClass(Throwable t, String label) {
+    assertTrue(ConnectionFailureUtils.isConnectionFailure(t),
+        label + " must be classified as a connection failure");
+  }
+
+  @ParameterizedTest(name = "isConnectionFailure walks IOException wrap of {1}")
+  @MethodSource("connectionClassExceptions")
+  public void testDetectsThroughIOExceptionWrap(Throwable t, String label) {
+    IOException wrapped = new IOException("rpc failed", t);
+    assertTrue(ConnectionFailureUtils.isConnectionFailure(wrapped),
+        "IOException wrapping " + label + " must still be classified");
+  }
+
+  @Test
+  public void testDeeplyNestedChainStillClassified() {
+    // ConnectException three levels deep, the way Hadoop RPC's RetriableException
+    // wraps ServiceException wraps IOException wraps the real cause.
+    Throwable deep = new RuntimeException("outer",
+        new IOException("middle",
+            new IOException("inner", new ConnectException("dead"))));
+    assertTrue(ConnectionFailureUtils.isConnectionFailure(deep));
+  }
+
+  static Stream<Arguments> applicationLevelExceptions() {
+    return Stream.of(
+        Arguments.of(new AccessControlException("denied"),
+            "AccessControlException"),
+        Arguments.of(new IllegalArgumentException("bad request"),
+            "IllegalArgumentException"),
+        Arguments.of(new IOException("application error: not leader"),
+            "plain IOException without connection-class cause"),
+        Arguments.of(new RuntimeException("retry, please"),
+            "plain RuntimeException")
+    );
+  }
+
+  @ParameterizedTest(name = "isConnectionFailure rejects {1}")
+  @MethodSource("applicationLevelExceptions")
+  public void testRejectsApplicationLevel(Throwable t, String label) {
+    assertFalse(ConnectionFailureUtils.isConnectionFailure(t),
+        label + " is an application error, refresh must NOT trigger");
+  }
+
+  @Test
+  public void testNullIsNotAConnectionFailure() {
+    assertFalse(ConnectionFailureUtils.isConnectionFailure(null));
+  }
+
+  /**
+   * {@code Throwable.initCause} contractually rejects setting cause to
+   * the throwable itself, but cycles of length 2+ have appeared in
+   * practice (proxy frameworks and faulty initCause callers can
+   * construct them). The walk must terminate within the configured
+   * depth bound rather than looping forever.
+   */
+  @Test
+  public void testCycleOfLengthTwoTerminates() throws Exception {
+    Throwable a = new IOException("a");
+    Throwable b = new IOException("b");
+    // Build the cycle reflectively: a.cause -> b, b.cause -> a.
+    java.lang.reflect.Field causeField = Throwable.class.getDeclaredField("cause");
+    causeField.setAccessible(true);
+    causeField.set(a, b);
+    causeField.set(b, a);

Review Comment:
   Fixed in `46e7f544`. Rewrote `testCycleOfLengthTwoTerminates` to use 
`Throwable.initCause` instead of reflection. A constructor that takes no cause 
argument leaves the cause uninitialized (the `cause == this` sentinel), so a 
single `initCause` call on each side is permitted and closes the length-2 
cycle. This stays portable on JDK 16+, where reflective access to 
`Throwable.cause` is blocked by default.
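
   For reference, a small sketch of building the cycle with `initCause` 
alone. `CauseCycleSketch`, `MAX_DEPTH` and `hasConnectCause` are hypothetical 
stand-ins; the real classifier is `ConnectionFailureUtils.isConnectionFailure`, 
whose body and depth bound are not shown in this thread.

```java
import java.io.IOException;
import java.net.ConnectException;

/** Illustrative only; the walker is a stand-in for the real classifier. */
final class CauseCycleSketch {
  /** Hypothetical bound; the real value is not shown in this thread. */
  static final int MAX_DEPTH = 16;

  private CauseCycleSketch() { }

  /** Bounded walk over the cause chain; stops after MAX_DEPTH nodes. */
  static boolean hasConnectCause(Throwable t) {
    Throwable cur = t;
    for (int depth = 0; cur != null && depth < MAX_DEPTH; depth++) {
      if (cur instanceof ConnectException) {
        return true;
      }
      cur = cur.getCause();
    }
    return false;
  }

  /** Builds a -> b -> a without reflection. */
  static Throwable twoCycle() {
    Throwable a = new IOException("a");
    Throwable b = new IOException("b");
    // Neither constructor set a cause, so one initCause call each is legal.
    a.initCause(b);
    b.initCause(a);
    return a;
  }

  public static void main(String[] args) {
    // Terminates because of the depth bound, not because the chain ends.
    System.out.println(hasConnectCause(twoCycle()));
  }
}
```

A second `initCause` call on either throwable would throw 
`IllegalStateException`, which is why each side is initialized exactly once.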



##########
hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/utils/TestConnectionFailureUtils.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.utils;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.net.ConnectException;
+import java.net.NoRouteToHostException;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+import java.net.UnknownHostException;
+import java.util.stream.Stream;
+import org.apache.hadoop.security.AccessControlException;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+/**
+ * Verifies the connection-class exception classifier used to gate the
+ * DNS-refresh-on-failure code path on both the OM and SCM failover
+ * proxy providers and the DataNode heartbeat catch block.
+ * <p>
+ * The classifier must:
+ * <ul>
+ *   <li>Match every exception type that signals "the cached IP is no
+ *       longer reachable" -- including the AWS EC2 / EKS silent-drop
+ *       case which surfaces as {@link SocketTimeoutException}, the
+ *       case the PR motivating this helper (HDDS-15514) is sold on. </li>
+ *   <li>Reject application-level errors (NotLeader, AccessControl,
+ *       protocol mismatch) so we don't add DNS load on logical
+ *       failures where the cached IP is fine. </li>
+ *   <li>Walk wrapped cause chains so a {@code RemoteException(...)} or
+ *       {@code IOException(...)} carrying a connection-class cause is
+ *       still classified correctly. </li>
+ *   <li>Defend against pathological cycles in the cause chain. </li>
+ * </ul>
+ */
+public class TestConnectionFailureUtils {
+
+  static Stream<Arguments> connectionClassExceptions() {
+    return Stream.of(
+        Arguments.of(new ConnectException("refused"),         "ConnectException"),
+        Arguments.of(new SocketTimeoutException("EC2 drop"),  "SocketTimeoutException (AWS silent drop)"),
+        Arguments.of(new NoRouteToHostException("gone"),      "NoRouteToHostException"),
+        Arguments.of(new UnknownHostException("dns failed"),  "UnknownHostException"),
+        Arguments.of(new EOFException("LB closed"),           "EOFException"),
+        Arguments.of(new SocketException("Connection reset"), "SocketException")
+    );
+  }
+
+  @ParameterizedTest(name = "isConnectionFailure detects {1}")
+  @MethodSource("connectionClassExceptions")
+  public void testDetectsBareConnectionClass(Throwable t, String label) {
+    assertTrue(ConnectionFailureUtils.isConnectionFailure(t),
+        label + " must be classified as a connection failure");
+  }
+
+  @ParameterizedTest(name = "isConnectionFailure walks IOException wrap of {1}")
+  @MethodSource("connectionClassExceptions")
+  public void testDetectsThroughIOExceptionWrap(Throwable t, String label) {
+    IOException wrapped = new IOException("rpc failed", t);
+    assertTrue(ConnectionFailureUtils.isConnectionFailure(wrapped),
+        "IOException wrapping " + label + " must still be classified");
+  }
+
+  @Test
+  public void testDeeplyNestedChainStillClassified() {
+    // ConnectException three levels deep, the way Hadoop RPC's RetriableException
+    // wraps ServiceException wraps IOException wraps the real cause.
+    Throwable deep = new RuntimeException("outer",
+        new IOException("middle",
+            new IOException("inner", new ConnectException("dead"))));
+    assertTrue(ConnectionFailureUtils.isConnectionFailure(deep));
+  }
+
+  static Stream<Arguments> applicationLevelExceptions() {
+    return Stream.of(
+        Arguments.of(new AccessControlException("denied"),
+            "AccessControlException"),
+        Arguments.of(new IllegalArgumentException("bad request"),
+            "IllegalArgumentException"),
+        Arguments.of(new IOException("application error: not leader"),
+            "plain IOException without connection-class cause"),
+        Arguments.of(new RuntimeException("retry, please"),
+            "plain RuntimeException")
+    );
+  }
+
+  @ParameterizedTest(name = "isConnectionFailure rejects {1}")
+  @MethodSource("applicationLevelExceptions")
+  public void testRejectsApplicationLevel(Throwable t, String label) {
+    assertFalse(ConnectionFailureUtils.isConnectionFailure(t),
+        label + " is an application error, refresh must NOT trigger");
+  }
+
+  @Test
+  public void testNullIsNotAConnectionFailure() {
+    assertFalse(ConnectionFailureUtils.isConnectionFailure(null));
+  }
+
+  /**
+   * {@code Throwable.initCause} contractually rejects setting cause to
+   * the throwable itself, but cycles of length 2+ have appeared in
+   * practice (proxy frameworks and faulty initCause callers can
+   * construct them). The walk must terminate within the configured
+   * depth bound rather than looping forever.
+   */
+  @Test
+  public void testCycleOfLengthTwoTerminates() throws Exception {
+    Throwable a = new IOException("a");
+    Throwable b = new IOException("b");
+    // Build the cycle reflectively: a.cause -> b, b.cause -> a.
+    java.lang.reflect.Field causeField = Throwable.class.getDeclaredField("cause");
+    causeField.setAccessible(true);
+    causeField.set(a, b);
+    causeField.set(b, a);
+    // Neither a nor b is a connection-class type. The walk must return
+    // false (not loop forever and not throw).
+    assertFalse(ConnectionFailureUtils.isConnectionFailure(a),
+        "length-2 cycle must terminate cleanly");
+  }
+
+  /**
+   * Defense against an unbounded chain of non-connection-class
+   * exceptions: the depth bound must kick in.
+   */
+  @Test
+  public void testUnboundedChainOfNonMatchingTerminates() throws Exception {
+    Throwable head = new RuntimeException("0");
+    Throwable cursor = head;
+    java.lang.reflect.Field causeField = Throwable.class.getDeclaredField("cause");
+    causeField.setAccessible(true);
+    // Build a 1024-deep chain. The walker must NOT traverse the whole
+    // thing -- the bound caps cost.
+    for (int i = 1; i < 1024; i++) {
+      Throwable next = new RuntimeException(Integer.toString(i));
+      causeField.set(cursor, next);
+      cursor = next;

Review Comment:
   Fixed in `46e7f544`. Rewrote `testUnboundedChainOfNonMatchingTerminates` to 
use `initCause` instead of reflective field manipulation, so the test is 
portable on JDK 16+.
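
   A sketch of the same idea for the long chain, again with hypothetical 
names (`DeepChainSketch`, `chain`, `visited`). The depth bound is passed in as 
a parameter here because the real bound in `ConnectionFailureUtils` is not 
shown in this thread.

```java
/** Illustrative only; counts how many nodes a bounded walk visits. */
final class DeepChainSketch {
  private DeepChainSketch() { }

  /** Links `length` exceptions head -> ... -> tail via initCause. */
  static Throwable chain(int length) {
    Throwable head = new RuntimeException("0");
    Throwable cursor = head;
    for (int i = 1; i < length; i++) {
      Throwable next = new RuntimeException(Integer.toString(i));
      // Legal once per throwable: the constructor above set no cause.
      cursor.initCause(next);
      cursor = next;
    }
    return head;
  }

  /** Walks the cause chain but never visits more than maxDepth nodes. */
  static int visited(Throwable t, int maxDepth) {
    int count = 0;
    for (Throwable cur = t; cur != null && count < maxDepth;
        cur = cur.getCause()) {
      count++;
    }
    return count;
  }

  public static void main(String[] args) {
    // A 1024-deep chain walked with a bound of 16.
    System.out.println(visited(chain(1024), 16));
  }
}
```

Counting visited nodes makes the bound directly observable in a test, 
independent of how long the chain is.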



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

