This is an automated email from the ASF dual-hosted git repository.

lollipop pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new ac19e798 [ISSUE #1133] The java client adds the maxStartupAttempts parameter to configure the retry times at startup (#1134)
ac19e798 is described below

commit ac19e79864d80bfe1c26731d0a6e6e95e3e532a8
Author: qianye <[email protected]>
AuthorDate: Wed Nov 26 17:17:42 2025 +0800

    [ISSUE #1133] The java client adds the maxStartupAttempts parameter to configure the retry times at startup (#1134)
---
 .github/workflows/java_build.yml                   | 10 +++++-----
 .../rocketmq/client/apis/ClientConfiguration.java  |  8 +++++++-
 .../client/apis/ClientConfigurationBuilder.java    | 17 ++++++++++++++++-
 .../client/java/example/ProducerSingleton.java     |  3 +++
 .../rocketmq/client/java/impl/ClientImpl.java      | 22 +++++++++++++++++-----
 5 files changed, 48 insertions(+), 12 deletions(-)

diff --git a/.github/workflows/java_build.yml b/.github/workflows/java_build.yml
index 34822abb..31d1649e 100644
--- a/.github/workflows/java_build.yml
+++ b/.github/workflows/java_build.yml
@@ -28,11 +28,11 @@ jobs:
     steps:
       - name: Checkout Current Repository
         uses: actions/checkout@v3
-      # Use JDK 17.
-      - name: Use JDK 17
+      # Use JDK 21.
+      - name: Use JDK 21
         uses: actions/setup-java@v3
         with:
-          java-version: 17
+          java-version: 21
           distribution: "adopt"
       # Build the code of the current repository, skipping tests and code style checks.
       - name: Build Current Repository
@@ -66,10 +66,10 @@ jobs:
         run: |
           sed -i 's/org\.apache\.rocketmq:rocketmq-client-java:[^"]*/org.apache.rocketmq:rocketmq-client-java:${{ steps.get_version.outputs.version }}/' instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/testing/build.gradle.kts
       # Use JDK 17.
-      - name: Use JDK 17
+      - name: Use JDK 21
         uses: actions/setup-java@v3
         with:
-          java-version: 17
+          java-version: 21
           distribution: "adopt"
           cache: gradle
       # Build the rocketmq opentelemetry test.
diff --git a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfiguration.java b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfiguration.java
index 042c352f..3c30e77b 100644
--- a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfiguration.java
+++ b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfiguration.java
@@ -29,18 +29,20 @@ public class ClientConfiguration {
     private final Duration requestTimeout;
     private final boolean sslEnabled;
     private final String namespace;
+    private final int maxStartupAttempts;
 
     /**
      * The caller is supposed to have validated the arguments and handled 
throwing exceptions or
      * logging warnings already, so we avoid repeating args check here.
      */
     ClientConfiguration(String endpoints, SessionCredentialsProvider 
sessionCredentialsProvider,
-        Duration requestTimeout, boolean sslEnabled, String namespace) {
+        Duration requestTimeout, boolean sslEnabled, String namespace, int 
maxStartupAttempts) {
         this.endpoints = endpoints;
         this.sessionCredentialsProvider = sessionCredentialsProvider;
         this.requestTimeout = requestTimeout;
         this.sslEnabled = sslEnabled;
         this.namespace = namespace;
+        this.maxStartupAttempts = maxStartupAttempts;
     }
 
     public static ClientConfigurationBuilder newBuilder() {
@@ -66,4 +68,8 @@ public class ClientConfiguration {
     public String getNamespace() {
         return namespace;
     }
+
+    public int getMaxStartupAttempts() {
+        return maxStartupAttempts;
+    }
 }
diff --git 
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfigurationBuilder.java
 
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfigurationBuilder.java
index 25cc54a4..acbbe285 100644
--- 
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfigurationBuilder.java
+++ 
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfigurationBuilder.java
@@ -17,6 +17,7 @@
 
 package org.apache.rocketmq.client.apis;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import java.time.Duration;
@@ -32,6 +33,7 @@ public class ClientConfigurationBuilder {
     private Duration requestTimeout = Duration.ofSeconds(3);
     private boolean sslEnabled = true;
     private String namespace = "";
+    private int maxStartupAttempts = 3;
 
     /**
      * Configure the access point with which the SDK should communicate.
@@ -93,6 +95,18 @@ public class ClientConfigurationBuilder {
         return this;
     }
 
+    /**
+     * Configure maxStartupAttempts for client
+     *
+     * @param maxStartupAttempts max attempt times when client startup
+     * @return The {@link ClientConfigurationBuilder} instance, to allow for 
method chaining.
+     */
+    public ClientConfigurationBuilder setMaxStartupAttempts(int maxStartupAttempts) {
+        checkArgument(maxStartupAttempts > 0, "maxStartupAttempts should more than 0");
+        this.maxStartupAttempts = maxStartupAttempts;
+        return this;
+    }
+
     /**
      * Finalize the build of {@link ClientConfiguration}.
      *
@@ -101,6 +115,7 @@ public class ClientConfigurationBuilder {
     public ClientConfiguration build() {
         checkNotNull(endpoints, "endpoints should not be null");
         checkNotNull(requestTimeout, "requestTimeout should not be null");
-        return new ClientConfiguration(endpoints, sessionCredentialsProvider, 
requestTimeout, sslEnabled, namespace);
+        return new ClientConfiguration(endpoints, sessionCredentialsProvider, 
requestTimeout, sslEnabled, namespace,
+            maxStartupAttempts);
     }
 }
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerSingleton.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerSingleton.java
index e6700e95..d0ea25b5 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerSingleton.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerSingleton.java
@@ -54,6 +54,9 @@ public class ProducerSingleton {
             // On some Windows platforms, you may encounter SSL compatibility 
issues. Try turning off the SSL option in
             // client configuration to solve the problem please if SSL is not 
essential.
             // .enableSsl(false)
+            // Due to the lazy loading of gRPC, when the network conditions 
are poor or the load of the application
+            // at startup is high, the first startup may fail, and you can try 
multiple startups.
+            // .setMaxStartupAttempts(3)
             .setCredentialProvider(sessionCredentialsProvider)
             .build();
         final ProducerBuilder builder = provider.newProducerBuilder()
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
index f17832bf..9e9a5914 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
@@ -186,12 +186,24 @@ public abstract class ClientImpl extends 
AbstractIdleService implements Client,
         // Fetch topic route from remote.
         log.info("Begin to fetch topic(s) route data from remote during client 
startup, clientId={}, topics={}",
             clientId, topics);
-        for (String topic : topics) {
-            final ListenableFuture<TopicRouteData> future = 
fetchTopicRoute(topic);
-            future.get();
+        for (int attempt = 1; attempt <= clientConfiguration.getMaxStartupAttempts(); attempt++) {
+            try {
+                for (String topic : topics) {
+                    final ListenableFuture<TopicRouteData> future = 
fetchTopicRoute(topic);
+                    future.get();
+                }
+                log.info("Fetch topic route data from remote successfully 
during startup, clientId={}, topics={}",
+                    clientId, topics);
+                break;
+            } catch (Exception e) {
+                log.error("Fetch topics failed when client start, clientId={}, 
topics={}, attemptTime={}", clientId,
+                    topics, attempt, e);
+                if (attempt == clientConfiguration.getMaxStartupAttempts()) {
+                    throw new RuntimeException(
+                        String.format("Failed to fetch topics after %d 
attempts", attempt), e);
+                }
+            }
         }
-        log.info("Fetch topic route data from remote successfully during 
startup, clientId={}, topics={}",
-            clientId, topics);
         // Update route cache periodically.
         final ScheduledExecutorService scheduler = 
clientManager.getScheduler();
         this.updateRouteCacheFuture = scheduler.scheduleWithFixedDelay(() -> {

Reply via email to