Copilot commented on code in PR #10470:
URL: https://github.com/apache/ozone/pull/10470#discussion_r3382763910
##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/SCMConnectionManager.java:
##########
@@ -34,6 +34,7 @@
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.management.ObjectName;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
Review Comment:
Import order is inconsistent with the repo’s `CustomImportOrder` and
alphabetical sorting rules (e.g., `com.google.*` should come before `java.*`).
As written, this is likely to fail the Checkstyle/Spotless import-ordering check.
##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/SCMConnectionManager.java:
##########
@@ -132,45 +133,183 @@ public void writeUnlock() {
*/
public void addSCMServer(InetSocketAddress address,
String threadNamePrefix) throws IOException {
+ addSCMServer(address, null, threadNamePrefix);
+ }
+
+ /**
+ * Adds a new SCM machine and remembers the original host:port string
+ * so the DN can re-resolve DNS on connection failure (e.g. after the
+ * SCM peer is rescheduled to a new pod IP in Kubernetes).
+ *
+ * @param address resolved RPC address used to build the proxy
+ * @param hostAndPort original "host:port" string, or null to disable
+ * DNS re-resolution for this endpoint
+ * @param threadNamePrefix prefix for the endpoint's task thread
+ */
+ public void addSCMServer(InetSocketAddress address, String hostAndPort,
+ String threadNamePrefix) throws IOException {
writeLock();
try {
if (scmMachines.containsKey(address)) {
LOG.warn("Trying to add an existing SCM Machine to Machines group. " +
"Ignoring the request.");
return;
}
+ EndpointStateMachine endPoint =
+ buildScmEndpoint(address, hostAndPort, threadNamePrefix);
+ scmMachines.put(address, endPoint);
+ } finally {
+ writeUnlock();
+ }
+ }
- Configuration hadoopConfig =
- LegacyHadoopConfigurationSource.asHadoopConfiguration(this.conf);
- RPC.setProtocolEngine(
- hadoopConfig,
- StorageContainerDatanodeProtocolPB.class,
- ProtobufRpcEngine.class);
- long version =
- RPC.getProtocolVersion(StorageContainerDatanodeProtocolPB.class);
+ /**
+ * Build (but do NOT register) a fresh active-SCM endpoint bound to
+ * {@code address}. The caller is responsible for registering it in
+ * {@code scmMachines} (and tearing down any previous endpoint it
+ * replaces). Factored out of {@link #addSCMServer} so
+ * {@link #refreshSCMServer} can construct the replacement BEFORE
+ * removing the stale entry, preserving the peer in {@code scmMachines}
+ * if proxy construction throws (transient DNS failure, peer not yet
+ * accepting on the new IP, etc.).
+ */
+ @VisibleForTesting
+ EndpointStateMachine buildScmEndpoint(InetSocketAddress address,
+ String hostAndPort, String threadNamePrefix) throws IOException {
+ Configuration hadoopConfig =
+ LegacyHadoopConfigurationSource.asHadoopConfiguration(this.conf);
+ RPC.setProtocolEngine(
+ hadoopConfig,
+ StorageContainerDatanodeProtocolPB.class,
+ ProtobufRpcEngine.class);
+ long version =
+ RPC.getProtocolVersion(StorageContainerDatanodeProtocolPB.class);
- RetryPolicy retryPolicy =
- RetryPolicies.retryUpToMaximumCountWithFixedSleep(
- getScmRpcRetryCount(conf), getScmRpcRetryInterval(conf),
- TimeUnit.MILLISECONDS);
+ RetryPolicy retryPolicy =
+ RetryPolicies.retryUpToMaximumCountWithFixedSleep(
+ getScmRpcRetryCount(conf), getScmRpcRetryInterval(conf),
+ TimeUnit.MILLISECONDS);
- StorageContainerDatanodeProtocolPB rpcProxy = RPC.getProtocolProxy(
- StorageContainerDatanodeProtocolPB.class, version,
- address, UserGroupInformation.getCurrentUser(), hadoopConfig,
- NetUtils.getDefaultSocketFactory(hadoopConfig), getRpcTimeout(),
- retryPolicy).getProxy();
+ StorageContainerDatanodeProtocolPB rpcProxy = RPC.getProtocolProxy(
+ StorageContainerDatanodeProtocolPB.class, version,
+ address, UserGroupInformation.getCurrentUser(), hadoopConfig,
+ NetUtils.getDefaultSocketFactory(hadoopConfig), getRpcTimeout(),
+ retryPolicy).getProxy();
- StorageContainerDatanodeProtocolClientSideTranslatorPB rpcClient =
- new StorageContainerDatanodeProtocolClientSideTranslatorPB(
- rpcProxy);
+ StorageContainerDatanodeProtocolClientSideTranslatorPB rpcClient =
+ new StorageContainerDatanodeProtocolClientSideTranslatorPB(
+ rpcProxy);
- EndpointStateMachine endPoint = new EndpointStateMachine(address,
- rpcClient, this.conf, threadNamePrefix);
- endPoint.setPassive(false);
- scmMachines.put(address, endPoint);
+ EndpointStateMachine endPoint = new EndpointStateMachine(address,
+ hostAndPort, rpcClient, this.conf, threadNamePrefix);
+ endPoint.setPassive(false);
+ return endPoint;
+ }
+
+ /**
+ * Re-resolve the SCM hostname for the endpoint at {@code oldAddress} and,
+ * if the resolved IP has changed, atomically replace the endpoint with a
+ * fresh one bound to the new address.
+ * <p>
+ * Returns the new {@link InetSocketAddress} on a successful swap, or null
+ * if no swap occurred (endpoint not found, hostname not preserved at
+ * construction, IP unchanged, or DNS lookup failed). Callers receive
+ * enough information to update any external maps keyed by the old
+ * address.
+ * <p>
+ * The replacement is built via the existing {@link #addSCMServer} path,
+ * so the new endpoint starts in {@code GETVERSION} state and re-walks
+ * version → register → heartbeat. This is the correct behavior: a peer
+ * that has been rescheduled is effectively a fresh process.
+ */
+ public InetSocketAddress refreshSCMServer(InetSocketAddress oldAddress,
+ String threadNamePrefix) throws IOException {
+ final EndpointStateMachine staleEndpoint;
+ final InetSocketAddress refreshed;
+ final String hostAndPort;
+ writeLock();
+ try {
+ EndpointStateMachine existing = scmMachines.get(oldAddress);
+ if (existing == null) {
+ return null;
+ }
+ // Recon endpoints (added via addReconServer) speak a different
+ // protocol than active SCM endpoints. The current refresh path
+ // only knows how to rebuild SCM endpoints, so refusing to
+ // refresh a passive endpoint avoids silently downgrading a
+ // Recon endpoint to an SCM-protocol one. Recon's cached IP is
+ // also a much narrower problem in practice (Recon is rarely
+ // pod-rescheduled the way SCM-HA peers are).
+ if (existing.isPassive()) {
+ return null;
+ }
+ InetSocketAddress resolved = existing.resolveLatestAddress();
+ if (resolved == null) {
Review Comment:
`refreshSCMServer` calls `existing.resolveLatestAddress()` while holding the
connection manager’s write lock. `resolveLatestAddress()` performs a DNS lookup
via `NetUtils.createSocketAddr(hostAndPort)`, which can block and stall
unrelated readers and writers (e.g., `getValues()` for other endpoints’
heartbeats) when the resolver is slow or unavailable. Consider resolving DNS
outside `writeLock()` in two phases: snapshot `hostAndPort` and the current
endpoint under the lock, resolve outside it, then re-acquire the lock and
re-check the mapping before committing the swap.
##########
hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestSCMConnectionManager.java:
##########
@@ -46,4 +48,178 @@ public void testRemoveSCMServerDoesNotMarkEndpointShutdown()
Assertions.assertEquals(HEARTBEAT, endpoint.getState());
}
}
+
+ /**
+ * resolveLatestAddress() returns null when no preserved hostname is
+ * available -- legacy code path -- so re-resolution is a no-op for that
+ * endpoint. Operator must restart the DN to pick up a new IP.
+ */
+ @Test
+ public void testResolveLatestAddressReturnsNullWithoutHostAndPort() {
+ InetSocketAddress address = new InetSocketAddress("127.0.0.1", 9861);
+ EndpointStateMachine endpoint = new EndpointStateMachine(
+ address, /*hostAndPort=*/ null, /*endPoint=*/ null,
+ new OzoneConfiguration(), "");
+ Assertions.assertNull(endpoint.resolveLatestAddress());
+ Assertions.assertNull(endpoint.getHostAndPort());
+ }
+
+ /**
+ * When the cached IP matches what DNS currently returns for the
+ * preserved hostname, resolveLatestAddress() returns null (no swap
+ * needed). Uses "localhost" because it reliably resolves to a loopback
+ * address in any test environment.
+ */
+ @Test
+ public void testResolveLatestAddressReturnsNullWhenIpUnchanged()
+ throws Exception {
+ InetAddress loopback = InetAddress.getByName("localhost");
+ InetSocketAddress address = new InetSocketAddress(loopback, 9861);
+ EndpointStateMachine endpoint = new EndpointStateMachine(
+ address, "localhost:9861", null, new OzoneConfiguration(), "");
+ InetSocketAddress refreshed = endpoint.resolveLatestAddress();
+ Assertions.assertNull(refreshed,
+ "localhost re-resolves to the same loopback address; refresh "
+ + "must report no change so the endpoint is not torn down "
+ + "needlessly.");
+ }
+
+ /**
+ * When the cached IP differs from what DNS currently returns for the
+ * preserved hostname, resolveLatestAddress() returns the freshly-
+ * resolved address. Simulates the "SCM pod was rescheduled to a new
+ * IP" scenario by constructing the endpoint with a deliberately stale
+ * cached IP.
+ */
+ @Test
+ public void testResolveLatestAddressReturnsNewAddressOnIpChange()
+ throws Exception {
+ // Pretend localhost previously resolved to 127.0.0.99 (stale IP).
+ // In real Kubernetes this would be the now-defunct pod IP.
+ InetSocketAddress staleAddress = new InetSocketAddress(
+ InetAddress.getByAddress(new byte[] {127, 0, 0, 99}), 9861);
+ EndpointStateMachine endpoint = new EndpointStateMachine(
+ staleAddress, "localhost:9861", null,
+ new OzoneConfiguration(), "");
+ InetSocketAddress refreshed = endpoint.resolveLatestAddress();
+ Assertions.assertNotNull(refreshed,
+ "localhost re-resolves to loopback (typically 127.0.0.1), "
+ + "which differs from the stale 127.0.0.99 we cached; "
+ + "refresh must report the change so the endpoint can "
+ + "swap to the live address.");
+ Assertions.assertEquals(9861, refreshed.getPort());
+ Assertions.assertNotEquals(staleAddress.getAddress(),
+ refreshed.getAddress());
+ }
+
+ /**
+ * refreshSCMServer() swaps an endpoint's address atomically in the
+ * connection manager when the cached IP is stale. The replacement
+ * endpoint starts in GETVERSION state -- the version handshake must
+ * be re-run because the new SCM pod is effectively a fresh process.
+ */
+ @Test
+ public void testRefreshSCMServerSwapsEndpointOnIpChange() throws Exception {
+ try (SCMConnectionManager connectionManager =
+ new SCMConnectionManager(new OzoneConfiguration())) {
+ InetSocketAddress staleAddress = new InetSocketAddress(
+ InetAddress.getByAddress(new byte[] {127, 0, 0, 99}), 9861);
+ connectionManager.addSCMServer(staleAddress, "localhost:9861", "");
+
+ InetSocketAddress refreshed = connectionManager.refreshSCMServer(
+ staleAddress, "");
+
+ Assertions.assertNotNull(refreshed);
+ Assertions.assertEquals(1, connectionManager.getNumOfConnections());
+ EndpointStateMachine swapped =
+ connectionManager.getValues().iterator().next();
+ Assertions.assertEquals(refreshed, swapped.getAddress());
+ Assertions.assertEquals("localhost:9861", swapped.getHostAndPort());
+ }
+ }
+
+ /**
Review Comment:
There is an orphaned Javadoc-style comment block immediately before the
regression test’s Javadoc. A `/** ... */` block that is not attached to any
declaration can trigger Checkstyle’s Javadoc checks; convert it to a regular
`/* ... */` block comment instead.
##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java:
##########
@@ -984,6 +1217,7 @@ synchronized boolean putIfAbsent(PipelineKey key,
return map.putIfAbsent(key, pipelineAction) == null;
}
+
synchronized List<PipelineAction> getActions(List<PipelineReport> reports,
Review Comment:
There are two consecutive blank lines between `putIfAbsent` and `getActions`
in `PipelineActionMap`. Checkstyle’s `EmptyLineSeparator` check (with
`allowMultipleEmptyLines=false`) is likely to flag this; keep at most one
blank line here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]