This is an automated email from the ASF dual-hosted git repository.
lollipop pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new ac19e798 [ISSUE #1133] The java client adds the maxStartupAttempts
parameter to configure the retry times at startup (#1134)
ac19e798 is described below
commit ac19e79864d80bfe1c26731d0a6e6e95e3e532a8
Author: qianye <[email protected]>
AuthorDate: Wed Nov 26 17:17:42 2025 +0800
[ISSUE #1133] The java client adds the maxStartupAttempts parameter to
configure the retry times at startup (#1134)
---
.github/workflows/java_build.yml | 10 +++++-----
.../rocketmq/client/apis/ClientConfiguration.java | 8 +++++++-
.../client/apis/ClientConfigurationBuilder.java | 17 ++++++++++++++++-
.../client/java/example/ProducerSingleton.java | 3 +++
.../rocketmq/client/java/impl/ClientImpl.java | 22 +++++++++++++++++-----
5 files changed, 48 insertions(+), 12 deletions(-)
diff --git a/.github/workflows/java_build.yml b/.github/workflows/java_build.yml
index 34822abb..31d1649e 100644
--- a/.github/workflows/java_build.yml
+++ b/.github/workflows/java_build.yml
@@ -28,11 +28,11 @@ jobs:
steps:
- name: Checkout Current Repository
uses: actions/checkout@v3
- # Use JDK 17.
- - name: Use JDK 17
+ # Use JDK 21.
+ - name: Use JDK 21
uses: actions/setup-java@v3
with:
- java-version: 17
+ java-version: 21
distribution: "adopt"
# Build the code of the current repository, skipping tests and code
style checks.
- name: Build Current Repository
@@ -66,10 +66,10 @@ jobs:
run: |
sed -i
's/org\.apache\.rocketmq:rocketmq-client-java:[^"]*/org.apache.rocketmq:rocketmq-client-java:${{
steps.get_version.outputs.version }}/'
instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/testing/build.gradle.kts
# Use JDK 17.
- - name: Use JDK 17
+ - name: Use JDK 21
uses: actions/setup-java@v3
with:
- java-version: 17
+ java-version: 21
distribution: "adopt"
cache: gradle
# Build the rocketmq opentelemetry test.
diff --git
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfiguration.java
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfiguration.java
index 042c352f..3c30e77b 100644
---
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfiguration.java
+++
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfiguration.java
@@ -29,18 +29,20 @@ public class ClientConfiguration {
private final Duration requestTimeout;
private final boolean sslEnabled;
private final String namespace;
+ private final int maxStartupAttempts;
/**
* The caller is supposed to have validated the arguments and handled
throwing exceptions or
* logging warnings already, so we avoid repeating args check here.
*/
ClientConfiguration(String endpoints, SessionCredentialsProvider
sessionCredentialsProvider,
- Duration requestTimeout, boolean sslEnabled, String namespace) {
+ Duration requestTimeout, boolean sslEnabled, String namespace, int
maxStartupAttempts) {
this.endpoints = endpoints;
this.sessionCredentialsProvider = sessionCredentialsProvider;
this.requestTimeout = requestTimeout;
this.sslEnabled = sslEnabled;
this.namespace = namespace;
+ this.maxStartupAttempts = maxStartupAttempts;
}
public static ClientConfigurationBuilder newBuilder() {
@@ -66,4 +68,8 @@ public class ClientConfiguration {
public String getNamespace() {
return namespace;
}
+
+ public int getMaxStartupAttempts() {
+ return maxStartupAttempts;
+ }
}
diff --git
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfigurationBuilder.java
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfigurationBuilder.java
index 25cc54a4..acbbe285 100644
---
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfigurationBuilder.java
+++
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfigurationBuilder.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.client.apis;
+import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import java.time.Duration;
@@ -32,6 +33,7 @@ public class ClientConfigurationBuilder {
private Duration requestTimeout = Duration.ofSeconds(3);
private boolean sslEnabled = true;
private String namespace = "";
+ private int maxStartupAttempts = 3;
/**
* Configure the access point with which the SDK should communicate.
@@ -93,6 +95,18 @@ public class ClientConfigurationBuilder {
return this;
}
+ /**
+ * Configure maxStartupAttempts for client
+ *
+ * @param maxStartupAttempts max attempt times when client startup
+ * @return The {@link ClientConfigurationBuilder} instance, to allow for
method chaining.
+ */
+ public ClientConfigurationBuilder setMaxStartupAttempts(int
maxStartupAttempts) {
+ checkArgument(maxStartupAttempts > 0, "maxStartupAttempts should more
than 0");
+ this.maxStartupAttempts = maxStartupAttempts;
+ return this;
+ }
+
/**
* Finalize the build of {@link ClientConfiguration}.
*
@@ -101,6 +115,7 @@ public class ClientConfigurationBuilder {
public ClientConfiguration build() {
checkNotNull(endpoints, "endpoints should not be null");
checkNotNull(requestTimeout, "requestTimeout should not be null");
- return new ClientConfiguration(endpoints, sessionCredentialsProvider,
requestTimeout, sslEnabled, namespace);
+ return new ClientConfiguration(endpoints, sessionCredentialsProvider,
requestTimeout, sslEnabled, namespace,
+ maxStartupAttempts);
}
}
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerSingleton.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerSingleton.java
index e6700e95..d0ea25b5 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerSingleton.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerSingleton.java
@@ -54,6 +54,9 @@ public class ProducerSingleton {
// On some Windows platforms, you may encounter SSL compatibility
issues. Try turning off the SSL option in
// client configuration to solve the problem please if SSL is not
essential.
// .enableSsl(false)
+ // Due to the lazy loading of gRPC, when the network conditions
are poor or the load of the application
+ // at startup is high, the first startup may fail, and you can try
multiple startups.
+ // .setMaxStartupAttempts(3)
.setCredentialProvider(sessionCredentialsProvider)
.build();
final ProducerBuilder builder = provider.newProducerBuilder()
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
index f17832bf..9e9a5914 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
@@ -186,12 +186,24 @@ public abstract class ClientImpl extends
AbstractIdleService implements Client,
// Fetch topic route from remote.
log.info("Begin to fetch topic(s) route data from remote during client
startup, clientId={}, topics={}",
clientId, topics);
- for (String topic : topics) {
- final ListenableFuture<TopicRouteData> future =
fetchTopicRoute(topic);
- future.get();
+ for (int attempt = 1; attempt <=
clientConfiguration.getMaxStartupAttempts(); attempt++) {
+ try {
+ for (String topic : topics) {
+ final ListenableFuture<TopicRouteData> future =
fetchTopicRoute(topic);
+ future.get();
+ }
+ log.info("Fetch topic route data from remote successfully
during startup, clientId={}, topics={}",
+ clientId, topics);
+ break;
+ } catch (Exception e) {
+ log.error("Fetch topics failed when client start, clientId={},
topics={}, attemptTime={}", clientId,
+ topics, attempt, e);
+ if (attempt == clientConfiguration.getMaxStartupAttempts()) {
+ throw new RuntimeException(
+ String.format("Failed to fetch topics after %d
attempts", attempt), e);
+ }
+ }
}
- log.info("Fetch topic route data from remote successfully during
startup, clientId={}, topics={}",
- clientId, topics);
// Update route cache periodically.
final ScheduledExecutorService scheduler =
clientManager.getScheduler();
this.updateRouteCacheFuture = scheduler.scheduleWithFixedDelay(() -> {