This is an automated email from the ASF dual-hosted git repository.
lizhimins pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new 3d8e99f3 [Java] Support client properties for observability and management (#1237)
3d8e99f3 is described below
commit 3d8e99f378a6c6e37bf58accd59a1f6e736a1580
Author: JYZ <[email protected]>
AuthorDate: Tue Jun 23 11:44:09 2026 +0800
[Java] Support client properties for observability and management (#1237)
---
.../rocketmq/client/apis/ClientConfiguration.java | 12 +-
.../client/apis/ClientConfigurationBuilder.java | 164 ++++++++++++++++++++-
.../java/example/AsyncSimpleConsumerExample.java | 3 +
.../java/example/LitePushConsumerExample.java | 3 +
.../client/java/example/ProducerSingleton.java | 3 +
.../client/java/example/PushConsumerExample.java | 3 +
.../client/java/example/SimpleConsumerExample.java | 3 +
.../apache/rocketmq/client/java/impl/Settings.java | 16 +-
.../impl/consumer/PushSubscriptionSettings.java | 7 +-
.../impl/consumer/SimpleSubscriptionSettings.java | 7 +-
.../client/java/impl/producer/ProducerImpl.java | 2 +-
.../java/impl/producer/PublishingSettings.java | 10 +-
.../consumer/PushSubscriptionSettingsTest.java | 4 +
.../consumer/SimpleSubscriptionSettingsTest.java | 4 +
.../java/impl/producer/PublishingSettingsTest.java | 53 +++++++
.../apache/rocketmq/client/java/tool/TestBase.java | 3 +-
16 files changed, 283 insertions(+), 14 deletions(-)
diff --git
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfiguration.java
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfiguration.java
index 3c30e77b..fa8c45ea 100644
---
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfiguration.java
+++
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfiguration.java
@@ -18,6 +18,9 @@
package org.apache.rocketmq.client.apis;
import java.time.Duration;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
import java.util.Optional;
/**
@@ -30,19 +33,22 @@ public class ClientConfiguration {
private final boolean sslEnabled;
private final String namespace;
private final int maxStartupAttempts;
+ private final Map<String, String> clientProperties;
/**
* The caller is supposed to have validated the arguments and handled
throwing exceptions or
* logging warnings already, so we avoid repeating args check here.
*/
ClientConfiguration(String endpoints, SessionCredentialsProvider
sessionCredentialsProvider,
- Duration requestTimeout, boolean sslEnabled, String namespace, int
maxStartupAttempts) {
+ Duration requestTimeout, boolean sslEnabled, String namespace, int
maxStartupAttempts,
+ Map<String, String> clientProperties) {
this.endpoints = endpoints;
this.sessionCredentialsProvider = sessionCredentialsProvider;
this.requestTimeout = requestTimeout;
this.sslEnabled = sslEnabled;
this.namespace = namespace;
this.maxStartupAttempts = maxStartupAttempts;
+ this.clientProperties = Collections.unmodifiableMap(new
LinkedHashMap<>(clientProperties));
}
public static ClientConfigurationBuilder newBuilder() {
@@ -72,4 +78,8 @@ public class ClientConfiguration {
public int getMaxStartupAttempts() {
return maxStartupAttempts;
}
+
+ public Map<String, String> getClientProperties() {
+ return clientProperties;
+ }
}
diff --git
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfigurationBuilder.java
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfigurationBuilder.java
index acbbe285..4688e500 100644
---
a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfigurationBuilder.java
+++
b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfigurationBuilder.java
@@ -20,7 +20,11 @@ package org.apache.rocketmq.client.apis;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
+import java.nio.charset.StandardCharsets;
import java.time.Duration;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.regex.Pattern;
import org.apache.rocketmq.client.apis.consumer.SimpleConsumer;
import org.apache.rocketmq.client.apis.consumer.SimpleConsumerBuilder;
@@ -28,12 +32,20 @@ import
org.apache.rocketmq.client.apis.consumer.SimpleConsumerBuilder;
* Builder to set {@link ClientConfiguration}.
*/
public class ClientConfigurationBuilder {
+ private static final int MAX_CLIENT_PROPERTIES_ENTRIES = 32;
+ private static final int MAX_CLIENT_PROPERTY_KEY_LENGTH = 64;
+ private static final int MAX_CLIENT_PROPERTY_VALUE_LENGTH = 256;
+ private static final int MAX_CLIENT_PROPERTIES_TOTAL_SIZE_BYTES = 4 * 1024;
+ private static final String RESERVED_CLIENT_PROPERTY_PREFIX = "rocketmq.";
+ private static final Pattern CLIENT_PROPERTY_KEY_PATTERN =
Pattern.compile("[a-zA-Z][a-zA-Z0-9_.-]*");
+
private String endpoints;
private SessionCredentialsProvider sessionCredentialsProvider = null;
private Duration requestTimeout = Duration.ofSeconds(3);
private boolean sslEnabled = true;
private String namespace = "";
private int maxStartupAttempts = 3;
+ private final Map<String, String> clientProperties = new LinkedHashMap<>();
/**
* Configure the access point with which the SDK should communicate.
@@ -107,6 +119,60 @@ public class ClientConfigurationBuilder {
return this;
}
+ /**
+ * Add a client instance property reported to server-side client runtime.
+ *
+ * @param key property key.
+ * @param value property value.
+ * @return the client configuration builder instance.
+ */
+ public ClientConfigurationBuilder addClientProperty(String key, String
value) {
+ validateClientPropertyEntry(key, value);
+ Map<String, String> candidate = new LinkedHashMap<>(clientProperties);
+ candidate.put(key, value);
+ validateClientPropertiesLimits(candidate);
+ this.clientProperties.clear();
+ this.clientProperties.putAll(candidate);
+ return this;
+ }
+
+ /**
+ * Set client instance properties reported to server-side client runtime,
replacing existing properties.
+ *
+ * @param properties client properties.
+ * @return the client configuration builder instance.
+ */
+ public ClientConfigurationBuilder setClientProperties(Map<String, String>
properties) {
+ checkNotNull(properties, "clientProperties should not be null");
+ Map<String, String> candidate = new LinkedHashMap<>(properties);
+ validateClientProperties(candidate);
+ this.clientProperties.clear();
+ this.clientProperties.putAll(candidate);
+ return this;
+ }
+
+ /**
+ * Remove a client instance property.
+ *
+ * @param key property key.
+ * @return the client configuration builder instance.
+ */
+ public ClientConfigurationBuilder removeClientProperty(String key) {
+ checkNotNull(key, "client property key should not be null");
+ this.clientProperties.remove(key);
+ return this;
+ }
+
+ /**
+ * Clear all client instance properties.
+ *
+ * @return the client configuration builder instance.
+ */
+ public ClientConfigurationBuilder clearClientProperties() {
+ this.clientProperties.clear();
+ return this;
+ }
+
/**
* Finalize the build of {@link ClientConfiguration}.
*
@@ -115,7 +181,103 @@ public class ClientConfigurationBuilder {
public ClientConfiguration build() {
checkNotNull(endpoints, "endpoints should not be null");
checkNotNull(requestTimeout, "requestTimeout should not be null");
+ // Keep build() defensive for maps supplied through
setClientProperties or future builder paths.
+ validateClientProperties(clientProperties);
return new ClientConfiguration(endpoints, sessionCredentialsProvider,
requestTimeout, sslEnabled, namespace,
- maxStartupAttempts);
+ maxStartupAttempts, clientProperties);
+ }
+
+ private static void validateClientProperties(Map<String, String>
properties) {
+ validateClientProperties(properties, true);
+ }
+
+ private static void validateClientPropertiesLimits(Map<String, String>
properties) {
+ validateClientProperties(properties, false);
+ }
+
+ /**
+ * Performs a client-side pre-check for client properties before the
settings are reported to the server.
+ *
+ * <p>This validation mirrors the public proto constraints as closely as
possible without depending on generated
+ * proto classes in the client-apis module. The server still has final
authority and may reject the properties with
+ * its own validation result.
+ */
+ private static void validateClientProperties(Map<String, String>
properties, boolean validateEntries) {
+ checkArgument(properties.size() <= MAX_CLIENT_PROPERTIES_ENTRIES,
+ "clientProperties should not contain more than %s entries",
MAX_CLIENT_PROPERTIES_ENTRIES);
+ int totalSize = 0;
+ for (Map.Entry<String, String> entry : properties.entrySet()) {
+ String key = entry.getKey();
+ String value = entry.getValue();
+ if (validateEntries) {
+ validateClientPropertyEntry(key, value);
+ }
+ totalSize += computeClientPropertySize(key, value);
+ }
+ checkArgument(totalSize <= MAX_CLIENT_PROPERTIES_TOTAL_SIZE_BYTES,
+ "clientProperties total size should not exceed %s bytes",
MAX_CLIENT_PROPERTIES_TOTAL_SIZE_BYTES);
+ }
+
+ private static void validateClientPropertyEntry(String key, String value) {
+ checkNotNull(key, "client property key should not be null");
+ checkNotNull(value, "client property value should not be null");
+ checkArgument(!key.isEmpty(), "client property key should not be
empty");
+ checkArgument(key.length() <= MAX_CLIENT_PROPERTY_KEY_LENGTH,
+ "client property key length should not exceed %s characters",
MAX_CLIENT_PROPERTY_KEY_LENGTH);
+ checkArgument(CLIENT_PROPERTY_KEY_PATTERN.matcher(key).matches(),
+ "client property key should start with a letter and only contain
letters, digits, dot, underscore or "
+ + "hyphen");
+ checkArgument(!key.startsWith(RESERVED_CLIENT_PROPERTY_PREFIX),
+ "client property key should not use reserved prefix %s",
RESERVED_CLIENT_PROPERTY_PREFIX);
+ checkArgument(value.length() <= MAX_CLIENT_PROPERTY_VALUE_LENGTH,
+ "client property value length should not exceed %s characters",
MAX_CLIENT_PROPERTY_VALUE_LENGTH);
+ }
+
+ /**
+ * Computes the serialized size of one {@code client_properties} map entry
in protobuf wire format.
+ *
+ * <p>A {@code map<string, string>} field is encoded as repeated entry
messages. Each entry is written as Settings
+ * field 9, containing key field 1 and value field 2. Client property keys
are constrained to ASCII, so
+ * {@link String#length()} equals the UTF-8 byte size for keys; values may
contain non-ASCII characters and must use
+ * their UTF-8 byte size.
+ */
+ private static int computeClientPropertySize(String key, String value) {
+ int keySize = key.length();
+ int valueSize = value.getBytes(StandardCharsets.UTF_8).length;
+ int entrySize = computeTagSize(1)
+ + computeUInt32SizeNoTag(keySize)
+ + keySize
+ + computeTagSize(2)
+ + computeUInt32SizeNoTag(valueSize)
+ + valueSize;
+ return computeTagSize(9)
+ + computeUInt32SizeNoTag(entrySize)
+ + entrySize;
+ }
+
+ /**
+ * Computes the protobuf wire-format size of a field tag for the given
field number.
+ */
+ private static int computeTagSize(int fieldNumber) {
+ return computeUInt32SizeNoTag(fieldNumber << 3);
+ }
+
+ /**
+ * Computes the protobuf unsigned int32 varint size.
+ */
+ private static int computeUInt32SizeNoTag(int value) {
+ if ((value & (~0 << 7)) == 0) {
+ return 1;
+ }
+ if ((value & (~0 << 14)) == 0) {
+ return 2;
+ }
+ if ((value & (~0 << 21)) == 0) {
+ return 3;
+ }
+ if ((value & (~0 << 28)) == 0) {
+ return 4;
+ }
+ return 5;
}
}
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncSimpleConsumerExample.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncSimpleConsumerExample.java
index 63b0fba7..5843359e 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncSimpleConsumerExample.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncSimpleConsumerExample.java
@@ -60,6 +60,9 @@ public class AsyncSimpleConsumerExample {
// On some Windows platforms, you may encounter SSL compatibility
issues. Try turning off the SSL option in
// client configuration to solve the problem please if SSL is not
essential.
// .enableSsl(false)
+ // Set optional opaque client properties for server-side
observability.
+ // .addClientProperty("key1", "value1")
+ // .addClientProperty("key2", "value2")
.setCredentialProvider(sessionCredentialsProvider)
.build();
String consumerGroup = "yourConsumerGroup";
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/LitePushConsumerExample.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/LitePushConsumerExample.java
index ff990d4c..6b0e4dfc 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/LitePushConsumerExample.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/LitePushConsumerExample.java
@@ -50,6 +50,9 @@ public class LitePushConsumerExample {
// On some Windows platforms, you may encounter SSL compatibility
issues. Try turning off the SSL option in
// client configuration to solve the problem please if SSL is not
essential.
// .enableSsl(false)
+ // Set optional opaque client properties for server-side
observability.
+ // .addClientProperty("key1", "value1")
+ // .addClientProperty("key2", "value2")
.setCredentialProvider(sessionCredentialsProvider)
.build();
String consumerGroup = "yourConsumerGroup";
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerSingleton.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerSingleton.java
index d0ea25b5..af21acba 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerSingleton.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerSingleton.java
@@ -57,6 +57,9 @@ public class ProducerSingleton {
// Due to the lazy loading of gRPC, when the network conditions
are poor or the load of the application
// at startup is high, the first startup may fail, and you can try
multiple startups.
// .setMaxStartupAttempts(3)
+ // Set optional opaque client properties for server-side
observability.
+ // .addClientProperty("key1", "value1")
+ // .addClientProperty("key2", "value2")
.setCredentialProvider(sessionCredentialsProvider)
.build();
final ProducerBuilder builder = provider.newProducerBuilder()
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/PushConsumerExample.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/PushConsumerExample.java
index 008fdc7e..3bf69378 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/PushConsumerExample.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/PushConsumerExample.java
@@ -52,6 +52,9 @@ public class PushConsumerExample {
// On some Windows platforms, you may encounter SSL compatibility
issues. Try turning off the SSL option in
// client configuration to solve the problem please if SSL is not
essential.
// .enableSsl(false)
+ // Set optional opaque client properties for server-side
observability.
+ // .addClientProperty("key1", "value1")
+ // .addClientProperty("key2", "value2")
.setCredentialProvider(sessionCredentialsProvider)
.build();
String tag = "yourMessageTagA";
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/SimpleConsumerExample.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/SimpleConsumerExample.java
index 8139749c..04f4650a 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/example/SimpleConsumerExample.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/example/SimpleConsumerExample.java
@@ -55,6 +55,9 @@ public class SimpleConsumerExample {
// On some Windows platforms, you may encounter SSL compatibility
issues. Try turning off the SSL option in
// client configuration to solve the problem please if SSL is not
essential.
// .enableSsl(false)
+ // Set optional opaque client properties for server-side
observability.
+ // .addClientProperty("key1", "value1")
+ // .addClientProperty("key2", "value2")
.setCredentialProvider(sessionCredentialsProvider)
.build();
String consumerGroup = "yourConsumerGroup";
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/Settings.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/Settings.java
index 88b335c8..95e8e519 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/Settings.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/Settings.java
@@ -19,6 +19,9 @@ package org.apache.rocketmq.client.java.impl;
import com.google.common.base.MoreObjects;
import java.time.Duration;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
import org.apache.rocketmq.client.java.misc.ClientId;
import org.apache.rocketmq.client.java.misc.ExcludeFromJacocoGeneratedReport;
import org.apache.rocketmq.client.java.retry.RetryPolicy;
@@ -31,20 +34,22 @@ public abstract class Settings {
protected final Endpoints accessPoint;
protected volatile RetryPolicy retryPolicy;
protected final Duration requestTimeout;
+ protected final Map<String, String> clientProperties;
public Settings(String namespace, ClientId clientId, ClientType
clientType, Endpoints accessPoint,
- RetryPolicy retryPolicy, Duration requestTimeout) {
+ RetryPolicy retryPolicy, Duration requestTimeout, Map<String, String>
clientProperties) {
this.namespace = namespace;
this.clientId = clientId;
this.clientType = clientType;
this.accessPoint = accessPoint;
this.retryPolicy = retryPolicy;
this.requestTimeout = requestTimeout;
+ this.clientProperties = Collections.unmodifiableMap(new
LinkedHashMap<>(clientProperties));
}
public Settings(String namespace, ClientId clientId, ClientType
clientType, Endpoints accessPoint,
- Duration requestTimeout) {
- this(namespace, clientId, clientType, accessPoint, null,
requestTimeout);
+ Duration requestTimeout, Map<String, String> clientProperties) {
+ this(namespace, clientId, clientType, accessPoint, null,
requestTimeout, clientProperties);
}
public abstract apache.rocketmq.v2.Settings toProtobuf();
@@ -55,6 +60,10 @@ public abstract class Settings {
return retryPolicy;
}
+ public Map<String, String> getClientProperties() {
+ return clientProperties;
+ }
+
@ExcludeFromJacocoGeneratedReport
@Override
public String toString() {
@@ -64,6 +73,7 @@ public abstract class Settings {
.add("accessPoint", accessPoint)
.add("retryPolicy", retryPolicy)
.add("requestTimeout", requestTimeout)
+ .add("clientProperties", clientProperties)
.toString();
}
}
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushSubscriptionSettings.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushSubscriptionSettings.java
index 624490d4..24e1c9c2 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushSubscriptionSettings.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushSubscriptionSettings.java
@@ -59,7 +59,8 @@ public class PushSubscriptionSettings extends Settings {
String group,
Map<String, FilterExpression> subscriptionExpression
) {
- super(configuration.getNamespace(), clientId, clientType, endpoints,
configuration.getRequestTimeout());
+ super(configuration.getNamespace(), clientId, clientType, endpoints,
configuration.getRequestTimeout(),
+ configuration.getClientProperties());
this.group = new Resource(configuration.getNamespace(), group);
this.subscriptionExpressions = subscriptionExpression;
}
@@ -107,7 +108,8 @@ public class PushSubscriptionSettings extends Settings {
Subscription.newBuilder().setGroup(group.toProtobuf()).addAllSubscriptions(subscriptionEntries).build();
return
apache.rocketmq.v2.Settings.newBuilder().setAccessPoint(accessPoint.toProtobuf())
.setClientType(clientType.toProtobuf()).setRequestTimeout(Durations.fromNanos(requestTimeout.toNanos()))
-
.setSubscription(subscription).setUserAgent(UserAgent.INSTANCE.toProtoBuf()).build();
+
.setSubscription(subscription).setUserAgent(UserAgent.INSTANCE.toProtoBuf())
+ .putAllClientProperties(clientProperties).build();
}
@Override
@@ -144,6 +146,7 @@ public class PushSubscriptionSettings extends Settings {
.add("accessPoint", accessPoint)
.add("retryPolicy", retryPolicy)
.add("requestTimeout", requestTimeout)
+ .add("clientProperties", clientProperties)
.add("group", group)
.add("subscriptionExpressions", subscriptionExpressions)
.add("fifo", fifo)
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleSubscriptionSettings.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleSubscriptionSettings.java
index 1c9e728d..ebb12fb1 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleSubscriptionSettings.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleSubscriptionSettings.java
@@ -55,7 +55,8 @@ public class SimpleSubscriptionSettings extends Settings {
Duration longPollingTimeout,
Map<String, FilterExpression> subscriptionExpression
) {
- super(configuration.getNamespace(), clientId, clientType, endpoints,
configuration.getRequestTimeout());
+ super(configuration.getNamespace(), clientId, clientType, endpoints,
configuration.getRequestTimeout(),
+ configuration.getClientProperties());
this.group = new Resource(configuration.getNamespace(), group);
this.longPollingTimeout = longPollingTimeout;
this.subscriptionExpressions = subscriptionExpression;
@@ -92,7 +93,8 @@ public class SimpleSubscriptionSettings extends Settings {
.addAllSubscriptions(subscriptionEntries).build();
return
apache.rocketmq.v2.Settings.newBuilder().setAccessPoint(accessPoint.toProtobuf())
.setClientType(clientType.toProtobuf()).setRequestTimeout(Durations.fromNanos(requestTimeout.toNanos()))
-
.setSubscription(subscription).setUserAgent(UserAgent.INSTANCE.toProtoBuf()).build();
+
.setSubscription(subscription).setUserAgent(UserAgent.INSTANCE.toProtoBuf())
+ .putAllClientProperties(clientProperties).build();
}
@Override
@@ -113,6 +115,7 @@ public class SimpleSubscriptionSettings extends Settings {
.add("accessPoint", accessPoint)
.add("retryPolicy", retryPolicy)
.add("requestTimeout", requestTimeout)
+ .add("clientProperties", clientProperties)
.add("group", group)
.add("longPollingTimeout", longPollingTimeout)
.add("subscriptionExpressions", subscriptionExpressions)
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
index 7c0ddad1..19cf0ee7 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
@@ -115,7 +115,7 @@ class ProducerImpl extends ClientImpl implements Producer {
super(clientConfiguration, topics);
ExponentialBackoffRetryPolicy retryPolicy =
ExponentialBackoffRetryPolicy.immediatelyRetryPolicy(maxAttempts);
this.publishingSettings = new
PublishingSettings(clientConfiguration.getNamespace(), clientId, endpoints,
- retryPolicy, clientConfiguration.getRequestTimeout(), topics);
+ retryPolicy, clientConfiguration.getRequestTimeout(), topics,
clientConfiguration.getClientProperties());
this.checker = checker;
this.publishingRouteDataCache = new ConcurrentHashMap<>();
}
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingSettings.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingSettings.java
index 29159caa..f728c534 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingSettings.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingSettings.java
@@ -22,6 +22,7 @@ import apache.rocketmq.v2.Resource;
import com.google.common.base.MoreObjects;
import com.google.protobuf.util.Durations;
import java.time.Duration;
+import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.rocketmq.client.java.impl.ClientType;
@@ -46,8 +47,9 @@ public class PublishingSettings extends Settings {
private volatile boolean validateMessageType = true;
public PublishingSettings(String namespace, ClientId clientId, Endpoints
accessPoint,
- ExponentialBackoffRetryPolicy retryPolicy, Duration requestTimeout,
Set<String> topics) {
- super(namespace, clientId, ClientType.PRODUCER, accessPoint,
retryPolicy, requestTimeout);
+ ExponentialBackoffRetryPolicy retryPolicy, Duration requestTimeout,
Set<String> topics,
+ Map<String, String> clientProperties) {
+ super(namespace, clientId, ClientType.PRODUCER, accessPoint,
retryPolicy, requestTimeout, clientProperties);
this.topics = topics;
}
@@ -71,7 +73,8 @@ public class PublishingSettings extends Settings {
.build();
final apache.rocketmq.v2.Settings.Builder builder =
apache.rocketmq.v2.Settings.newBuilder()
.setAccessPoint(accessPoint.toProtobuf()).setClientType(clientType.toProtobuf())
-
.setRequestTimeout(Durations.fromNanos(requestTimeout.toNanos())).setPublishing(publishing);
+
.setRequestTimeout(Durations.fromNanos(requestTimeout.toNanos())).setPublishing(publishing)
+ .putAllClientProperties(clientProperties);
return
builder.setBackoffPolicy(retryPolicy.toProtobuf()).setUserAgent(UserAgent.INSTANCE.toProtoBuf()).build();
}
@@ -100,6 +103,7 @@ public class PublishingSettings extends Settings {
.add("accessPoint", accessPoint)
.add("retryPolicy", retryPolicy)
.add("requestTimeout", requestTimeout)
+ .add("clientProperties", clientProperties)
.add("topics", topics)
.add("maxBodySizeBytes", maxBodySizeBytes)
.toString();
diff --git
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushSubscriptionSettingsTest.java
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushSubscriptionSettingsTest.java
index 3ad28f95..631bcdf1 100644
---
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushSubscriptionSettingsTest.java
+++
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushSubscriptionSettingsTest.java
@@ -46,6 +46,8 @@ public class PushSubscriptionSettingsTest extends TestBase {
.setNamespace(FAKE_NAMESPACE)
.setRequestTimeout(requestTimeout)
.setEndpoints(FAKE_ENDPOINTS)
+ .addClientProperty("key1", "value1")
+ .addClientProperty("key2", "value2")
.build();
ClientId clientId = new ClientId();
Map<String, FilterExpression> subscriptionExpression = new HashMap<>();
@@ -57,6 +59,8 @@ public class PushSubscriptionSettingsTest extends TestBase {
final Settings settings = pushSubscriptionSettings.toProtobuf();
Assert.assertEquals(settings.getClientType(),
ClientType.PUSH_CONSUMER);
Assert.assertEquals(settings.getRequestTimeout(),
Durations.fromNanos(requestTimeout.toNanos()));
+ Assert.assertEquals("value1",
settings.getClientPropertiesMap().get("key1"));
+ Assert.assertEquals("value2",
settings.getClientPropertiesMap().get("key2"));
Assert.assertTrue(settings.hasSubscription());
final Subscription subscription = settings.getSubscription();
Assert.assertEquals(subscription.getGroup(),
diff --git
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleSubscriptionSettingsTest.java
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleSubscriptionSettingsTest.java
index 5bbdd8ee..2cc3bdec 100644
---
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleSubscriptionSettingsTest.java
+++
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleSubscriptionSettingsTest.java
@@ -44,6 +44,8 @@ public class SimpleSubscriptionSettingsTest extends TestBase {
.setNamespace(FAKE_NAMESPACE)
.setRequestTimeout(requestTimeout)
.setEndpoints(FAKE_ENDPOINTS)
+ .addClientProperty("key1", "value1")
+ .addClientProperty("key2", "value2")
.build();
ClientId clientId = new ClientId();
Map<String, FilterExpression> subscriptionExpression = new HashMap<>();
@@ -57,6 +59,8 @@ public class SimpleSubscriptionSettingsTest extends TestBase {
final Settings settings = simpleSubscriptionSettings.toProtobuf();
Assert.assertEquals(settings.getClientType(),
ClientType.SIMPLE_CONSUMER);
Assert.assertEquals(settings.getRequestTimeout(),
Durations.fromNanos(requestTimeout.toNanos()));
+ Assert.assertEquals("value1",
settings.getClientPropertiesMap().get("key1"));
+ Assert.assertEquals("value2",
settings.getClientPropertiesMap().get("key2"));
Assert.assertTrue(settings.hasSubscription());
final Subscription subscription = settings.getSubscription();
Assert.assertEquals(subscription.getGroup(),
diff --git
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/PublishingSettingsTest.java
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/PublishingSettingsTest.java
new file mode 100644
index 00000000..226cc409
--- /dev/null
+++
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/PublishingSettingsTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.java.impl.producer;
+
+import apache.rocketmq.v2.ClientType;
+import apache.rocketmq.v2.Settings;
+import com.google.protobuf.util.Durations;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import org.apache.rocketmq.client.java.misc.ClientId;
+import org.apache.rocketmq.client.java.tool.TestBase;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class PublishingSettingsTest extends TestBase {
+
+ @Test
+ public void testToProtobufWithClientProperties() {
+ final Duration requestTimeout = Duration.ofSeconds(3);
+ Map<String, String> clientProperties = new HashMap<>();
+ clientProperties.put("key1", "value1");
+ clientProperties.put("key2", "value2");
+ final PublishingSettings publishingSettings = new PublishingSettings(
+ FAKE_NAMESPACE, new ClientId(), fakeEndpoints(),
fakeExponentialBackoffRetryPolicy(),
+ requestTimeout, new
HashSet<>(Collections.singleton(FAKE_TOPIC_0)), clientProperties);
+
+ final Settings settings = publishingSettings.toProtobuf();
+
+ Assert.assertEquals(ClientType.PRODUCER, settings.getClientType());
+ Assert.assertEquals(Durations.fromNanos(requestTimeout.toNanos()),
settings.getRequestTimeout());
+ Assert.assertEquals("value1",
settings.getClientPropertiesMap().get("key1"));
+ Assert.assertEquals("value2",
settings.getClientPropertiesMap().get("key2"));
+ Assert.assertTrue(settings.hasPublishing());
+ }
+}
diff --git
a/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
b/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
index ee316f2b..42c7c3a2 100644
---
a/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
+++
b/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
@@ -52,6 +52,7 @@ import io.grpc.Metadata;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -393,7 +394,7 @@ public class TestBase {
protected PublishingSettings fakeProducerSettings() {
return new PublishingSettings(FAKE_NAMESPACE, FAKE_CLIENT_ID,
fakeEndpoints(),
- fakeExponentialBackoffRetryPolicy(), Duration.ofSeconds(1), new
HashSet<>());
+ fakeExponentialBackoffRetryPolicy(), Duration.ofSeconds(1), new
HashSet<>(), Collections.emptyMap());
}
protected SendReceiptImpl fakeSendReceiptImpl(