This is an automated email from the ASF dual-hosted git repository.

jin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hugegraph.git


The following commit(s) were added to refs/heads/master by this push:
     new ab12b35b6 fix(pd): add timeout and null-safety to getLeaderGrpcAddress() (#2961)
ab12b35b6 is described below

commit ab12b35b6adfaf8895df9b4023cf2a25d23cd215
Author: Himanshu Verma <[email protected]>
AuthorDate: Fri Mar 20 13:47:42 2026 +0530

    fix(pd): add timeout and null-safety to getLeaderGrpcAddress() (#2961)
    
    * fix(pd): add timeout and null-safety to getLeaderGrpcAddress()
    
    The Bolt RPC call in getLeaderGrpcAddress() returns null in Docker
    bridge network mode, causing an NPE when a follower PD node attempts
    to discover the leader's gRPC address. This breaks store registration
    and partition distribution whenever any node other than pd0 wins the
    raft leader election.
    
    Add a bounded timeout using the configured rpc-timeout, null-check
    the RPC response and its gRPC address, and fall back to deriving the
    address from the leader's raft endpoint IP plus the local gRPC port
    when the RPC fails, times out, or returns no address.
    
    Closes apache/hugegraph#2959
---
 .../org/apache/hugegraph/pd/raft/RaftEngine.java   |  76 ++++++---
 .../apache/hugegraph/pd/core/PDCoreSuiteTest.java  |   2 +
 .../pd/raft/RaftEngineLeaderAddressTest.java       | 183 +++++++++++++++++++++
 3 files changed, 237 insertions(+), 24 deletions(-)

diff --git a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftEngine.java b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftEngine.java
index b73364ae6..2b08de7d4 100644
--- a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftEngine.java
+++ b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftEngine.java
@@ -28,6 +28,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
@@ -49,7 +50,6 @@ import com.alipay.sofa.jraft.entity.PeerId;
 import com.alipay.sofa.jraft.entity.Task;
 import com.alipay.sofa.jraft.error.RaftError;
 import com.alipay.sofa.jraft.option.NodeOptions;
-import com.alipay.sofa.jraft.option.RaftOptions;
 import com.alipay.sofa.jraft.option.RpcOptions;
 import com.alipay.sofa.jraft.rpc.RaftRpcServerFactory;
 import com.alipay.sofa.jraft.rpc.RpcServer;
@@ -86,8 +86,12 @@ public class RaftEngine {
         }
         this.config = config;
 
+        // Wire configured rpc timeout into RaftRpcClient so the Bolt transport
+        // timeout and the future.get() caller timeout in 
getLeaderGrpcAddress() are consistent.
         raftRpcClient = new RaftRpcClient();
-        raftRpcClient.init(new RpcOptions());
+        RpcOptions rpcOptions = new RpcOptions();
+        rpcOptions.setRpcDefaultTimeout(config.getRpcTimeout());
+        raftRpcClient.init(rpcOptions);
 
         String raftPath = config.getDataPath() + "/" + groupId;
         new File(raftPath).mkdirs();
@@ -119,10 +123,7 @@ public class RaftEngine {
         nodeOptions.setRpcConnectTimeoutMs(config.getRpcTimeout());
         nodeOptions.setRpcDefaultTimeout(config.getRpcTimeout());
         nodeOptions.setRpcInstallSnapshotTimeout(config.getRpcTimeout());
-        // Set the raft configuration
-        RaftOptions raftOptions = nodeOptions.getRaftOptions();
-
-        nodeOptions.setEnableMetrics(true);
+        // TODO: tune RaftOptions for PD (see hugegraph-store PartitionEngine 
for reference)
 
         final PeerId serverId = JRaftUtils.getPeerId(config.getAddress());
 
@@ -228,7 +229,7 @@ public class RaftEngine {
     }
 
     /**
-     * Send a message to the leader to get the grpc address;
+     * Send a message to the leader to get the grpc address.
      */
     public String getLeaderGrpcAddress() throws ExecutionException, 
InterruptedException {
         if (isLeader()) {
@@ -236,11 +237,49 @@ public class RaftEngine {
         }
 
         if (raftNode.getLeaderId() == null) {
-            waitingForLeader(10000);
+            waitingForLeader(config.getRpcTimeout());
+        }
+
+        // Cache leader to avoid repeated getLeaderId() calls and guard against
+        // waitingForLeader() returning without a leader being elected.
+        PeerId leader = raftNode.getLeaderId();
+        if (leader == null) {
+            throw new ExecutionException(new IllegalStateException("Leader is 
not ready"));
+        }
+
+        RaftRpcProcessor.GetMemberResponse response = null;
+        try {
+            // TODO: a more complete fix would need a source of truth for the 
leader's
+            // actual grpcAddress rather than deriving it from the local 
node's port config.
+            response = raftRpcClient
+                    .getGrpcAddress(leader.getEndpoint().toString())
+                    .get(config.getRpcTimeout(), TimeUnit.MILLISECONDS);
+            if (response != null && response.getGrpcAddress() != null) {
+                return response.getGrpcAddress();
+            }
+            if (response == null) {
+                log.warn("Leader RPC response is null for {}, falling back to 
derived address",
+                         leader);
+            } else {
+                log.warn("Leader gRPC address field is null in RPC response 
for {}, "
+                         + "falling back to derived address", leader);
+            }
+        } catch (TimeoutException e) {
+            log.warn("Timed out resolving leader gRPC address for {}, falling 
back to derived "
+                     + "address", leader);
+        } catch (ExecutionException e) {
+            Throwable cause = e.getCause() != null ? e.getCause() : e;
+            log.warn("Failed to resolve leader gRPC address for {}, falling 
back to derived "
+                     + "address", leader, cause);
         }
 
-        return 
raftRpcClient.getGrpcAddress(raftNode.getLeaderId().getEndpoint().toString()).get()
-                            .getGrpcAddress();
+        // Best-effort fallback: derive from leader raft endpoint IP + local 
gRPC port.
+        // WARNING: this may be incorrect in clusters where PD nodes use 
different grpc.port
+        // values, a proper fix requires a cluster-wide source of truth for 
gRPC addresses.
+        String derived = leader.getEndpoint().getIp() + ":" + 
config.getGrpcPort();
+        log.info("Using derived leader gRPC address {} - may be incorrect if 
nodes use different ports",
+                 derived);
+        return derived;
     }
 
     /**
@@ -322,14 +361,7 @@ public class RaftEngine {
             newPeers.parse(peerList);
             CountDownLatch latch = new CountDownLatch(1);
             this.raftNode.changePeers(newPeers, status -> {
-                // Use compareAndSet so a late callback does not overwrite a 
timeout status
                 result.compareAndSet(null, status);
-                // Refresh inside callback so it fires even if caller already 
timed out
-                // Note: changePeerList() uses Configuration.parse() which 
only supports
-                // plain comma-separated peer addresses with no learner syntax.
-                // getLearners() will always be empty here. Learner support is 
handled
-                // in PDService.updatePdRaft() which uses 
PeerUtil.parseConfig()
-                // and supports the /learner suffix.
                 if (status != null && status.isOk()) {
                     IpAuthHandler handler = IpAuthHandler.getInstance();
                     if (handler != null) {
@@ -347,16 +379,12 @@ public class RaftEngine {
                 }
                 latch.countDown();
             });
-            // Use 3x configured RPC timeout — bare await() would block 
forever if
-            // the callback is never invoked (e.g. node not started / RPC 
failure)
-            boolean completed = latch.await(3L * config.getRpcTimeout(),
-                                            TimeUnit.MILLISECONDS);
+            boolean completed = latch.await(3L * config.getRpcTimeout(), 
TimeUnit.MILLISECONDS);
             if (!completed && result.get() == null) {
                 Status timeoutStatus = new Status(RaftError.EINTERNAL,
                                                   "changePeerList timed out 
after %d ms",
                                                   3L * config.getRpcTimeout());
                 if (!result.compareAndSet(null, timeoutStatus)) {
-                    // Callback arrived just before us — keep its result
                     timeoutStatus = null;
                 }
                 if (timeoutStatus != null) {
@@ -387,7 +415,8 @@ public class RaftEngine {
             long start = System.currentTimeMillis();
             while ((System.currentTimeMillis() - start < timeOut) && (leader 
== null)) {
                 try {
-                    this.wait(1000);
+                    long remaining = timeOut - (System.currentTimeMillis() - 
start);
+                    this.wait(Math.min(1000, Math.max(0, remaining)));
                 } catch (InterruptedException e) {
                     log.error("Raft wait for leader exception", e);
                 }
@@ -395,7 +424,6 @@ public class RaftEngine {
             }
             return leader;
         }
-
     }
 
     public Node getRaftNode() {
diff --git a/hugegraph-pd/hg-pd-test/src/main/java/org/apache/hugegraph/pd/core/PDCoreSuiteTest.java b/hugegraph-pd/hg-pd-test/src/main/java/org/apache/hugegraph/pd/core/PDCoreSuiteTest.java
index 57fd36717..87d1500bc 100644
--- a/hugegraph-pd/hg-pd-test/src/main/java/org/apache/hugegraph/pd/core/PDCoreSuiteTest.java
+++ b/hugegraph-pd/hg-pd-test/src/main/java/org/apache/hugegraph/pd/core/PDCoreSuiteTest.java
@@ -21,6 +21,7 @@ import org.apache.hugegraph.pd.core.meta.MetadataKeyHelperTest;
 import org.apache.hugegraph.pd.core.store.HgKVStoreImplTest;
 import org.apache.hugegraph.pd.raft.IpAuthHandlerTest;
 import org.apache.hugegraph.pd.raft.RaftEngineIpAuthIntegrationTest;
+import org.apache.hugegraph.pd.raft.RaftEngineLeaderAddressTest;
 import org.junit.runner.RunWith;
 import org.junit.runners.Suite;
 
@@ -40,6 +41,7 @@ import lombok.extern.slf4j.Slf4j;
         TaskScheduleServiceTest.class,
         IpAuthHandlerTest.class,
         RaftEngineIpAuthIntegrationTest.class,
+        RaftEngineLeaderAddressTest.class,
         // StoreNodeServiceTest.class,
 })
 @Slf4j
diff --git a/hugegraph-pd/hg-pd-test/src/main/java/org/apache/hugegraph/pd/raft/RaftEngineLeaderAddressTest.java b/hugegraph-pd/hg-pd-test/src/main/java/org/apache/hugegraph/pd/raft/RaftEngineLeaderAddressTest.java
new file mode 100644
index 000000000..420b106a2
--- /dev/null
+++ b/hugegraph-pd/hg-pd-test/src/main/java/org/apache/hugegraph/pd/raft/RaftEngineLeaderAddressTest.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hugegraph.pd.raft;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.hugegraph.pd.config.PDConfig;
+import org.apache.hugegraph.testutil.Whitebox;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.alipay.sofa.jraft.Node;
+import com.alipay.sofa.jraft.entity.PeerId;
+import com.alipay.sofa.jraft.util.Endpoint;
+
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@link RaftEngine#getLeaderGrpcAddress()} on a follower node.
+ * <p>
+ * Each test temporarily replaces the {@code raftNode}, {@code raftRpcClient} and
+ * {@code config} fields of {@code RaftEngine.getInstance()} with Mockito mocks via
+ * {@link Whitebox}; {@link #tearDown()} restores the saved values afterwards.
+ * <p>
+ * NOTE(review): the follower path assumes {@code RaftEngine.isLeader()} consults
+ * {@code raftNode.isLeader(true)}, which {@link #setUp()} stubs to false - confirm.
+ */
+public class RaftEngineLeaderAddressTest {
+
+    // IP of the mocked leader's raft endpoint; the engine's derived fallback
+    // address is LEADER_IP + ":" + GRPC_PORT.
+    private static final String LEADER_IP = "10.0.0.1";
+    private static final int GRPC_PORT = 8686;
+    // Address returned by the mocked leader RPC in the success case.
+    // NOTE(review): this equals the derived fallback (LEADER_IP + ":" + GRPC_PORT),
+    // so testSuccessReturnsGrpcAddress cannot distinguish the RPC result from the
+    // fallback; consider a distinct value (e.g. a different port).
+    private static final String LEADER_GRPC_ADDRESS = "10.0.0.1:8686";
+
+    // Singleton field values captured in setUp() and restored in tearDown().
+    private Node originalRaftNode;
+    private RaftRpcClient originalRaftRpcClient;
+    private PDConfig.Raft originalConfig;
+
+    // Mocks injected into the singleton for the duration of each test.
+    private Node mockNode;
+    private RaftRpcClient mockRpcClient;
+    private PDConfig.Raft mockConfig;
+    private PeerId mockLeader;
+
+    /**
+     * Saves the engine's current fields and installs mocks: a leader {@link PeerId}
+     * at LEADER_IP:8610, a {@link Node} that is not leader and reports that leader,
+     * a config with grpc port GRPC_PORT and a 100 ms rpc timeout, and a
+     * {@link RaftRpcClient} mock that the RPC-path tests stub individually.
+     */
+    @Before
+    public void setUp() {
+        RaftEngine engine = RaftEngine.getInstance();
+
+        // Save originals
+        originalRaftNode = engine.getRaftNode();
+        originalRaftRpcClient = Whitebox.getInternalState(engine, 
"raftRpcClient");
+        originalConfig = Whitebox.getInternalState(engine, "config");
+
+        // Build mock leader PeerId with real Endpoint
+        // (only the IP feeds the derived fallback; the raft port 8610 is arbitrary
+        // because the RPC mock is matched with anyString()).
+        mockLeader = mock(PeerId.class);
+        Endpoint endpoint = new Endpoint(LEADER_IP, 8610);
+        when(mockLeader.getEndpoint()).thenReturn(endpoint);
+
+        // Build mock Node that reports itself as follower with a known leader
+        mockNode = mock(Node.class);
+        when(mockNode.isLeader(true)).thenReturn(false);
+        when(mockNode.getLeaderId()).thenReturn(mockLeader);
+
+        // Build mock config
+        // Use a short default timeout (100ms); specific tests may override 
getRpcTimeout()
+        mockConfig = mock(PDConfig.Raft.class);
+        when(mockConfig.getGrpcAddress()).thenReturn("127.0.0.1:" + GRPC_PORT);
+        when(mockConfig.getGrpcPort()).thenReturn(GRPC_PORT);
+        when(mockConfig.getRpcTimeout()).thenReturn(100);
+
+        // Build mock RpcClient
+        mockRpcClient = mock(RaftRpcClient.class);
+
+        // Inject mocks
+        Whitebox.setInternalState(engine, "raftNode", mockNode);
+        Whitebox.setInternalState(engine, "raftRpcClient", mockRpcClient);
+        Whitebox.setInternalState(engine, "config", mockConfig);
+    }
+
+    /**
+     * Restores the fields saved in {@link #setUp()} on the shared RaftEngine instance.
+     */
+    @After
+    public void tearDown() {
+        RaftEngine engine = RaftEngine.getInstance();
+        Whitebox.setInternalState(engine, "raftNode", originalRaftNode);
+        Whitebox.setInternalState(engine, "raftRpcClient", 
originalRaftRpcClient);
+        Whitebox.setInternalState(engine, "config", originalConfig);
+    }
+
+    /** A non-null response carrying a non-null gRPC address is returned as-is. */
+    @Test
+    public void testSuccessReturnsGrpcAddress() throws Exception {
+        // RPC succeeds and returns a valid gRPC address
+        RaftRpcProcessor.GetMemberResponse response =
+                mock(RaftRpcProcessor.GetMemberResponse.class);
+        when(response.getGrpcAddress()).thenReturn(LEADER_GRPC_ADDRESS);
+
+        CompletableFuture<RaftRpcProcessor.GetMemberResponse> future =
+                CompletableFuture.completedFuture(response);
+        when(mockRpcClient.getGrpcAddress(anyString())).thenReturn(future);
+
+        String result = RaftEngine.getInstance().getLeaderGrpcAddress();
+        Assert.assertEquals(LEADER_GRPC_ADDRESS, result);
+    }
+
+    /** A TimeoutException from the bounded future.get() yields LEADER_IP:GRPC_PORT. */
+    @Test
+    public void testTimeoutFallsBackToDerivedAddress() throws Exception {
+        // RPC times out — should fall back to leaderIp:grpcPort
+        // NOTE(review): mock(CompletableFuture.class) is raw-typed, so this assignment
+        // is an unchecked conversion; consider @SuppressWarnings("unchecked") here.
+        CompletableFuture<RaftRpcProcessor.GetMemberResponse> future =
+                mock(CompletableFuture.class);
+        when(future.get(anyLong(), eq(TimeUnit.MILLISECONDS)))
+                .thenThrow(new TimeoutException("simulated timeout"));
+        when(mockRpcClient.getGrpcAddress(anyString())).thenReturn(future);
+
+        String result = RaftEngine.getInstance().getLeaderGrpcAddress();
+        Assert.assertEquals(LEADER_IP + ":" + GRPC_PORT, result);
+    }
+
+    /** An ExecutionException from the RPC future yields the derived address. */
+    @Test
+    public void testRpcExceptionFallsBackToDerivedAddress() throws Exception {
+        // RPC throws ExecutionException — should fall back to 
leaderIp:grpcPort
+        CompletableFuture<RaftRpcProcessor.GetMemberResponse> future =
+                mock(CompletableFuture.class);
+        when(future.get(anyLong(), eq(TimeUnit.MILLISECONDS)))
+                .thenThrow(new ExecutionException("simulated rpc failure",
+                                                  new RuntimeException("bolt 
error")));
+        when(mockRpcClient.getGrpcAddress(anyString())).thenReturn(future);
+
+        String result = RaftEngine.getInstance().getLeaderGrpcAddress();
+        Assert.assertEquals(LEADER_IP + ":" + GRPC_PORT, result);
+    }
+
+    /** A future completed with a null response yields the derived address. */
+    @Test
+    public void testNullResponseFallsBackToDerivedAddress() throws Exception {
+        // RPC returns null response — should fall back to leaderIp:grpcPort
+        CompletableFuture<RaftRpcProcessor.GetMemberResponse> future =
+                CompletableFuture.completedFuture(null);
+        when(mockRpcClient.getGrpcAddress(anyString())).thenReturn(future);
+
+        String result = RaftEngine.getInstance().getLeaderGrpcAddress();
+        Assert.assertEquals(LEADER_IP + ":" + GRPC_PORT, result);
+    }
+
+    /** A response whose gRPC address field is null yields the derived address. */
+    @Test
+    public void testNullGrpcAddressInResponseFallsBackToDerivedAddress() 
throws Exception {
+        // RPC returns a response but grpcAddress field is null — should fall 
back
+        RaftRpcProcessor.GetMemberResponse response =
+                mock(RaftRpcProcessor.GetMemberResponse.class);
+        when(response.getGrpcAddress()).thenReturn(null);
+
+        CompletableFuture<RaftRpcProcessor.GetMemberResponse> future =
+                CompletableFuture.completedFuture(response);
+        when(mockRpcClient.getGrpcAddress(anyString())).thenReturn(future);
+
+        String result = RaftEngine.getInstance().getLeaderGrpcAddress();
+        Assert.assertEquals(LEADER_IP + ":" + GRPC_PORT, result);
+    }
+
+    /**
+     * With no leader known even after waiting, the call throws an ExecutionException
+     * whose cause is an IllegalStateException("Leader is not ready").
+     */
+    @Test
+    public void testNullLeaderAfterWaitThrowsExecutionException() throws 
Exception {
+        // Use 0ms timeout so waitingForLeader(0) skips the wait loop and 
returns immediately
+        when(mockConfig.getRpcTimeout()).thenReturn(0);
+        // Leader is still null after waitingForLeader() — should throw 
ExecutionException
+        when(mockNode.getLeaderId()).thenReturn(null);
+
+        try {
+            RaftEngine.getInstance().getLeaderGrpcAddress();
+            Assert.fail("Expected ExecutionException");
+        } catch (ExecutionException e) {
+            Assert.assertTrue(e.getCause() instanceof IllegalStateException);
+            Assert.assertEquals("Leader is not ready", 
e.getCause().getMessage());
+        }
+    }
+}

Reply via email to