This is an automated email from the ASF dual-hosted git repository.
jin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hugegraph.git
The following commit(s) were added to refs/heads/master by this push:
new ab12b35b6 fix(pd): add timeout and null-safety to
getLeaderGrpcAddress() (#2961)
ab12b35b6 is described below
commit ab12b35b6adfaf8895df9b4023cf2a25d23cd215
Author: Himanshu Verma <[email protected]>
AuthorDate: Fri Mar 20 13:47:42 2026 +0530
fix(pd): add timeout and null-safety to getLeaderGrpcAddress() (#2961)
* fix(pd): add timeout and null-safety to getLeaderGrpcAddress()
The Bolt RPC call in getLeaderGrpcAddress() returns null in Docker
bridge network mode, causing an NPE when a follower PD node attempts
to discover the leader's gRPC address. This breaks store registration
and partition distribution when any node other than pd0 wins the
Raft leader election.
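Add a bounded timeout using the configured rpc-timeout, null-check
the RPC response, and fall back to a derived address (the leader's
raft endpoint IP combined with the local node's configured gRPC port)
when the RPC fails, times out, or returns a null response or address.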
Closes apache/hugegraph#2959
---
.../org/apache/hugegraph/pd/raft/RaftEngine.java | 76 ++++++---
.../apache/hugegraph/pd/core/PDCoreSuiteTest.java | 2 +
.../pd/raft/RaftEngineLeaderAddressTest.java | 183 +++++++++++++++++++++
3 files changed, 237 insertions(+), 24 deletions(-)
diff --git
a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftEngine.java
b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftEngine.java
index b73364ae6..2b08de7d4 100644
---
a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftEngine.java
+++
b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftEngine.java
@@ -28,6 +28,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
@@ -49,7 +50,6 @@ import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.entity.Task;
import com.alipay.sofa.jraft.error.RaftError;
import com.alipay.sofa.jraft.option.NodeOptions;
-import com.alipay.sofa.jraft.option.RaftOptions;
import com.alipay.sofa.jraft.option.RpcOptions;
import com.alipay.sofa.jraft.rpc.RaftRpcServerFactory;
import com.alipay.sofa.jraft.rpc.RpcServer;
@@ -86,8 +86,12 @@ public class RaftEngine {
}
this.config = config;
+ // Wire configured rpc timeout into RaftRpcClient so the Bolt transport
+ // timeout and the future.get() caller timeout in
getLeaderGrpcAddress() are consistent.
raftRpcClient = new RaftRpcClient();
- raftRpcClient.init(new RpcOptions());
+ RpcOptions rpcOptions = new RpcOptions();
+ rpcOptions.setRpcDefaultTimeout(config.getRpcTimeout());
+ raftRpcClient.init(rpcOptions);
String raftPath = config.getDataPath() + "/" + groupId;
new File(raftPath).mkdirs();
@@ -119,10 +123,7 @@ public class RaftEngine {
nodeOptions.setRpcConnectTimeoutMs(config.getRpcTimeout());
nodeOptions.setRpcDefaultTimeout(config.getRpcTimeout());
nodeOptions.setRpcInstallSnapshotTimeout(config.getRpcTimeout());
- // Set the raft configuration
- RaftOptions raftOptions = nodeOptions.getRaftOptions();
-
- nodeOptions.setEnableMetrics(true);
+ // TODO: tune RaftOptions for PD (see hugegraph-store PartitionEngine
for reference)
final PeerId serverId = JRaftUtils.getPeerId(config.getAddress());
@@ -228,7 +229,7 @@ public class RaftEngine {
}
/**
- * Send a message to the leader to get the grpc address;
+ * Send a message to the leader to get the grpc address.
*/
public String getLeaderGrpcAddress() throws ExecutionException,
InterruptedException {
if (isLeader()) {
@@ -236,11 +237,49 @@ public class RaftEngine {
}
if (raftNode.getLeaderId() == null) {
- waitingForLeader(10000);
+ waitingForLeader(config.getRpcTimeout());
+ }
+
+ // Cache leader to avoid repeated getLeaderId() calls and guard against
+ // waitingForLeader() returning without a leader being elected.
+ PeerId leader = raftNode.getLeaderId();
+ if (leader == null) {
+ throw new ExecutionException(new IllegalStateException("Leader is
not ready"));
+ }
+
+ RaftRpcProcessor.GetMemberResponse response = null;
+ try {
+ // TODO: a more complete fix would need a source of truth for the
leader's
+ // actual grpcAddress rather than deriving it from the local
node's port config.
+ response = raftRpcClient
+ .getGrpcAddress(leader.getEndpoint().toString())
+ .get(config.getRpcTimeout(), TimeUnit.MILLISECONDS);
+ if (response != null && response.getGrpcAddress() != null) {
+ return response.getGrpcAddress();
+ }
+ if (response == null) {
+ log.warn("Leader RPC response is null for {}, falling back to
derived address",
+ leader);
+ } else {
+ log.warn("Leader gRPC address field is null in RPC response
for {}, "
+ + "falling back to derived address", leader);
+ }
+ } catch (TimeoutException e) {
+ log.warn("Timed out resolving leader gRPC address for {}, falling
back to derived "
+ + "address", leader);
+ } catch (ExecutionException e) {
+ Throwable cause = e.getCause() != null ? e.getCause() : e;
+ log.warn("Failed to resolve leader gRPC address for {}, falling
back to derived "
+ + "address", leader, cause);
}
- return
raftRpcClient.getGrpcAddress(raftNode.getLeaderId().getEndpoint().toString()).get()
- .getGrpcAddress();
+ // Best-effort fallback: derive from leader raft endpoint IP + local
gRPC port.
+ // WARNING: this may be incorrect in clusters where PD nodes use
different grpc.port
+ // values, a proper fix requires a cluster-wide source of truth for
gRPC addresses.
+ String derived = leader.getEndpoint().getIp() + ":" +
config.getGrpcPort();
+ log.info("Using derived leader gRPC address {} - may be incorrect if
nodes use different ports",
+ derived);
+ return derived;
}
/**
@@ -322,14 +361,7 @@ public class RaftEngine {
newPeers.parse(peerList);
CountDownLatch latch = new CountDownLatch(1);
this.raftNode.changePeers(newPeers, status -> {
- // Use compareAndSet so a late callback does not overwrite a
timeout status
result.compareAndSet(null, status);
- // Refresh inside callback so it fires even if caller already
timed out
- // Note: changePeerList() uses Configuration.parse() which
only supports
- // plain comma-separated peer addresses with no learner syntax.
- // getLearners() will always be empty here. Learner support is
handled
- // in PDService.updatePdRaft() which uses
PeerUtil.parseConfig()
- // and supports the /learner suffix.
if (status != null && status.isOk()) {
IpAuthHandler handler = IpAuthHandler.getInstance();
if (handler != null) {
@@ -347,16 +379,12 @@ public class RaftEngine {
}
latch.countDown();
});
- // Use 3x configured RPC timeout — bare await() would block
forever if
- // the callback is never invoked (e.g. node not started / RPC
failure)
- boolean completed = latch.await(3L * config.getRpcTimeout(),
- TimeUnit.MILLISECONDS);
+ boolean completed = latch.await(3L * config.getRpcTimeout(),
TimeUnit.MILLISECONDS);
if (!completed && result.get() == null) {
Status timeoutStatus = new Status(RaftError.EINTERNAL,
"changePeerList timed out
after %d ms",
3L * config.getRpcTimeout());
if (!result.compareAndSet(null, timeoutStatus)) {
- // Callback arrived just before us — keep its result
timeoutStatus = null;
}
if (timeoutStatus != null) {
@@ -387,7 +415,8 @@ public class RaftEngine {
long start = System.currentTimeMillis();
while ((System.currentTimeMillis() - start < timeOut) && (leader
== null)) {
try {
- this.wait(1000);
+ long remaining = timeOut - (System.currentTimeMillis() -
start);
+ this.wait(Math.min(1000, Math.max(0, remaining)));
} catch (InterruptedException e) {
log.error("Raft wait for leader exception", e);
}
@@ -395,7 +424,6 @@ public class RaftEngine {
}
return leader;
}
-
}
public Node getRaftNode() {
diff --git
a/hugegraph-pd/hg-pd-test/src/main/java/org/apache/hugegraph/pd/core/PDCoreSuiteTest.java
b/hugegraph-pd/hg-pd-test/src/main/java/org/apache/hugegraph/pd/core/PDCoreSuiteTest.java
index 57fd36717..87d1500bc 100644
---
a/hugegraph-pd/hg-pd-test/src/main/java/org/apache/hugegraph/pd/core/PDCoreSuiteTest.java
+++
b/hugegraph-pd/hg-pd-test/src/main/java/org/apache/hugegraph/pd/core/PDCoreSuiteTest.java
@@ -21,6 +21,7 @@ import
org.apache.hugegraph.pd.core.meta.MetadataKeyHelperTest;
import org.apache.hugegraph.pd.core.store.HgKVStoreImplTest;
import org.apache.hugegraph.pd.raft.IpAuthHandlerTest;
import org.apache.hugegraph.pd.raft.RaftEngineIpAuthIntegrationTest;
+import org.apache.hugegraph.pd.raft.RaftEngineLeaderAddressTest;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;
@@ -40,6 +41,7 @@ import lombok.extern.slf4j.Slf4j;
TaskScheduleServiceTest.class,
IpAuthHandlerTest.class,
RaftEngineIpAuthIntegrationTest.class,
+ RaftEngineLeaderAddressTest.class,
// StoreNodeServiceTest.class,
})
@Slf4j
diff --git
a/hugegraph-pd/hg-pd-test/src/main/java/org/apache/hugegraph/pd/raft/RaftEngineLeaderAddressTest.java
b/hugegraph-pd/hg-pd-test/src/main/java/org/apache/hugegraph/pd/raft/RaftEngineLeaderAddressTest.java
new file mode 100644
index 000000000..420b106a2
--- /dev/null
+++
b/hugegraph-pd/hg-pd-test/src/main/java/org/apache/hugegraph/pd/raft/RaftEngineLeaderAddressTest.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hugegraph.pd.raft;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.hugegraph.pd.config.PDConfig;
+import org.apache.hugegraph.testutil.Whitebox;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.alipay.sofa.jraft.Node;
+import com.alipay.sofa.jraft.entity.PeerId;
+import com.alipay.sofa.jraft.util.Endpoint;
+
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class RaftEngineLeaderAddressTest {
+
+ private static final String LEADER_IP = "10.0.0.1";
+ private static final int GRPC_PORT = 8686;
+ private static final String LEADER_GRPC_ADDRESS = "10.0.0.1:8686";
+
+ private Node originalRaftNode;
+ private RaftRpcClient originalRaftRpcClient;
+ private PDConfig.Raft originalConfig;
+
+ private Node mockNode;
+ private RaftRpcClient mockRpcClient;
+ private PDConfig.Raft mockConfig;
+ private PeerId mockLeader;
+
+ @Before
+ public void setUp() {
+ RaftEngine engine = RaftEngine.getInstance();
+
+ // Save originals
+ originalRaftNode = engine.getRaftNode();
+ originalRaftRpcClient = Whitebox.getInternalState(engine,
"raftRpcClient");
+ originalConfig = Whitebox.getInternalState(engine, "config");
+
+ // Build mock leader PeerId with real Endpoint
+ mockLeader = mock(PeerId.class);
+ Endpoint endpoint = new Endpoint(LEADER_IP, 8610);
+ when(mockLeader.getEndpoint()).thenReturn(endpoint);
+
+ // Build mock Node that reports itself as follower with a known leader
+ mockNode = mock(Node.class);
+ when(mockNode.isLeader(true)).thenReturn(false);
+ when(mockNode.getLeaderId()).thenReturn(mockLeader);
+
+ // Build mock config
+ // Use a short default timeout (100ms); specific tests may override
getRpcTimeout()
+ mockConfig = mock(PDConfig.Raft.class);
+ when(mockConfig.getGrpcAddress()).thenReturn("127.0.0.1:" + GRPC_PORT);
+ when(mockConfig.getGrpcPort()).thenReturn(GRPC_PORT);
+ when(mockConfig.getRpcTimeout()).thenReturn(100);
+
+ // Build mock RpcClient
+ mockRpcClient = mock(RaftRpcClient.class);
+
+ // Inject mocks
+ Whitebox.setInternalState(engine, "raftNode", mockNode);
+ Whitebox.setInternalState(engine, "raftRpcClient", mockRpcClient);
+ Whitebox.setInternalState(engine, "config", mockConfig);
+ }
+
+ @After
+ public void tearDown() {
+ RaftEngine engine = RaftEngine.getInstance();
+ Whitebox.setInternalState(engine, "raftNode", originalRaftNode);
+ Whitebox.setInternalState(engine, "raftRpcClient",
originalRaftRpcClient);
+ Whitebox.setInternalState(engine, "config", originalConfig);
+ }
+
+ @Test
+ public void testSuccessReturnsGrpcAddress() throws Exception {
+ // RPC succeeds and returns a valid gRPC address
+ RaftRpcProcessor.GetMemberResponse response =
+ mock(RaftRpcProcessor.GetMemberResponse.class);
+ when(response.getGrpcAddress()).thenReturn(LEADER_GRPC_ADDRESS);
+
+ CompletableFuture<RaftRpcProcessor.GetMemberResponse> future =
+ CompletableFuture.completedFuture(response);
+ when(mockRpcClient.getGrpcAddress(anyString())).thenReturn(future);
+
+ String result = RaftEngine.getInstance().getLeaderGrpcAddress();
+ Assert.assertEquals(LEADER_GRPC_ADDRESS, result);
+ }
+
+ @Test
+ public void testTimeoutFallsBackToDerivedAddress() throws Exception {
+ // RPC times out — should fall back to leaderIp:grpcPort
+ CompletableFuture<RaftRpcProcessor.GetMemberResponse> future =
+ mock(CompletableFuture.class);
+ when(future.get(anyLong(), eq(TimeUnit.MILLISECONDS)))
+ .thenThrow(new TimeoutException("simulated timeout"));
+ when(mockRpcClient.getGrpcAddress(anyString())).thenReturn(future);
+
+ String result = RaftEngine.getInstance().getLeaderGrpcAddress();
+ Assert.assertEquals(LEADER_IP + ":" + GRPC_PORT, result);
+ }
+
+ @Test
+ public void testRpcExceptionFallsBackToDerivedAddress() throws Exception {
+ // RPC throws ExecutionException — should fall back to
leaderIp:grpcPort
+ CompletableFuture<RaftRpcProcessor.GetMemberResponse> future =
+ mock(CompletableFuture.class);
+ when(future.get(anyLong(), eq(TimeUnit.MILLISECONDS)))
+ .thenThrow(new ExecutionException("simulated rpc failure",
+ new RuntimeException("bolt
error")));
+ when(mockRpcClient.getGrpcAddress(anyString())).thenReturn(future);
+
+ String result = RaftEngine.getInstance().getLeaderGrpcAddress();
+ Assert.assertEquals(LEADER_IP + ":" + GRPC_PORT, result);
+ }
+
+ @Test
+ public void testNullResponseFallsBackToDerivedAddress() throws Exception {
+ // RPC returns null response — should fall back to leaderIp:grpcPort
+ CompletableFuture<RaftRpcProcessor.GetMemberResponse> future =
+ CompletableFuture.completedFuture(null);
+ when(mockRpcClient.getGrpcAddress(anyString())).thenReturn(future);
+
+ String result = RaftEngine.getInstance().getLeaderGrpcAddress();
+ Assert.assertEquals(LEADER_IP + ":" + GRPC_PORT, result);
+ }
+
+ @Test
+ public void testNullGrpcAddressInResponseFallsBackToDerivedAddress()
throws Exception {
+ // RPC returns a response but grpcAddress field is null — should fall
back
+ RaftRpcProcessor.GetMemberResponse response =
+ mock(RaftRpcProcessor.GetMemberResponse.class);
+ when(response.getGrpcAddress()).thenReturn(null);
+
+ CompletableFuture<RaftRpcProcessor.GetMemberResponse> future =
+ CompletableFuture.completedFuture(response);
+ when(mockRpcClient.getGrpcAddress(anyString())).thenReturn(future);
+
+ String result = RaftEngine.getInstance().getLeaderGrpcAddress();
+ Assert.assertEquals(LEADER_IP + ":" + GRPC_PORT, result);
+ }
+
+ @Test
+ public void testNullLeaderAfterWaitThrowsExecutionException() throws
Exception {
+ // Use 0ms timeout so waitingForLeader(0) skips the wait loop and
returns immediately
+ when(mockConfig.getRpcTimeout()).thenReturn(0);
+ // Leader is still null after waitingForLeader() — should throw
ExecutionException
+ when(mockNode.getLeaderId()).thenReturn(null);
+
+ try {
+ RaftEngine.getInstance().getLeaderGrpcAddress();
+ Assert.fail("Expected ExecutionException");
+ } catch (ExecutionException e) {
+ Assert.assertTrue(e.getCause() instanceof IllegalStateException);
+ Assert.assertEquals("Leader is not ready",
e.getCause().getMessage());
+ }
+ }
+}