This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 21c7c628d2a PIP-254: Support configuring client version (#20009)
21c7c628d2a is described below
commit 21c7c628d2a3fd7277f16fe7ba72154cfbf08128
Author: Yunze Xu <[email protected]>
AuthorDate: Mon Apr 10 22:34:27 2023 +0800
PIP-254: Support configuring client version (#20009)
---
.../client/api/MutualAuthenticationTest.java | 48 +++++++++++++++++++---
.../client/api/SimpleProducerConsumerTest.java | 34 ++++++++++++++-
.../pulsar/client/impl/ClientBuilderImpl.java | 23 +++++++++++
.../org/apache/pulsar/client/impl/ClientCnx.java | 9 ++--
.../client/impl/conf/ClientConfigurationData.java | 6 +++
5 files changed, 110 insertions(+), 10 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MutualAuthenticationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MutualAuthenticationTest.java
index 472af3e88cd..2fc8aebf64a 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MutualAuthenticationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MutualAuthenticationTest.java
@@ -26,20 +26,26 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import javax.naming.AuthenticationException;
import javax.net.ssl.SSLSession;
+import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
import org.apache.pulsar.broker.authentication.AuthenticationState;
+import org.apache.pulsar.client.impl.ClientBuilderImpl;
import org.apache.pulsar.common.api.AuthData;
+import org.apache.pulsar.common.policies.data.PublisherStats;
+import org.apache.pulsar.common.policies.data.TopicStats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.testng.Assert.assertEquals;
/**
* Test Mutual Authentication.
@@ -182,7 +188,7 @@ public class MutualAuthenticationTest extends
ProducerConsumerBase {
}
}
- @BeforeMethod(alwaysRun = true)
+ @BeforeClass(alwaysRun = true)
@Override
protected void setup() throws Exception {
mutualAuth = new MutualAuthentication();
@@ -205,7 +211,7 @@ public class MutualAuthenticationTest extends
ProducerConsumerBase {
clientBuilder.authentication(mutualAuth);
}
- @AfterMethod(alwaysRun = true)
+ @AfterClass(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
internalCleanup();
@@ -214,12 +220,13 @@ public class MutualAuthenticationTest extends
ProducerConsumerBase {
@Test
public void testAuthentication() throws Exception {
log.info("-- Starting {} test --", methodName);
+ String topic = "persistent://my-property/my-ns/test-authentication";
- Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic("persistent://my-property/my-ns/my-topic1")
+ Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic)
.subscriptionName("my-subscriber-name")
.subscribe();
Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
- .topic("persistent://my-property/my-ns/my-topic1")
+ .topic(topic)
.create();
for (int i = 0; i < 10; i++) {
@@ -239,4 +246,33 @@ public class MutualAuthenticationTest extends
ProducerConsumerBase {
log.info("-- Exiting {} test --", methodName);
}
+
+ @Test
+ public void testClientVersion() throws Exception {
+ String defaultClientVersion = "Pulsar-Java-v" +
PulsarVersion.getVersion();
+ String topic = "persistent://my-property/my-ns/test-client-version";
+
+ Producer<byte[]> producer1 = pulsarClient.newProducer()
+ .topic(topic)
+ .create();
+ TopicStats stats = admin.topics().getStats(topic);
+ assertEquals(stats.getPublishers().size(), 1);
+ assertEquals(stats.getPublishers().get(0).getClientVersion(),
defaultClientVersion);
+
+ PulsarClient client = ((ClientBuilderImpl) PulsarClient.builder())
+ .description("my-java-client")
+ .serviceUrl(lookupUrl.toString())
+ .authentication(mutualAuth)
+ .build();
+ Producer<byte[]> producer2 =
client.newProducer().topic(topic).create();
+ stats = admin.topics().getStats(topic);
+ assertEquals(stats.getPublishers().size(), 2);
+
+
assertEquals(stats.getPublishers().stream().map(PublisherStats::getClientVersion).collect(Collectors.toSet()),
+ Sets.newHashSet(defaultClientVersion, defaultClientVersion +
"-my-java-client"));
+
+ producer1.close();
+ producer2.close();
+ client.close();
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 293e298fc66..1bc437195d9 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -83,10 +83,12 @@ import
org.apache.bookkeeper.mledger.impl.cache.EntryCacheManager;
import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.impl.ClientBuilderImpl;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.ConsumerBase;
import org.apache.pulsar.client.impl.ConsumerImpl;
@@ -105,6 +107,8 @@ import
org.apache.pulsar.common.api.proto.SingleMessageMetadata;
import org.apache.pulsar.common.compression.CompressionCodec;
import org.apache.pulsar.common.compression.CompressionCodecProvider;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.PublisherStats;
+import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.FutureUtil;
@@ -4581,4 +4585,32 @@ public class SimpleProducerConsumerTest extends
ProducerConsumerBase {
// sendAsync should complete in time
assertNotNull(producer.sendAsync(msg).get(timeoutSec,
TimeUnit.SECONDS));
}
-}
\ No newline at end of file
+
+ @Test
+ public void testClientVersion() throws Exception {
+ String defaultClientVersion = "Pulsar-Java-v" +
PulsarVersion.getVersion();
+ String topic = "persistent://my-property/my-ns/test-client-version";
+
+ Producer<byte[]> producer1 = pulsarClient.newProducer()
+ .topic(topic)
+ .create();
+ TopicStats stats = admin.topics().getStats(topic);
+ assertEquals(stats.getPublishers().size(), 1);
+ assertEquals(stats.getPublishers().get(0).getClientVersion(),
defaultClientVersion);
+
+ PulsarClient client = ((ClientBuilderImpl) PulsarClient.builder())
+ .description("my-java-client")
+ .serviceUrl(lookupUrl.toString())
+ .build();
+ Producer<byte[]> producer2 =
client.newProducer().topic(topic).create();
+ stats = admin.topics().getStats(topic);
+ assertEquals(stats.getPublishers().size(), 2);
+
+
assertEquals(stats.getPublishers().stream().map(PublisherStats::getClientVersion).collect(Collectors.toSet()),
+ Sets.newHashSet(defaultClientVersion, defaultClientVersion +
"-my-java-client"));
+
+ producer1.close();
+ producer2.close();
+ client.close();
+ }
+}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
index 523acdace39..7677045f089 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
@@ -410,4 +410,27 @@ public class ClientBuilderImpl implements ClientBuilder {
conf.setSocks5ProxyPassword(socks5ProxyPassword);
return this;
}
+
+ /**
+ * Set the description.
+ *
+ * <p> By default, when the client connects to the broker, a version
string like "Pulsar-Java-v<x.y.z>" will be
+ * carried and saved by the broker. The client version string could be
queried from the topic stats.
+ *
+ * <p> This method provides a way to add more description to a specific
PulsarClient instance. If it's configured,
+ * the description will be appended to the original client version string,
with '-' as the separator.
+ *
+ * <p>For example, if the client version is 3.0.0, and the description is
"forked", the final client version string
+ * will be "Pulsar-Java-v3.0.0-forked".
+ *
+ * @param description the description of the current PulsarClient instance
+ * @throws IllegalArgumentException if the length of description exceeds 64
+ */
+ public ClientBuilder description(String description) {
+ if (description != null && description.length() > 64) {
+ throw new IllegalArgumentException("description should be at most
64 characters");
+ }
+ conf.setDescription(description);
+ return this;
+ }
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 7780856c694..115c71307c4 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -194,6 +194,8 @@ public class ClientCnx extends PulsarHandler {
@Getter
private long lastDisconnectedTimestamp;
+ private final String clientVersion;
+
protected enum State {
None, SentConnectFrame, Ready, Failed, Connecting
}
@@ -252,6 +254,8 @@ public class ClientCnx extends PulsarHandler {
this.state = State.None;
this.protocolVersion = protocolVersion;
this.idleState = new ClientCnxIdleState(this);
+ this.clientVersion = "Pulsar-Java-v" + PulsarVersion.getVersion()
+ + (conf.getDescription() == null ? "" : ("-" +
conf.getDescription()));
}
@Override
@@ -293,8 +297,7 @@ public class ClientCnx extends PulsarHandler {
authenticationDataProvider =
authentication.getAuthData(remoteHostName);
AuthData authData =
authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA);
return Commands.newConnect(authentication.getAuthMethodName(),
authData, this.protocolVersion,
- String.format("Pulsar-Java-v%s", PulsarVersion.getVersion()),
proxyToTargetBrokerAddress, null, null,
- null);
+ clientVersion, proxyToTargetBrokerAddress, null, null, null);
}
@Override
@@ -411,7 +414,7 @@ public class ClientCnx extends PulsarHandler {
ByteBuf request =
Commands.newAuthResponse(authentication.getAuthMethodName(),
authData,
this.protocolVersion,
- String.format("Pulsar-Java-v%s",
PulsarVersion.getVersion()));
+ clientVersion);
if (log.isDebugEnabled()) {
log.debug("{} Mutual auth {}", ctx.channel(),
authentication.getAuthMethodName());
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
index 1e7bc6f8221..7d94675ccba 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
@@ -379,6 +379,12 @@ public class ClientConfigurationData implements
Serializable, Cloneable {
@Secret
private String socks5ProxyPassword;
+ @ApiModelProperty(
+ name = "description",
+ value = "The extra description of the client version. The length
cannot exceed 64."
+ )
+ private String description;
+
/**
* Gets the authentication settings for the client.
*