kerneltime commented on code in PR #10470:
URL: https://github.com/apache/ozone/pull/10470#discussion_r3382683598
##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java:
##########
@@ -311,7 +338,13 @@ public void addIncrementalReport(Message report) {
// see XceiverServerRatis#sendPipelineReport
synchronized (incrementalReportsQueue) {
for (InetSocketAddress endpoint : endpoints) {
- incrementalReportsQueue.get(endpoint).add(report);
+ // Same migration race shape as addPipelineActionIfAbsent: an
+ // endpoint observed in the COW set may have just had its
+ // queue removed by migrateEndpoint. Skip rather than NPE.
+ List<Message> queue = incrementalReportsQueue.get(endpoint);
+ if (queue != null) {
+ queue.add(report);
+ }
Review Comment:
Fixed in `c27a7c43`. Re-ordered `migrateEndpoint` so the invariant "endpoint
∈ endpoints ⇒ queue exists" holds at every observable point: PUBLISH new-key
queues → SWITCH endpoints set (add new, then remove old) → RETIRE old-key
queues. Now there is no window where a producer iterating `endpoints` sees a
key without a queue. The producer null-skips remain as defense-in-depth but the
load-bearing guarantee is the ordering. Logged as LENS-010 in my
MISSED_LENSES.md log: "producer null-skip is not a substitute for fixing the
underlying invariant" — caught by you, not by my lens, exactly because the lens
accepted the band-aid as a fix.
##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/SCMConnectionManager.java:
##########
@@ -132,42 +133,169 @@ public void writeUnlock() {
*/
public void addSCMServer(InetSocketAddress address,
String threadNamePrefix) throws IOException {
+ addSCMServer(address, null, threadNamePrefix);
+ }
+
+ /**
+ * Adds a new SCM machine and remembers the original host:port string
+ * so the DN can re-resolve DNS on connection failure (e.g. after the
+ * SCM peer is rescheduled to a new pod IP in Kubernetes).
+ *
+ * @param address resolved RPC address used to build the proxy
+ * @param hostAndPort original "host:port" string, or null to disable
+ * DNS re-resolution for this endpoint
+ * @param threadNamePrefix prefix for the endpoint's task thread
+ */
+ public void addSCMServer(InetSocketAddress address, String hostAndPort,
+ String threadNamePrefix) throws IOException {
writeLock();
try {
if (scmMachines.containsKey(address)) {
LOG.warn("Trying to add an existing SCM Machine to Machines group. " +
"Ignoring the request.");
return;
}
+ EndpointStateMachine endPoint =
+ buildScmEndpoint(address, hostAndPort, threadNamePrefix);
+ scmMachines.put(address, endPoint);
+ } finally {
+ writeUnlock();
+ }
+ }
- Configuration hadoopConfig =
- LegacyHadoopConfigurationSource.asHadoopConfiguration(this.conf);
- RPC.setProtocolEngine(
- hadoopConfig,
- StorageContainerDatanodeProtocolPB.class,
- ProtobufRpcEngine.class);
- long version =
- RPC.getProtocolVersion(StorageContainerDatanodeProtocolPB.class);
+ /**
+ * Build (but do NOT register) a fresh active-SCM endpoint bound to
+ * {@code address}. The caller is responsible for registering it in
+ * {@code scmMachines} (and tearing down any previous endpoint it
+ * replaces). Factored out of {@link #addSCMServer} so
+ * {@link #refreshSCMServer} can construct the replacement BEFORE
+ * removing the stale entry, preserving the peer in {@code scmMachines}
+ * if proxy construction throws (transient DNS failure, peer not yet
+ * accepting on the new IP, etc.).
+ */
+ @VisibleForTesting
+ EndpointStateMachine buildScmEndpoint(InetSocketAddress address,
+ String hostAndPort, String threadNamePrefix) throws IOException {
+ Configuration hadoopConfig =
+ LegacyHadoopConfigurationSource.asHadoopConfiguration(this.conf);
+ RPC.setProtocolEngine(
+ hadoopConfig,
+ StorageContainerDatanodeProtocolPB.class,
+ ProtobufRpcEngine.class);
+ long version =
+ RPC.getProtocolVersion(StorageContainerDatanodeProtocolPB.class);
- RetryPolicy retryPolicy =
- RetryPolicies.retryUpToMaximumCountWithFixedSleep(
- getScmRpcRetryCount(conf), getScmRpcRetryInterval(conf),
- TimeUnit.MILLISECONDS);
+ RetryPolicy retryPolicy =
+ RetryPolicies.retryUpToMaximumCountWithFixedSleep(
+ getScmRpcRetryCount(conf), getScmRpcRetryInterval(conf),
+ TimeUnit.MILLISECONDS);
- StorageContainerDatanodeProtocolPB rpcProxy = RPC.getProtocolProxy(
- StorageContainerDatanodeProtocolPB.class, version,
- address, UserGroupInformation.getCurrentUser(), hadoopConfig,
- NetUtils.getDefaultSocketFactory(hadoopConfig), getRpcTimeout(),
- retryPolicy).getProxy();
+ StorageContainerDatanodeProtocolPB rpcProxy = RPC.getProtocolProxy(
+ StorageContainerDatanodeProtocolPB.class, version,
+ address, UserGroupInformation.getCurrentUser(), hadoopConfig,
+ NetUtils.getDefaultSocketFactory(hadoopConfig), getRpcTimeout(),
+ retryPolicy).getProxy();
- StorageContainerDatanodeProtocolClientSideTranslatorPB rpcClient =
- new StorageContainerDatanodeProtocolClientSideTranslatorPB(
- rpcProxy);
+ StorageContainerDatanodeProtocolClientSideTranslatorPB rpcClient =
+ new StorageContainerDatanodeProtocolClientSideTranslatorPB(
+ rpcProxy);
- EndpointStateMachine endPoint = new EndpointStateMachine(address,
- rpcClient, this.conf, threadNamePrefix);
- endPoint.setPassive(false);
- scmMachines.put(address, endPoint);
+ EndpointStateMachine endPoint = new EndpointStateMachine(address,
+ hostAndPort, rpcClient, this.conf, threadNamePrefix);
+ endPoint.setPassive(false);
+ return endPoint;
+ }
+
+ /**
+ * Re-resolve the SCM hostname for the endpoint at {@code oldAddress} and,
+ * if the resolved IP has changed, atomically replace the endpoint with a
+ * fresh one bound to the new address.
+ * <p>
+ * Returns the new {@link InetSocketAddress} on a successful swap, or null
+ * if no swap occurred (endpoint not found, hostname not preserved at
+ * construction, IP unchanged, or DNS lookup failed). Callers receive
+ * enough information to update any external maps keyed by the old
+ * address.
+ * <p>
+ * The replacement is built via the existing {@link #addSCMServer} path,
+ * so the new endpoint starts in {@code GETVERSION} state and re-walks
+ * version → register → heartbeat. This is the correct behavior: a peer
+ * that has been rescheduled is effectively a fresh process.
+ */
+ public InetSocketAddress refreshSCMServer(InetSocketAddress oldAddress,
+ String threadNamePrefix) throws IOException {
+ writeLock();
+ try {
+ EndpointStateMachine existing = scmMachines.get(oldAddress);
+ if (existing == null) {
+ return null;
+ }
+ // Recon endpoints (added via addReconServer) speak a different
+ // protocol than active SCM endpoints. The current refresh path
+ // only knows how to rebuild SCM endpoints, so refusing to
+ // refresh a passive endpoint avoids silently downgrading a
+ // Recon endpoint to an SCM-protocol one. Recon's cached IP is
+ // also a much narrower problem in practice (Recon is rarely
+ // pod-rescheduled the way SCM-HA peers are).
+ if (existing.isPassive()) {
+ return null;
+ }
+ InetSocketAddress refreshed = existing.resolveLatestAddress();
+ if (refreshed == null) {
+ return null;
+ }
+ // Refuse the swap if the freshly-resolved address collides with
+ // another already-registered SCM peer key (e.g. transient kube-dns
+ // returning peer-B's IP for peer-A's hostname). Without this guard,
+ // the put below would silently overwrite peer-B's
+ // EndpointStateMachine, leaking its executor and orphaning its
+ // task thread, while peer-A's task ends up dialing peer-B's IP
+ // with peer-A's host context. Leave the stale endpoint in place;
+ // the next heartbeat retries DNS.
+ if (!refreshed.equals(oldAddress)
+ && scmMachines.containsKey(refreshed)) {
+ LOG.warn("DNS re-resolution: refused to swap endpoint {} -> {} "
+ + "because the new address collides with an already-registered "
+ + "SCM peer. Leaving stale endpoint in place.",
+ oldAddress, refreshed);
+ return null;
+ }
+ String hostAndPort = existing.getHostAndPort();
+ // Build the replacement BEFORE removing the stale entry so a
+ // failure to construct the new proxy (transient DNS, peer not
+ // yet accepting on the new IP, NetUtils refusing the address)
+ // leaves the existing endpoint registered. Otherwise the peer
+ // would disappear from scmMachines entirely and the next
+ // heartbeat cycle would have nothing to dial -- much worse than
+ // the pre-PR behaviour of dialing the stale IP.
+ EndpointStateMachine replacement;
+ try {
+ replacement = buildScmEndpoint(refreshed, hostAndPort,
+ threadNamePrefix);
+ } catch (IOException buildEx) {
+ LOG.warn("DNS re-resolution: failed to build replacement SCM "
+ + "endpoint for {} -> {} (host {}); leaving stale endpoint "
+ + "in place. Cause: {}", oldAddress, refreshed, hostAndPort,
+ buildEx.getMessage());
+ throw buildEx;
+ }
+ // Replacement is built; commit the swap atomically under the
+ // write lock we already hold, then close the old endpoint
+ // outside the critical section's correctness path (close()
+ // failures only affect cleanup, not registration).
+ scmMachines.put(refreshed, replacement);
+ if (!refreshed.equals(oldAddress)) {
+ scmMachines.remove(oldAddress);
+ }
+ try {
+ existing.close();
+ } catch (RuntimeException closeEx) {
+ LOG.warn("Failed to close stale endpoint {}: {}", oldAddress,
+ closeEx.getMessage());
Review Comment:
Fixed in `c27a7c43`. `existing.close()` now runs OUTSIDE `writeLock`.
Pattern: capture the stale endpoint reference under the lock alongside the swap
commit, release the lock, then close with the same try/catch. Concurrent
`getValues()` callers and other endpoints' heartbeats no longer stall on
`RPC.stopProxy` / socket teardown. Logged as LENS-011 + a META rule: my R3
failure-injection persona DID flag this as a Concern and I dismissed it as
"acceptable, pre-existing pattern." That triage was wrong; dismissal is what
got re-found. New CLAUDE.md rule: "failure-injection lens findings get
fix-or-justify, never silent dismissal."
##########
hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/ha/TestOMFailoverProxyProviderRefreshWired.java:
##########
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.ha;
+
+import static
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CLIENT_FAILOVER_RESOLVE_NEEDED_KEY;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_NODES_KEY;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.IOException;
+import java.net.ConnectException;
+import java.net.SocketTimeoutException;
+import java.util.StringJoiner;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.ozone.ha.ConfUtils;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
+import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Wired-path tests for {@code OMFailoverProxyProviderBase.shouldRetry}'s
+ * interaction with the new connection-class filter and refresh hook.
+ * These complement {@code TestConnectionFailureUtils} (helper-in-isolation)
+ * and {@code TestOMProxyInfoDnsRefresh} (per-instance refresh) by
+ * exercising the actual retry policy whose return value drives the
+ * RetryInvocationHandler in production.
+ * <p>
+ * The "load-bearing" assertion is that a {@link SocketTimeoutException}
+ * -- the AWS EC2 / EKS silent-drop case the PR is sold on -- routed
+ * through {@code shouldRetry} actually triggers the per-node DNS refresh
+ * on the current OM. {@code TestConnectionFailureUtils} proves the
+ * filter classifies it correctly in isolation; this test proves the
+ * filter is wired.
+ */
+public class TestOMFailoverProxyProviderRefreshWired {
+
+ private static final String OM_SERVICE_ID = "om-svc-refresh-wired";
+ private OzoneConfiguration conf;
+
+ @BeforeEach
+ public void setUp() {
+ conf = new OzoneConfiguration();
+ StringJoiner ids = new StringJoiner(",");
+ for (int i = 1; i <= 3; i++) {
+ String nodeId = "om-" + i;
+ conf.set(ConfUtils.addKeySuffixes(OZONE_OM_ADDRESS_KEY, OM_SERVICE_ID,
+ nodeId), "localhost:" + (9860 + i));
+ ids.add(nodeId);
+ }
+ conf.set(ConfUtils.addKeySuffixes(OZONE_OM_NODES_KEY, OM_SERVICE_ID),
+ ids.toString());
+ }
+
+ /**
+ * A counting subclass that records each call to
+ * {@code maybeRefreshCurrentOmAddress} so the test can assert
+ * exactly when the wiring fires.
+ */
+ private static final class CountingProvider
+ extends HadoopRpcOMFailoverProxyProvider<OzoneManagerProtocolPB> {
+ int refreshCalls;
+
+ CountingProvider(OzoneConfiguration c) throws IOException {
+ super(c, UserGroupInformation.getCurrentUser(), OM_SERVICE_ID,
+ OzoneManagerProtocolPB.class);
+ }
+
+ @Override
+ synchronized boolean maybeRefreshCurrentOmAddress() {
+ refreshCalls++;
+ return false;
+ }
+ }
+
+ /**
+ * SocketTimeoutException through {@code shouldRetry} -- the AWS
+ * silent-drop scenario -- must invoke the refresh hook when the
+ * flag is on. Round 1 personas flagged this exception type as
+ * missing from the original filter; Round 2 added it to
+ * ConnectionFailureUtils. This test proves the wiring.
+ */
+ @Test
+ public void testSocketTimeoutTriggersRefreshHook() throws Exception {
+ conf.setBoolean(OZONE_CLIENT_FAILOVER_RESOLVE_NEEDED_KEY, true);
+ CountingProvider p = new CountingProvider(conf);
+ RetryPolicy policy = p.getRetryPolicy(10);
+ RetryPolicy.RetryAction action = policy.shouldRetry(
+ new SocketTimeoutException("EC2 silent drop"), 0, 0, false);
+ assertEquals(RetryPolicy.RetryAction.RetryDecision.FAILOVER_AND_RETRY,
+ action.action);
+ assertEquals(1, p.refreshCalls,
+ "SocketTimeoutException must invoke the refresh hook exactly once");
+ }
+
+ /**
+ * ConnectException (the OpenStack fast-RST scenario) must also
+ * invoke the refresh hook. Together with the SocketTimeout test,
+ * this proves the filter covers both K8s failure shapes the JIRA
+ * description names.
+ */
+ @Test
+ public void testConnectExceptionTriggersRefreshHook() throws Exception {
+ conf.setBoolean(OZONE_CLIENT_FAILOVER_RESOLVE_NEEDED_KEY, true);
+ CountingProvider p = new CountingProvider(conf);
+ RetryPolicy policy = p.getRetryPolicy(10);
+ policy.shouldRetry(
+ new IOException("connection refused", new ConnectException()), 0, 0,
false);
+ assertEquals(1, p.refreshCalls);
+ }
+
+ /**
+ * Application-level errors (an OMException not wrapped in a
+ * connection-class) must NOT invoke the refresh hook. Re-resolving
+ * DNS would not help and would amplify load.
+ */
+ @Test
+ public void testApplicationLevelErrorDoesNotTriggerRefresh()
+ throws Exception {
+ conf.setBoolean(OZONE_CLIENT_FAILOVER_RESOLVE_NEEDED_KEY, true);
+ CountingProvider p = new CountingProvider(conf);
+ RetryPolicy policy = p.getRetryPolicy(10);
+ policy.shouldRetry(new OMException("not the leader",
+ ResultCodes.INTERNAL_ERROR), 0, 0, false);
+ assertEquals(0, p.refreshCalls,
+ "OMException is application-level; refresh hook must NOT fire");
+ }
+
+ /**
+ * Flag-off invariant: even on a connection-class exception, the
+ * refresh hook must NOT be invoked when the resolve-needed flag is
+ * false. Guards the "default-off" safety claim of the PR.
+ */
+ @Test
+ public void testFlagDisabledSuppressesRefresh() throws Exception {
+ conf.setBoolean(OZONE_CLIENT_FAILOVER_RESOLVE_NEEDED_KEY, false);
+ CountingProvider p = new CountingProvider(conf);
+ RetryPolicy policy = p.getRetryPolicy(10);
+ policy.shouldRetry(new ConnectException("refused"), 0, 0, false);
+ assertEquals(0, p.refreshCalls,
+ "with the flag off the refresh hook must never fire, even for "
+ + "connection-class exceptions");
+ }
+
+ /**
+ * Verifies the C2 "retry-same-proxy" pin: when a refresh succeeds,
+ * the next failover must STAY on the just-refreshed nodeId rather
+ * than advancing to the next peer in the failover ring.
+ * <p>
+ * Round 3 found that the prior version of this test was vacuous:
+ * with both currentProxyIndex and nextProxyIndex initialised to 0
+ * from construction, performFailover was a no-op (currentProxyIndex
+ * = nextProxyIndex = 0), and the assertion held REGARDLESS of
+ * whether the pin code at OMFailoverProxyProviderBase.shouldRetry
+ * actually invoked setNextOmProxy. To make the pin observably
+ * load-bearing, this test PRE-ADVANCES nextProxyIndex by triggering
+ * a non-refresh shouldRetry first (an OMException, which is not a
+ * connection-class failure). That sets nextProxyIndex to (current+1).
+ * Then a second shouldRetry with a connection-class exception fires
+ * the refresh-success path, which MUST pull nextProxyIndex back to
+ * the current node. If the pin code is broken, the post-test
+ * currentProxyOMNodeId will be the NEXT node, not the original.
+ */
+ @Test
+ public void testRefreshSuccessPinsCurrentNodeId() throws Exception {
+ conf.setBoolean(OZONE_CLIENT_FAILOVER_RESOLVE_NEEDED_KEY, true);
+ HadoopRpcOMFailoverProxyProvider<OzoneManagerProtocolPB> p =
+ new HadoopRpcOMFailoverProxyProvider<OzoneManagerProtocolPB>(
+ conf, UserGroupInformation.getCurrentUser(), OM_SERVICE_ID,
+ OzoneManagerProtocolPB.class) {
+ @Override
+ boolean maybeRefreshCurrentOmAddress() {
+ return true; // pretend the swap happened
+ }
+ };
+
+ String beforeNode = p.getCurrentProxyOMNodeId();
+ RetryPolicy policy = p.getRetryPolicy(10);
+
+ // Pre-advance nextProxyIndex by triggering a non-refresh failover
+ // (selectNextOmProxy increments nextProxyIndex). We use a wrapper
+ // exception that does NOT pass isConnectionFailure so the refresh
+ // hook is NOT invoked here.
+ policy.shouldRetry(new IOException("not-a-connection-failure"),
+ 0, 0, false);
+ // Sanity: a subsequent performFailover would now move us off the
+ // original node, because nextProxyIndex was advanced.
+
+ // Now trigger the connection-failure path with refresh enabled.
+ // The pin MUST pull nextProxyIndex back to the original node.
+ RetryPolicy.RetryAction action = policy.shouldRetry(
+ new ConnectException("refused"), 0, 1, false);
+ assertTrue(
+ action.action ==
RetryPolicy.RetryAction.RetryDecision.FAILOVER_AND_RETRY,
+ "refresh-success path returns FAILOVER_AND_RETRY so the retry "
+ + "framework re-dials with the new IP");
+ p.performFailover(null);
+ assertEquals(beforeNode, p.getCurrentProxyOMNodeId(),
+ "after a successful refresh, performFailover must STAY on the "
+ + "original nodeId even though a prior shouldRetry advanced "
+ + "nextProxyIndex -- otherwise the freshly-fixed peer is "
+ + "bypassed for up to N-1 retries");
+ assertFalse(beforeNode == null);
Review Comment:
Fixed in `c27a7c43`. Replaced `assertFalse(beforeNode == null)` with
`assertNotNull(beforeNode)`.
##########
hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestSCMConnectionManagerDnsRefreshE2E.java:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.time.Duration;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
Review Comment:
Fixed in `c27a7c43`. Removed the unused `java.time.Duration` import.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]