Copilot commented on code in PR #2961:
URL: https://github.com/apache/hugegraph/pull/2961#discussion_r2958539228


##########
hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftEngine.java:
##########
@@ -228,19 +229,57 @@ public PeerId getLeader() {
     }
 
     /**
-     * Send a message to the leader to get the grpc address;
+     * Send a message to the leader to get the grpc address.
      */
     public String getLeaderGrpcAddress() throws ExecutionException, InterruptedException {
         if (isLeader()) {
             return config.getGrpcAddress();
         }
 
         if (raftNode.getLeaderId() == null) {
-            waitingForLeader(10000);
+            waitingForLeader(config.getRpcTimeout());

Review Comment:
   `getLeaderGrpcAddress()` now calls
`waitingForLeader(config.getRpcTimeout())`, but `waitingForLeader()` waits in
fixed 1000 ms intervals (`wait(1000)`). If `raft.rpc-timeout` is configured
below 1000 ms, this call can still block for about 1 s, and in general it can
overshoot the intended timeout. Consider updating `waitingForLeader()` to wait
for `min(1000, remainingTime)`, or otherwise make the method respect the
caller-provided timeout more closely.
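A minimal sketch of the suggested change follows. It assumes `waitingForLeader()` is a monitor-based loop that re-checks for a leader after each `wait()`. The class `LeaderWaitSketch`, the `leaderReady` supplier, the `onLeaderChanged()` hook and the boolean return value are hypothetical and not part of `RaftEngine`. The sketch shows only that each wait is capped at `min(1000, remaining)`, so a sub-second `raft.rpc-timeout` is honored.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

// Hypothetical sketch, not RaftEngine's actual code: a leader wait bounded by
// the caller's timeout instead of a fixed wait(1000).
public class LeaderWaitSketch {

    private final Object lock = new Object();
    private final BooleanSupplier leaderReady; // hypothetical "is a leader known?" check

    public LeaderWaitSketch(BooleanSupplier leaderReady) {
        this.leaderReady = leaderReady;
    }

    // Hypothetical hook: a leader-change callback would call this to wake waiters early.
    public void onLeaderChanged() {
        synchronized (lock) {
            lock.notifyAll();
        }
    }

    // Returns true if a leader became known before timeoutMs elapsed.
    public boolean waitingForLeader(long timeoutMs) throws InterruptedException {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(Math.max(0L, timeoutMs));
        synchronized (lock) {
            while (!leaderReady.getAsBoolean()) {
                long remainingMs = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
                if (remainingMs <= 0) {
                    return false; // never call wait(0): that blocks until notified
                }
                // Poll at most once per second, but never past the caller's deadline.
                lock.wait(Math.min(1000L, remainingMs));
            }
            return true;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        LeaderWaitSketch noLeader = new LeaderWaitSketch(() -> false);
        long start = System.nanoTime();
        boolean ready = noLeader.waitingForLeader(100);
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        System.out.println("ready=" + ready); // ready=false
        System.out.println("blocked for ~" + elapsedMs + " ms");
    }
}
```

The early return when `remainingMs <= 0` matters because `Object.wait(0)` waits until notified instead of returning immediately. A naive `wait(remaining)` could therefore hang once the deadline is reached.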
   



##########
hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftEngine.java:
##########
@@ -228,19 +229,57 @@ public PeerId getLeader() {
     }
 
     /**
-     * Send a message to the leader to get the grpc address;
+     * Send a message to the leader to get the grpc address.
      */
     public String getLeaderGrpcAddress() throws ExecutionException, InterruptedException {
         if (isLeader()) {
             return config.getGrpcAddress();
         }
 
         if (raftNode.getLeaderId() == null) {
-            waitingForLeader(10000);
+            waitingForLeader(config.getRpcTimeout());
+        }
+
+        // Cache leader to avoid repeated getLeaderId() calls and guard against
+        // waitingForLeader() returning without a leader being elected.
+        PeerId leader = raftNode.getLeaderId();
+        if (leader == null) {
+            throw new ExecutionException(new IllegalStateException("Leader is not ready"));
+        }
+
+        RaftRpcProcessor.GetMemberResponse response = null;
+        try {
+            // TODO: a more complete fix would need a source of truth for the leader's
+            // actual grpcAddress rather than deriving it from the local node's port config.
+            response = raftRpcClient
+                    .getGrpcAddress(leader.getEndpoint().toString())
+                    .get(config.getRpcTimeout(), TimeUnit.MILLISECONDS);
+            if (response != null && response.getGrpcAddress() != null) {
+                return response.getGrpcAddress();
+            }
+            if (response == null) {
+                log.warn("Leader RPC response is null for {}, falling back to derived address",
+                         leader);
+            } else {
+                log.warn("Leader gRPC address field is null in RPC response for {}, "
+                         + "falling back to derived address", leader);
+            }
+        } catch (TimeoutException e) {
+            log.warn("Timed out resolving leader gRPC address for {}, falling back to derived "
+                     + "address", leader);
+        } catch (ExecutionException e) {
+            Throwable cause = e.getCause() != null ? e.getCause() : e;
+            log.warn("Failed to resolve leader gRPC address for {}, falling back to derived "
+                     + "address", leader, cause);
         }
 
-        return raftRpcClient.getGrpcAddress(raftNode.getLeaderId().getEndpoint().toString()).get()
-                            .getGrpcAddress();
+        // Best-effort fallback: derive from leader raft endpoint IP + local gRPC port.
+        // WARNING: this may be incorrect in clusters where PD nodes use different grpc.port
+        // values , a proper fix requires a cluster-wide source of truth for gRPC addresses.

Review Comment:
   Typo/spacing in the warning comment: `values ,` has an extra space before 
the comma. Please fix to `values,` (and consider rewrapping the comment so it 
reads cleanly).
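For reference, the resolve-then-fall-back flow in this hunk reduces to a JDK-only sketch. It simplifies the RPC result to a `CompletableFuture<String>` in place of `RaftRpcProcessor.GetMemberResponse` and omits the logging. The names `LeaderAddressSketch`, `resolveLeaderGrpcAddress`, `leaderIp` and `localGrpcPort` are hypothetical. The warning comment appears rewrapped, with `values,` as suggested above. The derived form `leaderIp:localGrpcPort` matches what the PR's tests assert (`LEADER_IP + ":" + GRPC_PORT`).

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class LeaderAddressSketch {

    // Hypothetical helper: prefer the address reported by the leader, else derive one.
    static String resolveLeaderGrpcAddress(CompletableFuture<String> rpcResult,
                                           String leaderIp, int localGrpcPort,
                                           long timeoutMs) throws InterruptedException {
        try {
            String address = rpcResult.get(timeoutMs, TimeUnit.MILLISECONDS);
            if (address != null) {
                return address; // leader answered with its own gRPC address
            }
            // null address: fall through, like the "field is null" branch in the PR
        } catch (TimeoutException | ExecutionException e) {
            // RPC timed out or failed: fall through (the PR logs a warning here)
        }
        // Best-effort fallback: derive from the leader raft endpoint IP plus the
        // local gRPC port. WARNING: this may be incorrect in clusters where PD
        // nodes use different grpc.port values; a proper fix requires a
        // cluster-wide source of truth for gRPC addresses.
        return leaderIp + ":" + localGrpcPort;
    }

    public static void main(String[] args) throws InterruptedException {
        CompletableFuture<String> ok = CompletableFuture.completedFuture("10.0.0.1:8687");
        System.out.println(resolveLeaderGrpcAddress(ok, "10.0.0.1", 8686, 100)); // 10.0.0.1:8687

        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("bolt error"));
        System.out.println(resolveLeaderGrpcAddress(failed, "10.0.0.1", 8686, 100)); // 10.0.0.1:8686

        CompletableFuture<String> never = new CompletableFuture<>(); // simulates a hung RPC
        System.out.println(resolveLeaderGrpcAddress(never, "10.0.0.1", 8686, 50)); // 10.0.0.1:8686
    }
}
```

The first call illustrates the warning. If the leader's real gRPC port (8687 here) differs from the local one, only the RPC path returns the correct address.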
   



##########
hugegraph-pd/hg-pd-test/src/main/java/org/apache/hugegraph/pd/raft/RaftEngineLeaderAddressTest.java:
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hugegraph.pd.raft;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.hugegraph.pd.config.PDConfig;
+import org.apache.hugegraph.testutil.Whitebox;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.alipay.sofa.jraft.Node;
+import com.alipay.sofa.jraft.entity.PeerId;
+import com.alipay.sofa.jraft.util.Endpoint;
+
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class RaftEngineLeaderAddressTest {
+
+    private static final String LEADER_IP = "10.0.0.1";
+    private static final int GRPC_PORT = 8686;
+    private static final String LEADER_GRPC_ADDRESS = "10.0.0.1:8686";
+
+    private Node originalRaftNode;
+    private RaftRpcClient originalRaftRpcClient;
+    private PDConfig.Raft originalConfig;
+
+    private Node mockNode;
+    private RaftRpcClient mockRpcClient;
+    private PDConfig.Raft mockConfig;
+    private PeerId mockLeader;
+
+    @Before
+    public void setUp() {
+        RaftEngine engine = RaftEngine.getInstance();
+
+        // Save originals
+        originalRaftNode = engine.getRaftNode();
+        originalRaftRpcClient = Whitebox.getInternalState(engine, "raftRpcClient");
+        originalConfig = Whitebox.getInternalState(engine, "config");
+
+        // Build mock leader PeerId with real Endpoint
+        mockLeader = mock(PeerId.class);
+        Endpoint endpoint = new Endpoint(LEADER_IP, 8610);
+        when(mockLeader.getEndpoint()).thenReturn(endpoint);
+
+        // Build mock Node that reports itself as follower with a known leader
+        mockNode = mock(Node.class);
+        when(mockNode.isLeader(true)).thenReturn(false);
+        when(mockNode.getLeaderId()).thenReturn(mockLeader);
+
+        // Build mock config
+        // Use a short timeout (100ms) so the null-leader test doesn't block for seconds
+        mockConfig = mock(PDConfig.Raft.class);
+        when(mockConfig.getGrpcAddress()).thenReturn("127.0.0.1:" + GRPC_PORT);
+        when(mockConfig.getGrpcPort()).thenReturn(GRPC_PORT);
+        when(mockConfig.getRpcTimeout()).thenReturn(100);
+
+        // Build mock RpcClient
+        mockRpcClient = mock(RaftRpcClient.class);
+
+        // Inject mocks
+        Whitebox.setInternalState(engine, "raftNode", mockNode);
+        Whitebox.setInternalState(engine, "raftRpcClient", mockRpcClient);
+        Whitebox.setInternalState(engine, "config", mockConfig);
+    }
+
+    @After
+    public void tearDown() {
+        RaftEngine engine = RaftEngine.getInstance();
+        Whitebox.setInternalState(engine, "raftNode", originalRaftNode);
+        Whitebox.setInternalState(engine, "raftRpcClient", originalRaftRpcClient);
+        Whitebox.setInternalState(engine, "config", originalConfig);
+    }
+
+    @Test
+    public void testSuccessReturnsGrpcAddress() throws Exception {
+        // RPC succeeds and returns a valid gRPC address
+        RaftRpcProcessor.GetMemberResponse response =
+                mock(RaftRpcProcessor.GetMemberResponse.class);
+        when(response.getGrpcAddress()).thenReturn(LEADER_GRPC_ADDRESS);
+
+        CompletableFuture<RaftRpcProcessor.GetMemberResponse> future =
+                CompletableFuture.completedFuture(response);
+        when(mockRpcClient.getGrpcAddress(anyString())).thenReturn(future);
+
+        String result = RaftEngine.getInstance().getLeaderGrpcAddress();
+        Assert.assertEquals(LEADER_GRPC_ADDRESS, result);
+    }
+
+    @Test
+    public void testTimeoutFallsBackToDerivedAddress() throws Exception {
+        // RPC times out — should fall back to leaderIp:grpcPort
+        CompletableFuture<RaftRpcProcessor.GetMemberResponse> future =
+                mock(CompletableFuture.class);
+        when(future.get(anyLong(), eq(TimeUnit.MILLISECONDS)))
+                .thenThrow(new TimeoutException("simulated timeout"));
+        when(mockRpcClient.getGrpcAddress(anyString())).thenReturn(future);
+
+        String result = RaftEngine.getInstance().getLeaderGrpcAddress();
+        Assert.assertEquals(LEADER_IP + ":" + GRPC_PORT, result);
+    }
+
+    @Test
+    public void testRpcExceptionFallsBackToDerivedAddress() throws Exception {
+        // RPC throws ExecutionException — should fall back to leaderIp:grpcPort
+        CompletableFuture<RaftRpcProcessor.GetMemberResponse> future =
+                mock(CompletableFuture.class);
+        when(future.get(anyLong(), eq(TimeUnit.MILLISECONDS)))
+                .thenThrow(new ExecutionException("simulated rpc failure",
+                                                  new RuntimeException("bolt error")));
+        when(mockRpcClient.getGrpcAddress(anyString())).thenReturn(future);
+
+        String result = RaftEngine.getInstance().getLeaderGrpcAddress();
+        Assert.assertEquals(LEADER_IP + ":" + GRPC_PORT, result);
+    }
+
+    @Test
+    public void testNullResponseFallsBackToDerivedAddress() throws Exception {
+        // RPC returns null response — should fall back to leaderIp:grpcPort
+        CompletableFuture<RaftRpcProcessor.GetMemberResponse> future =
+                CompletableFuture.completedFuture(null);
+        when(mockRpcClient.getGrpcAddress(anyString())).thenReturn(future);
+
+        String result = RaftEngine.getInstance().getLeaderGrpcAddress();
+        Assert.assertEquals(LEADER_IP + ":" + GRPC_PORT, result);
+    }
+
+    @Test
+    public void testNullGrpcAddressInResponseFallsBackToDerivedAddress() throws Exception {
+        // RPC returns a response but grpcAddress field is null — should fall back
+        RaftRpcProcessor.GetMemberResponse response =
+                mock(RaftRpcProcessor.GetMemberResponse.class);
+        when(response.getGrpcAddress()).thenReturn(null);
+
+        CompletableFuture<RaftRpcProcessor.GetMemberResponse> future =
+                CompletableFuture.completedFuture(response);
+        when(mockRpcClient.getGrpcAddress(anyString())).thenReturn(future);
+
+        String result = RaftEngine.getInstance().getLeaderGrpcAddress();
+        Assert.assertEquals(LEADER_IP + ":" + GRPC_PORT, result);
+    }
+
+    @Test
+    public void testNullLeaderAfterWaitThrowsExecutionException() throws Exception {
+        // Leader is still null after waitingForLeader() — should throw ExecutionException
+        when(mockNode.getLeaderId()).thenReturn(null);
+
+        try {
+            RaftEngine.getInstance().getLeaderGrpcAddress();
+            Assert.fail("Expected ExecutionException");

Review Comment:
   `testNullLeaderAfterWaitThrowsExecutionException()` is still likely to block
for ~1 s even though `mockConfig.getRpcTimeout()` is stubbed to return 100 ms,
because `RaftEngine.waitingForLeader()` uses `wait(1000)`. To keep the unit test
fast, consider stubbing `getRpcTimeout()` to return 0 for this test case (so
`waitingForLeader(0)` returns immediately), or adjusting `waitingForLeader()` to
respect sub-second timeouts.
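The sketch below illustrates why the test still takes about a second. It reproduces a fixed-interval wait under a stated assumption: the loop checks elapsed time before each `wait(1000)`. The actual `RaftEngine.waitingForLeader()` body is not shown in this diff, and `FixedIntervalWaitDemo` and `measure()` are hypothetical. With a 100 ms timeout, the loop condition passes once and the thread waits a full second. With a timeout of 0, the condition fails immediately. This is why stubbing `getRpcTimeout()` to 0 would keep the test fast.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical reproduction of a fixed-interval leader wait (assumed shape,
// not the actual RaftEngine code).
public class FixedIntervalWaitDemo {

    private final Object lock = new Object();
    private boolean leaderKnown = false; // stays false, like the null-leader test

    // Assumed shape: check the deadline, then wait a full second regardless of timeoutMs.
    void waitingForLeader(long timeoutMs) throws InterruptedException {
        long start = System.currentTimeMillis();
        synchronized (lock) {
            while (!leaderKnown && System.currentTimeMillis() - start < timeoutMs) {
                lock.wait(1000); // fixed interval, independent of timeoutMs
            }
        }
    }

    // Returns how long waitingForLeader(timeoutMs) blocked, in milliseconds.
    long measure(long timeoutMs) throws InterruptedException {
        long t0 = System.nanoTime();
        waitingForLeader(timeoutMs);
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - t0);
    }

    public static void main(String[] args) throws InterruptedException {
        FixedIntervalWaitDemo demo = new FixedIntervalWaitDemo();
        // A 100 ms timeout still enters one full wait(1000).
        System.out.println("timeout=100 blocked ~" + demo.measure(100) + " ms");
        // A 0 ms timeout fails the loop condition, so no wait happens.
        System.out.println("timeout=0 blocked ~" + demo.measure(0) + " ms");
    }
}
```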



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

