This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 21c7c628d2a PIP-254: Support configuring client version (#20009)
21c7c628d2a is described below

commit 21c7c628d2a3fd7277f16fe7ba72154cfbf08128
Author: Yunze Xu <[email protected]>
AuthorDate: Mon Apr 10 22:34:27 2023 +0800

    PIP-254: Support configuring client version (#20009)
---
 .../client/api/MutualAuthenticationTest.java       | 48 +++++++++++++++++++---
 .../client/api/SimpleProducerConsumerTest.java     | 34 ++++++++++++++-
 .../pulsar/client/impl/ClientBuilderImpl.java      | 23 +++++++++++
 .../org/apache/pulsar/client/impl/ClientCnx.java   |  9 ++--
 .../client/impl/conf/ClientConfigurationData.java  |  6 +++
 5 files changed, 110 insertions(+), 10 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MutualAuthenticationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MutualAuthenticationTest.java
index 472af3e88cd..2fc8aebf64a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MutualAuthenticationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MutualAuthenticationTest.java
@@ -26,20 +26,26 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 import javax.naming.AuthenticationException;
 import javax.net.ssl.SSLSession;
+import org.apache.pulsar.PulsarVersion;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.broker.authentication.AuthenticationProvider;
 import org.apache.pulsar.broker.authentication.AuthenticationState;
+import org.apache.pulsar.client.impl.ClientBuilderImpl;
 import org.apache.pulsar.common.api.AuthData;
+import org.apache.pulsar.common.policies.data.PublisherStats;
+import org.apache.pulsar.common.policies.data.TopicStats;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.testng.Assert.assertEquals;
 
 /**
  * Test Mutual Authentication.
@@ -182,7 +188,7 @@ public class MutualAuthenticationTest extends ProducerConsumerBase {
         }
     }
 
-    @BeforeMethod(alwaysRun = true)
+    @BeforeClass(alwaysRun = true)
     @Override
     protected void setup() throws Exception {
         mutualAuth = new MutualAuthentication();
@@ -205,7 +211,7 @@ public class MutualAuthenticationTest extends ProducerConsumerBase {
         clientBuilder.authentication(mutualAuth);
     }
 
-    @AfterMethod(alwaysRun = true)
+    @AfterClass(alwaysRun = true)
     @Override
     protected void cleanup() throws Exception {
         internalCleanup();
@@ -214,12 +220,13 @@ public class MutualAuthenticationTest extends ProducerConsumerBase {
     @Test
     public void testAuthentication() throws Exception {
         log.info("-- Starting {} test --", methodName);
+        String topic = "persistent://my-property/my-ns/test-authentication";
 
-        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/my-ns/my-topic1")
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic)
             .subscriptionName("my-subscriber-name")
             .subscribe();
         Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
-            .topic("persistent://my-property/my-ns/my-topic1")
+            .topic(topic)
             .create();
 
         for (int i = 0; i < 10; i++) {
@@ -239,4 +246,33 @@ public class MutualAuthenticationTest extends ProducerConsumerBase {
 
         log.info("-- Exiting {} test --", methodName);
     }
+
+    @Test
+    public void testClientVersion() throws Exception {
+        String defaultClientVersion = "Pulsar-Java-v" + PulsarVersion.getVersion();
+        String topic = "persistent://my-property/my-ns/test-client-version";
+
+        Producer<byte[]> producer1 = pulsarClient.newProducer()
+                .topic(topic)
+                .create();
+        TopicStats stats = admin.topics().getStats(topic);
+        assertEquals(stats.getPublishers().size(), 1);
+        assertEquals(stats.getPublishers().get(0).getClientVersion(), defaultClientVersion);
+
+        PulsarClient client = ((ClientBuilderImpl) PulsarClient.builder())
+                .description("my-java-client")
+                .serviceUrl(lookupUrl.toString())
+                .authentication(mutualAuth)
+                .build();
+        Producer<byte[]> producer2 = client.newProducer().topic(topic).create();
+        stats = admin.topics().getStats(topic);
+        assertEquals(stats.getPublishers().size(), 2);
+
+        assertEquals(stats.getPublishers().stream().map(PublisherStats::getClientVersion).collect(Collectors.toSet()),
+                Sets.newHashSet(defaultClientVersion, defaultClientVersion + "-my-java-client"));
+
+        producer1.close();
+        producer2.close();
+        client.close();
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 293e298fc66..1bc437195d9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -83,10 +83,12 @@ import org.apache.bookkeeper.mledger.impl.cache.EntryCacheManager;
 import org.apache.commons.lang3.RandomUtils;
 import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.PulsarVersion;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.impl.ClientBuilderImpl;
 import org.apache.pulsar.client.impl.ClientCnx;
 import org.apache.pulsar.client.impl.ConsumerBase;
 import org.apache.pulsar.client.impl.ConsumerImpl;
@@ -105,6 +107,8 @@ import org.apache.pulsar.common.api.proto.SingleMessageMetadata;
 import org.apache.pulsar.common.compression.CompressionCodec;
 import org.apache.pulsar.common.compression.CompressionCodecProvider;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.PublisherStats;
+import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.common.util.FutureUtil;
@@ -4581,4 +4585,32 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
         // sendAsync should complete in time
         assertNotNull(producer.sendAsync(msg).get(timeoutSec, TimeUnit.SECONDS));
     }
-}
\ No newline at end of file
+
+    @Test
+    public void testClientVersion() throws Exception {
+        String defaultClientVersion = "Pulsar-Java-v" + PulsarVersion.getVersion();
+        String topic = "persistent://my-property/my-ns/test-client-version";
+
+        Producer<byte[]> producer1 = pulsarClient.newProducer()
+                .topic(topic)
+                .create();
+        TopicStats stats = admin.topics().getStats(topic);
+        assertEquals(stats.getPublishers().size(), 1);
+        assertEquals(stats.getPublishers().get(0).getClientVersion(), defaultClientVersion);
+
+        PulsarClient client = ((ClientBuilderImpl) PulsarClient.builder())
+                .description("my-java-client")
+                .serviceUrl(lookupUrl.toString())
+                .build();
+        Producer<byte[]> producer2 = client.newProducer().topic(topic).create();
+        stats = admin.topics().getStats(topic);
+        assertEquals(stats.getPublishers().size(), 2);
+
+        assertEquals(stats.getPublishers().stream().map(PublisherStats::getClientVersion).collect(Collectors.toSet()),
+                Sets.newHashSet(defaultClientVersion, defaultClientVersion + "-my-java-client"));
+
+        producer1.close();
+        producer2.close();
+        client.close();
+    }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
index 523acdace39..7677045f089 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
@@ -410,4 +410,27 @@ public class ClientBuilderImpl implements ClientBuilder {
         conf.setSocks5ProxyPassword(socks5ProxyPassword);
         return this;
     }
+
+    /**
+     * Set the description.
+     *
+     * <p> By default, when the client connects to the broker, a version string like "Pulsar-Java-v<x.y.z>" will be
+     * carried and saved by the broker. The client version string could be queried from the topic stats.
+     *
+     * <p> This method provides a way to add more description to a specific PulsarClient instance. If it's configured,
+     * the description will be appended to the original client version string, with '-' as the separator.
+     *
+     * <p>For example, if the client version is 3.0.0, and the description is "forked", the final client version string
+     * will be "Pulsar-Java-v3.0.0-forked".
+     *
+     * @param description the description of the current PulsarClient instance
+     * @throws IllegalArgumentException if the length of description exceeds 64
+     */
+    public ClientBuilder description(String description) {
+        if (description != null && description.length() > 64) {
+            throw new IllegalArgumentException("description should be at most 64 characters");
+        }
+        conf.setDescription(description);
+        return this;
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 7780856c694..115c71307c4 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -194,6 +194,8 @@ public class ClientCnx extends PulsarHandler {
     @Getter
     private long lastDisconnectedTimestamp;
 
+    private final String clientVersion;
+
     protected enum State {
         None, SentConnectFrame, Ready, Failed, Connecting
     }
@@ -252,6 +254,8 @@ public class ClientCnx extends PulsarHandler {
         this.state = State.None;
         this.protocolVersion = protocolVersion;
         this.idleState = new ClientCnxIdleState(this);
+        this.clientVersion = "Pulsar-Java-v" + PulsarVersion.getVersion()
+                + (conf.getDescription() == null ? "" : ("-" + conf.getDescription()));
     }
 
     @Override
@@ -293,8 +297,7 @@ public class ClientCnx extends PulsarHandler {
         authenticationDataProvider = authentication.getAuthData(remoteHostName);
         AuthData authData = authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA);
         return Commands.newConnect(authentication.getAuthMethodName(), authData, this.protocolVersion,
-                String.format("Pulsar-Java-v%s", PulsarVersion.getVersion()), proxyToTargetBrokerAddress, null, null,
-                null);
+                clientVersion, proxyToTargetBrokerAddress, null, null, null);
     }
 
     @Override
@@ -411,7 +414,7 @@ public class ClientCnx extends PulsarHandler {
             ByteBuf request = Commands.newAuthResponse(authentication.getAuthMethodName(),
                     authData,
                     this.protocolVersion,
-                    String.format("Pulsar-Java-v%s", PulsarVersion.getVersion()));
+                    clientVersion);
 
             if (log.isDebugEnabled()) {
                 log.debug("{} Mutual auth {}", ctx.channel(), authentication.getAuthMethodName());
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
index 1e7bc6f8221..7d94675ccba 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
@@ -379,6 +379,12 @@ public class ClientConfigurationData implements Serializable, Cloneable {
     @Secret
     private String socks5ProxyPassword;
 
+    @ApiModelProperty(
+            name = "description",
+            value = "The extra description of the client version. The length cannot exceed 64."
+    )
+    private String description;
+
     /**
      * Gets the authentication settings for the client.
      *

Reply via email to