This is an automated email from the ASF dual-hosted git repository.

lizhimins pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new 3d8e99f3 [Java] Support client properties for observability and management (#1237)
3d8e99f3 is described below

commit 3d8e99f378a6c6e37bf58accd59a1f6e736a1580
Author: JYZ <[email protected]>
AuthorDate: Tue Jun 23 11:44:09 2026 +0800

    [Java] Support client properties for observability and management (#1237)
---
 .../rocketmq/client/apis/ClientConfiguration.java  |  12 +-
 .../client/apis/ClientConfigurationBuilder.java    | 164 ++++++++++++++++++++-
 .../java/example/AsyncSimpleConsumerExample.java   |   3 +
 .../java/example/LitePushConsumerExample.java      |   3 +
 .../client/java/example/ProducerSingleton.java     |   3 +
 .../client/java/example/PushConsumerExample.java   |   3 +
 .../client/java/example/SimpleConsumerExample.java |   3 +
 .../apache/rocketmq/client/java/impl/Settings.java |  16 +-
 .../impl/consumer/PushSubscriptionSettings.java    |   7 +-
 .../impl/consumer/SimpleSubscriptionSettings.java  |   7 +-
 .../client/java/impl/producer/ProducerImpl.java    |   2 +-
 .../java/impl/producer/PublishingSettings.java     |  10 +-
 .../consumer/PushSubscriptionSettingsTest.java     |   4 +
 .../consumer/SimpleSubscriptionSettingsTest.java   |   4 +
 .../java/impl/producer/PublishingSettingsTest.java |  53 +++++++
 .../apache/rocketmq/client/java/tool/TestBase.java |   3 +-
 16 files changed, 283 insertions(+), 14 deletions(-)

diff --git a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfiguration.java b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfiguration.java
index 3c30e77b..fa8c45ea 100644
--- a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfiguration.java
+++ b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfiguration.java
@@ -18,6 +18,9 @@
 package org.apache.rocketmq.client.apis;
 
 import java.time.Duration;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
 import java.util.Optional;
 
 /**
@@ -30,19 +33,22 @@ public class ClientConfiguration {
     private final boolean sslEnabled;
     private final String namespace;
     private final int maxStartupAttempts;
+    private final Map<String, String> clientProperties;
 
     /**
      * The caller is supposed to have validated the arguments and handled throwing exceptions or
      * logging warnings already, so we avoid repeating args check here.
      */
     ClientConfiguration(String endpoints, SessionCredentialsProvider sessionCredentialsProvider,
-        Duration requestTimeout, boolean sslEnabled, String namespace, int maxStartupAttempts) {
+        Duration requestTimeout, boolean sslEnabled, String namespace, int maxStartupAttempts,
+        Map<String, String> clientProperties) {
         this.endpoints = endpoints;
         this.sessionCredentialsProvider = sessionCredentialsProvider;
         this.requestTimeout = requestTimeout;
         this.sslEnabled = sslEnabled;
         this.namespace = namespace;
         this.maxStartupAttempts = maxStartupAttempts;
+        this.clientProperties = Collections.unmodifiableMap(new LinkedHashMap<>(clientProperties));
     }
 
     public static ClientConfigurationBuilder newBuilder() {
@@ -72,4 +78,8 @@ public class ClientConfiguration {
     public int getMaxStartupAttempts() {
         return maxStartupAttempts;
     }
+
+    public Map<String, String> getClientProperties() {
+        return clientProperties;
+    }
 }
diff --git a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfigurationBuilder.java b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfigurationBuilder.java
index acbbe285..4688e500 100644
--- a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfigurationBuilder.java
+++ b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientConfigurationBuilder.java
@@ -20,7 +20,11 @@ package org.apache.rocketmq.client.apis;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 
+import java.nio.charset.StandardCharsets;
 import java.time.Duration;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.regex.Pattern;
 import org.apache.rocketmq.client.apis.consumer.SimpleConsumer;
 import org.apache.rocketmq.client.apis.consumer.SimpleConsumerBuilder;
 
@@ -28,12 +32,20 @@ import org.apache.rocketmq.client.apis.consumer.SimpleConsumerBuilder;
  * Builder to set {@link ClientConfiguration}.
  */
 public class ClientConfigurationBuilder {
+    private static final int MAX_CLIENT_PROPERTIES_ENTRIES = 32;
+    private static final int MAX_CLIENT_PROPERTY_KEY_LENGTH = 64;
+    private static final int MAX_CLIENT_PROPERTY_VALUE_LENGTH = 256;
+    private static final int MAX_CLIENT_PROPERTIES_TOTAL_SIZE_BYTES = 4 * 1024;
+    private static final String RESERVED_CLIENT_PROPERTY_PREFIX = "rocketmq.";
+    private static final Pattern CLIENT_PROPERTY_KEY_PATTERN = Pattern.compile("[a-zA-Z][a-zA-Z0-9_.-]*");
+
     private String endpoints;
     private SessionCredentialsProvider sessionCredentialsProvider = null;
     private Duration requestTimeout = Duration.ofSeconds(3);
     private boolean sslEnabled = true;
     private String namespace = "";
     private int maxStartupAttempts = 3;
+    private final Map<String, String> clientProperties = new LinkedHashMap<>();
 
     /**
      * Configure the access point with which the SDK should communicate.
@@ -107,6 +119,60 @@ public class ClientConfigurationBuilder {
         return this;
     }
 
+    /**
+     * Add a client instance property reported to server-side client runtime.
+     *
+     * @param key property key.
+     * @param value property value.
+     * @return the client configuration builder instance.
+     */
+    public ClientConfigurationBuilder addClientProperty(String key, String value) {
+        validateClientPropertyEntry(key, value);
+        Map<String, String> candidate = new LinkedHashMap<>(clientProperties);
+        candidate.put(key, value);
+        validateClientPropertiesLimits(candidate);
+        this.clientProperties.clear();
+        this.clientProperties.putAll(candidate);
+        return this;
+    }
+
+    /**
+     * Set client instance properties reported to server-side client runtime, replacing existing properties.
+     *
+     * @param properties client properties.
+     * @return the client configuration builder instance.
+     */
+    public ClientConfigurationBuilder setClientProperties(Map<String, String> properties) {
+        checkNotNull(properties, "clientProperties should not be null");
+        Map<String, String> candidate = new LinkedHashMap<>(properties);
+        validateClientProperties(candidate);
+        this.clientProperties.clear();
+        this.clientProperties.putAll(candidate);
+        return this;
+    }
+
+    /**
+     * Remove a client instance property.
+     *
+     * @param key property key.
+     * @return the client configuration builder instance.
+     */
+    public ClientConfigurationBuilder removeClientProperty(String key) {
+        checkNotNull(key, "client property key should not be null");
+        this.clientProperties.remove(key);
+        return this;
+    }
+
+    /**
+     * Clear all client instance properties.
+     *
+     * @return the client configuration builder instance.
+     */
+    public ClientConfigurationBuilder clearClientProperties() {
+        this.clientProperties.clear();
+        return this;
+    }
+
     /**
      * Finalize the build of {@link ClientConfiguration}.
      *
@@ -115,7 +181,103 @@ public class ClientConfigurationBuilder {
     public ClientConfiguration build() {
         checkNotNull(endpoints, "endpoints should not be null");
         checkNotNull(requestTimeout, "requestTimeout should not be null");
+        // Keep build() defensive for maps supplied through setClientProperties or future builder paths.
+        validateClientProperties(clientProperties);
         return new ClientConfiguration(endpoints, sessionCredentialsProvider, requestTimeout, sslEnabled, namespace,
-            maxStartupAttempts);
+            maxStartupAttempts, clientProperties);
+    }
+
+    private static void validateClientProperties(Map<String, String> properties) {
+        validateClientProperties(properties, true);
+    }
+
+    private static void validateClientPropertiesLimits(Map<String, String> properties) {
+        validateClientProperties(properties, false);
+    }
+
+    /**
+     * Performs a client-side pre-check for client properties before the settings are reported to the server.
+     *
+     * <p>This validation mirrors the public proto constraints as closely as possible without depending on generated
+     * proto classes in the client-apis module. The server still has final authority and may reject the properties with
+     * its own validation result.
+     */
+    private static void validateClientProperties(Map<String, String> properties, boolean validateEntries) {
+        checkArgument(properties.size() <= MAX_CLIENT_PROPERTIES_ENTRIES,
+            "clientProperties should not contain more than %s entries", MAX_CLIENT_PROPERTIES_ENTRIES);
+        int totalSize = 0;
+        for (Map.Entry<String, String> entry : properties.entrySet()) {
+            String key = entry.getKey();
+            String value = entry.getValue();
+            if (validateEntries) {
+                validateClientPropertyEntry(key, value);
+            }
+            totalSize += computeClientPropertySize(key, value);
+        }
+        checkArgument(totalSize <= MAX_CLIENT_PROPERTIES_TOTAL_SIZE_BYTES,
+            "clientProperties total size should not exceed %s bytes", MAX_CLIENT_PROPERTIES_TOTAL_SIZE_BYTES);
+    }
+
+    private static void validateClientPropertyEntry(String key, String value) {
+        checkNotNull(key, "client property key should not be null");
+        checkNotNull(value, "client property value should not be null");
+        checkArgument(!key.isEmpty(), "client property key should not be empty");
+        checkArgument(key.length() <= MAX_CLIENT_PROPERTY_KEY_LENGTH,
+            "client property key length should not exceed %s characters", MAX_CLIENT_PROPERTY_KEY_LENGTH);
+        checkArgument(CLIENT_PROPERTY_KEY_PATTERN.matcher(key).matches(),
+            "client property key should start with a letter and only contain letters, digits, dot, underscore or "
+                + "hyphen");
+        checkArgument(!key.startsWith(RESERVED_CLIENT_PROPERTY_PREFIX),
+            "client property key should not use reserved prefix %s", RESERVED_CLIENT_PROPERTY_PREFIX);
+        checkArgument(value.length() <= MAX_CLIENT_PROPERTY_VALUE_LENGTH,
+            "client property value length should not exceed %s characters", MAX_CLIENT_PROPERTY_VALUE_LENGTH);
+    }
+
+    /**
+     * Computes the serialized size of one {@code client_properties} map entry in protobuf wire format.
+     *
+     * <p>A {@code map<string, string>} field is encoded as repeated entry messages. Each entry is written as Settings
+     * field 9, containing key field 1 and value field 2. Client property keys are constrained to ASCII, so
+     * {@link String#length()} equals the UTF-8 byte size for keys; values may contain non-ASCII characters and must use
+     * their UTF-8 byte size.
+     */
+    private static int computeClientPropertySize(String key, String value) {
+        int keySize = key.length();
+        int valueSize = value.getBytes(StandardCharsets.UTF_8).length;
+        int entrySize = computeTagSize(1)
+            + computeUInt32SizeNoTag(keySize)
+            + keySize
+            + computeTagSize(2)
+            + computeUInt32SizeNoTag(valueSize)
+            + valueSize;
+        return computeTagSize(9)
+            + computeUInt32SizeNoTag(entrySize)
+            + entrySize;
+    }
+
+    /**
+     * Computes the protobuf wire-format size of a field tag for the given field number.
+     */
+    private static int computeTagSize(int fieldNumber) {
+        return computeUInt32SizeNoTag(fieldNumber << 3);
+    }
+
+    /**
+     * Computes the protobuf unsigned int32 varint size.
+     */
+    private static int computeUInt32SizeNoTag(int value) {
+        if ((value & (~0 << 7)) == 0) {
+            return 1;
+        }
+        if ((value & (~0 << 14)) == 0) {
+            return 2;
+        }
+        if ((value & (~0 << 21)) == 0) {
+            return 3;
+        }
+        if ((value & (~0 << 28)) == 0) {
+            return 4;
+        }
+        return 5;
     }
 }
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncSimpleConsumerExample.java b/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncSimpleConsumerExample.java
index 63b0fba7..5843359e 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncSimpleConsumerExample.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/example/AsyncSimpleConsumerExample.java
@@ -60,6 +60,9 @@ public class AsyncSimpleConsumerExample {
             // On some Windows platforms, you may encounter SSL compatibility issues. Try turning off the SSL option in
             // client configuration to solve the problem please if SSL is not essential.
             // .enableSsl(false)
+            // Set optional opaque client properties for server-side observability.
+            // .addClientProperty("key1", "value1")
+            // .addClientProperty("key2", "value2")
             .setCredentialProvider(sessionCredentialsProvider)
             .build();
         String consumerGroup = "yourConsumerGroup";
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/example/LitePushConsumerExample.java b/java/client/src/main/java/org/apache/rocketmq/client/java/example/LitePushConsumerExample.java
index ff990d4c..6b0e4dfc 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/example/LitePushConsumerExample.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/example/LitePushConsumerExample.java
@@ -50,6 +50,9 @@ public class LitePushConsumerExample {
             // On some Windows platforms, you may encounter SSL compatibility issues. Try turning off the SSL option in
             // client configuration to solve the problem please if SSL is not essential.
             // .enableSsl(false)
+            // Set optional opaque client properties for server-side observability.
+            // .addClientProperty("key1", "value1")
+            // .addClientProperty("key2", "value2")
             .setCredentialProvider(sessionCredentialsProvider)
             .build();
         String consumerGroup = "yourConsumerGroup";
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerSingleton.java b/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerSingleton.java
index d0ea25b5..af21acba 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerSingleton.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/example/ProducerSingleton.java
@@ -57,6 +57,9 @@ public class ProducerSingleton {
             // Due to the lazy loading of gRPC, when the network conditions are poor or the load of the application
             // at startup is high, the first startup may fail, and you can try multiple startups.
             // .setMaxStartupAttempts(3)
+            // Set optional opaque client properties for server-side observability.
+            // .addClientProperty("key1", "value1")
+            // .addClientProperty("key2", "value2")
             .setCredentialProvider(sessionCredentialsProvider)
             .build();
         final ProducerBuilder builder = provider.newProducerBuilder()
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/example/PushConsumerExample.java b/java/client/src/main/java/org/apache/rocketmq/client/java/example/PushConsumerExample.java
index 008fdc7e..3bf69378 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/example/PushConsumerExample.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/example/PushConsumerExample.java
@@ -52,6 +52,9 @@ public class PushConsumerExample {
             // On some Windows platforms, you may encounter SSL compatibility issues. Try turning off the SSL option in
             // client configuration to solve the problem please if SSL is not essential.
             // .enableSsl(false)
+            // Set optional opaque client properties for server-side observability.
+            // .addClientProperty("key1", "value1")
+            // .addClientProperty("key2", "value2")
             .setCredentialProvider(sessionCredentialsProvider)
             .build();
         String tag = "yourMessageTagA";
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/example/SimpleConsumerExample.java b/java/client/src/main/java/org/apache/rocketmq/client/java/example/SimpleConsumerExample.java
index 8139749c..04f4650a 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/example/SimpleConsumerExample.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/example/SimpleConsumerExample.java
@@ -55,6 +55,9 @@ public class SimpleConsumerExample {
             // On some Windows platforms, you may encounter SSL compatibility issues. Try turning off the SSL option in
             // client configuration to solve the problem please if SSL is not essential.
             // .enableSsl(false)
+            // Set optional opaque client properties for server-side observability.
+            // .addClientProperty("key1", "value1")
+            // .addClientProperty("key2", "value2")
             .setCredentialProvider(sessionCredentialsProvider)
             .build();
         String consumerGroup = "yourConsumerGroup";
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/Settings.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/Settings.java
index 88b335c8..95e8e519 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/Settings.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/Settings.java
@@ -19,6 +19,9 @@ package org.apache.rocketmq.client.java.impl;
 
 import com.google.common.base.MoreObjects;
 import java.time.Duration;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
 import org.apache.rocketmq.client.java.misc.ClientId;
 import org.apache.rocketmq.client.java.misc.ExcludeFromJacocoGeneratedReport;
 import org.apache.rocketmq.client.java.retry.RetryPolicy;
@@ -31,20 +34,22 @@ public abstract class Settings {
     protected final Endpoints accessPoint;
     protected volatile RetryPolicy retryPolicy;
     protected final Duration requestTimeout;
+    protected final Map<String, String> clientProperties;
 
     public Settings(String namespace, ClientId clientId, ClientType clientType, Endpoints accessPoint,
-        RetryPolicy retryPolicy, Duration requestTimeout) {
+        RetryPolicy retryPolicy, Duration requestTimeout, Map<String, String> clientProperties) {
         this.namespace = namespace;
         this.clientId = clientId;
         this.clientType = clientType;
         this.accessPoint = accessPoint;
         this.retryPolicy = retryPolicy;
         this.requestTimeout = requestTimeout;
+        this.clientProperties = Collections.unmodifiableMap(new LinkedHashMap<>(clientProperties));
     }
 
     public Settings(String namespace, ClientId clientId, ClientType clientType, Endpoints accessPoint,
-        Duration requestTimeout) {
-        this(namespace, clientId, clientType, accessPoint, null, requestTimeout);
+        Duration requestTimeout, Map<String, String> clientProperties) {
+        this(namespace, clientId, clientType, accessPoint, null, requestTimeout, clientProperties);
     }
 
     public abstract apache.rocketmq.v2.Settings toProtobuf();
@@ -55,6 +60,10 @@ public abstract class Settings {
         return retryPolicy;
     }
 
+    public Map<String, String> getClientProperties() {
+        return clientProperties;
+    }
+
     @ExcludeFromJacocoGeneratedReport
     @Override
     public String toString() {
@@ -64,6 +73,7 @@ public abstract class Settings {
             .add("accessPoint", accessPoint)
             .add("retryPolicy", retryPolicy)
             .add("requestTimeout", requestTimeout)
+            .add("clientProperties", clientProperties)
             .toString();
     }
 }
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushSubscriptionSettings.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushSubscriptionSettings.java
index 624490d4..24e1c9c2 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushSubscriptionSettings.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushSubscriptionSettings.java
@@ -59,7 +59,8 @@ public class PushSubscriptionSettings extends Settings {
         String group,
         Map<String, FilterExpression> subscriptionExpression
     ) {
-        super(configuration.getNamespace(), clientId, clientType, endpoints, configuration.getRequestTimeout());
+        super(configuration.getNamespace(), clientId, clientType, endpoints, configuration.getRequestTimeout(),
+            configuration.getClientProperties());
         this.group = new Resource(configuration.getNamespace(), group);
         this.subscriptionExpressions = subscriptionExpression;
     }
@@ -107,7 +108,8 @@ public class PushSubscriptionSettings extends Settings {
             Subscription.newBuilder().setGroup(group.toProtobuf()).addAllSubscriptions(subscriptionEntries).build();
         return apache.rocketmq.v2.Settings.newBuilder().setAccessPoint(accessPoint.toProtobuf())
             .setClientType(clientType.toProtobuf()).setRequestTimeout(Durations.fromNanos(requestTimeout.toNanos()))
-            .setSubscription(subscription).setUserAgent(UserAgent.INSTANCE.toProtoBuf()).build();
+            .setSubscription(subscription).setUserAgent(UserAgent.INSTANCE.toProtoBuf())
+            .putAllClientProperties(clientProperties).build();
     }
 
     @Override
@@ -144,6 +146,7 @@ public class PushSubscriptionSettings extends Settings {
             .add("accessPoint", accessPoint)
             .add("retryPolicy", retryPolicy)
             .add("requestTimeout", requestTimeout)
+            .add("clientProperties", clientProperties)
             .add("group", group)
             .add("subscriptionExpressions", subscriptionExpressions)
             .add("fifo", fifo)
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleSubscriptionSettings.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleSubscriptionSettings.java
index 1c9e728d..ebb12fb1 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleSubscriptionSettings.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleSubscriptionSettings.java
@@ -55,7 +55,8 @@ public class SimpleSubscriptionSettings extends Settings {
         Duration longPollingTimeout,
         Map<String, FilterExpression> subscriptionExpression
     ) {
-        super(configuration.getNamespace(), clientId, clientType, endpoints, configuration.getRequestTimeout());
+        super(configuration.getNamespace(), clientId, clientType, endpoints, configuration.getRequestTimeout(),
+            configuration.getClientProperties());
         this.group = new Resource(configuration.getNamespace(), group);
         this.longPollingTimeout = longPollingTimeout;
         this.subscriptionExpressions = subscriptionExpression;
@@ -92,7 +93,8 @@ public class SimpleSubscriptionSettings extends Settings {
             .addAllSubscriptions(subscriptionEntries).build();
         return apache.rocketmq.v2.Settings.newBuilder().setAccessPoint(accessPoint.toProtobuf())
             .setClientType(clientType.toProtobuf()).setRequestTimeout(Durations.fromNanos(requestTimeout.toNanos()))
-            .setSubscription(subscription).setUserAgent(UserAgent.INSTANCE.toProtoBuf()).build();
+            .setSubscription(subscription).setUserAgent(UserAgent.INSTANCE.toProtoBuf())
+            .putAllClientProperties(clientProperties).build();
     }
 
     @Override
@@ -113,6 +115,7 @@ public class SimpleSubscriptionSettings extends Settings {
             .add("accessPoint", accessPoint)
             .add("retryPolicy", retryPolicy)
             .add("requestTimeout", requestTimeout)
+            .add("clientProperties", clientProperties)
             .add("group", group)
             .add("longPollingTimeout", longPollingTimeout)
             .add("subscriptionExpressions", subscriptionExpressions)
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
index 7c0ddad1..19cf0ee7 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
@@ -115,7 +115,7 @@ class ProducerImpl extends ClientImpl implements Producer {
         super(clientConfiguration, topics);
         ExponentialBackoffRetryPolicy retryPolicy = ExponentialBackoffRetryPolicy.immediatelyRetryPolicy(maxAttempts);
         this.publishingSettings = new PublishingSettings(clientConfiguration.getNamespace(), clientId, endpoints,
-            retryPolicy, clientConfiguration.getRequestTimeout(), topics);
+            retryPolicy, clientConfiguration.getRequestTimeout(), topics, clientConfiguration.getClientProperties());
         this.checker = checker;
         this.publishingRouteDataCache = new ConcurrentHashMap<>();
     }
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingSettings.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingSettings.java
index 29159caa..f728c534 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingSettings.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingSettings.java
@@ -22,6 +22,7 @@ import apache.rocketmq.v2.Resource;
 import com.google.common.base.MoreObjects;
 import com.google.protobuf.util.Durations;
 import java.time.Duration;
+import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 import org.apache.rocketmq.client.java.impl.ClientType;
@@ -46,8 +47,9 @@ public class PublishingSettings extends Settings {
     private volatile boolean validateMessageType = true;
 
     public PublishingSettings(String namespace, ClientId clientId, Endpoints accessPoint,
-        ExponentialBackoffRetryPolicy retryPolicy, Duration requestTimeout, Set<String> topics) {
-        super(namespace, clientId, ClientType.PRODUCER, accessPoint, retryPolicy, requestTimeout);
+        ExponentialBackoffRetryPolicy retryPolicy, Duration requestTimeout, Set<String> topics,
+        Map<String, String> clientProperties) {
+        super(namespace, clientId, ClientType.PRODUCER, accessPoint, retryPolicy, requestTimeout, clientProperties);
         this.topics = topics;
     }
 
@@ -71,7 +73,8 @@ public class PublishingSettings extends Settings {
             .build();
         final apache.rocketmq.v2.Settings.Builder builder = apache.rocketmq.v2.Settings.newBuilder()
             .setAccessPoint(accessPoint.toProtobuf()).setClientType(clientType.toProtobuf())
-            .setRequestTimeout(Durations.fromNanos(requestTimeout.toNanos())).setPublishing(publishing);
+            .setRequestTimeout(Durations.fromNanos(requestTimeout.toNanos())).setPublishing(publishing)
+            .putAllClientProperties(clientProperties);
         return builder.setBackoffPolicy(retryPolicy.toProtobuf()).setUserAgent(UserAgent.INSTANCE.toProtoBuf()).build();
     }
 
@@ -100,6 +103,7 @@ public class PublishingSettings extends Settings {
             .add("accessPoint", accessPoint)
             .add("retryPolicy", retryPolicy)
             .add("requestTimeout", requestTimeout)
+            .add("clientProperties", clientProperties)
             .add("topics", topics)
             .add("maxBodySizeBytes", maxBodySizeBytes)
             .toString();
diff --git a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushSubscriptionSettingsTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushSubscriptionSettingsTest.java
index 3ad28f95..631bcdf1 100644
--- a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushSubscriptionSettingsTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushSubscriptionSettingsTest.java
@@ -46,6 +46,8 @@ public class PushSubscriptionSettingsTest extends TestBase {
             .setNamespace(FAKE_NAMESPACE)
             .setRequestTimeout(requestTimeout)
             .setEndpoints(FAKE_ENDPOINTS)
+            .addClientProperty("key1", "value1")
+            .addClientProperty("key2", "value2")
             .build();
         ClientId clientId = new ClientId();
         Map<String, FilterExpression> subscriptionExpression = new HashMap<>();
@@ -57,6 +59,8 @@ public class PushSubscriptionSettingsTest extends TestBase {
         final Settings settings = pushSubscriptionSettings.toProtobuf();
         Assert.assertEquals(settings.getClientType(), ClientType.PUSH_CONSUMER);
         Assert.assertEquals(settings.getRequestTimeout(), Durations.fromNanos(requestTimeout.toNanos()));
+        Assert.assertEquals("value1", settings.getClientPropertiesMap().get("key1"));
+        Assert.assertEquals("value2", settings.getClientPropertiesMap().get("key2"));
         Assert.assertTrue(settings.hasSubscription());
         final Subscription subscription = settings.getSubscription();
         Assert.assertEquals(subscription.getGroup(),
diff --git a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleSubscriptionSettingsTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleSubscriptionSettingsTest.java
index 5bbdd8ee..2cc3bdec 100644
--- a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleSubscriptionSettingsTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleSubscriptionSettingsTest.java
@@ -44,6 +44,8 @@ public class SimpleSubscriptionSettingsTest extends TestBase {
             .setNamespace(FAKE_NAMESPACE)
             .setRequestTimeout(requestTimeout)
             .setEndpoints(FAKE_ENDPOINTS)
+            .addClientProperty("key1", "value1")
+            .addClientProperty("key2", "value2")
             .build();
         ClientId clientId = new ClientId();
         Map<String, FilterExpression> subscriptionExpression = new HashMap<>();
@@ -57,6 +59,8 @@ public class SimpleSubscriptionSettingsTest extends TestBase {
         final Settings settings = simpleSubscriptionSettings.toProtobuf();
         Assert.assertEquals(settings.getClientType(), 
ClientType.SIMPLE_CONSUMER);
         Assert.assertEquals(settings.getRequestTimeout(), 
Durations.fromNanos(requestTimeout.toNanos()));
+        Assert.assertEquals("value1", 
settings.getClientPropertiesMap().get("key1"));
+        Assert.assertEquals("value2", 
settings.getClientPropertiesMap().get("key2"));
         Assert.assertTrue(settings.hasSubscription());
         final Subscription subscription = settings.getSubscription();
         Assert.assertEquals(subscription.getGroup(),
diff --git 
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/PublishingSettingsTest.java
 
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/PublishingSettingsTest.java
new file mode 100644
index 00000000..226cc409
--- /dev/null
+++ 
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/PublishingSettingsTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.java.impl.producer;
+
+import apache.rocketmq.v2.ClientType;
+import apache.rocketmq.v2.Settings;
+import com.google.protobuf.util.Durations;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import org.apache.rocketmq.client.java.misc.ClientId;
+import org.apache.rocketmq.client.java.tool.TestBase;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class PublishingSettingsTest extends TestBase {
+
+    @Test
+    public void testToProtobufWithClientProperties() {
+        final Duration requestTimeout = Duration.ofSeconds(3);
+        Map<String, String> clientProperties = new HashMap<>();
+        clientProperties.put("key1", "value1");
+        clientProperties.put("key2", "value2");
+        final PublishingSettings publishingSettings = new PublishingSettings(
+            FAKE_NAMESPACE, new ClientId(), fakeEndpoints(), 
fakeExponentialBackoffRetryPolicy(),
+            requestTimeout, new 
HashSet<>(Collections.singleton(FAKE_TOPIC_0)), clientProperties);
+
+        final Settings settings = publishingSettings.toProtobuf();
+
+        Assert.assertEquals(ClientType.PRODUCER, settings.getClientType());
+        Assert.assertEquals(Durations.fromNanos(requestTimeout.toNanos()), 
settings.getRequestTimeout());
+        Assert.assertEquals("value1", 
settings.getClientPropertiesMap().get("key1"));
+        Assert.assertEquals("value2", 
settings.getClientPropertiesMap().get("key2"));
+        Assert.assertTrue(settings.hasPublishing());
+    }
+}
diff --git 
a/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java 
b/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
index ee316f2b..42c7c3a2 100644
--- 
a/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
+++ 
b/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
@@ -52,6 +52,7 @@ import io.grpc.Metadata;
 import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -393,7 +394,7 @@ public class TestBase {
 
     protected PublishingSettings fakeProducerSettings() {
         return new PublishingSettings(FAKE_NAMESPACE, FAKE_CLIENT_ID, 
fakeEndpoints(),
-            fakeExponentialBackoffRetryPolicy(), Duration.ofSeconds(1), new 
HashSet<>());
+            fakeExponentialBackoffRetryPolicy(), Duration.ofSeconds(1), new 
HashSet<>(), Collections.emptyMap());
     }
 
     protected SendReceiptImpl fakeSendReceiptImpl(


Reply via email to