This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new ab3d6ca2f70 [fix][proxy] Fix using wrong client version in pulsar proxy (#19569)
ab3d6ca2f70 is described below

commit ab3d6ca2f70804f8d632ad4c64af1d2e4e1d2a6a
Author: Zike Yang <[email protected]>
AuthorDate: Mon Feb 20 20:55:00 2023 +0800

    [fix][proxy] Fix using wrong client version in pulsar proxy (#19569)
---
 .../pulsar/proxy/server/DirectProxyHandler.java    |  3 ++-
 .../apache/pulsar/proxy/server/ProxyClientCnx.java |  3 +--
 .../pulsar/proxy/server/ProxyConnection.java       |  5 +++--
 .../org/apache/pulsar/proxy/server/ProxyTest.java  | 23 ++++++++++++++++++++++
 4 files changed, 29 insertions(+), 5 deletions(-)

diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
index 2f067282115..ee5baaa8b85 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
@@ -324,7 +324,8 @@ public class DirectProxyHandler {
             authenticationDataProvider = authentication.getAuthData(remoteHostName);
             AuthData authData = authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA);
             ByteBuf command;
-            command = Commands.newConnect(authentication.getAuthMethodName(), authData, protocolVersion, "Pulsar proxy",
+            command = Commands.newConnect(authentication.getAuthMethodName(), authData, protocolVersion,
+                    proxyConnection.clientVersion,
                     null /* target broker */, originalPrincipal, clientAuthData, clientAuthMethod);
             outboundChannel.writeAndFlush(command)
                     .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java
index 283b835fff5..6bb867f1a9c 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java
@@ -23,7 +23,6 @@ import io.netty.buffer.ByteBuf;
 import io.netty.channel.EventLoopGroup;
 import java.util.Arrays;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.PulsarVersion;
 import org.apache.pulsar.client.impl.ClientCnx;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.common.api.AuthData;
@@ -60,7 +59,7 @@ public class ProxyClientCnx extends ClientCnx {
         authenticationDataProvider = authentication.getAuthData(remoteHostName);
         AuthData authData = authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA);
         return Commands.newConnect(authentication.getAuthMethodName(), authData, protocolVersion,
-                PulsarVersion.getVersion(), proxyToTargetBrokerAddress, clientAuthRole, clientAuthData,
+                proxyConnection.clientVersion, proxyToTargetBrokerAddress, clientAuthRole, clientAuthData,
                 clientAuthMethod);
     }
 
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index 57ea60e9f19..97e5079f8a8 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -45,7 +45,6 @@ import java.util.function.Supplier;
 import javax.naming.AuthenticationException;
 import javax.net.ssl.SSLSession;
 import lombok.Getter;
-import org.apache.pulsar.PulsarVersion;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.broker.authentication.AuthenticationProvider;
@@ -96,6 +95,7 @@ public class ProxyConnection extends PulsarHandler {
     String clientAuthRole;
     AuthData clientAuthData;
     String clientAuthMethod;
+    String clientVersion;
 
     private String authMethod = "none";
     AuthenticationProvider authenticationProvider;
@@ -450,6 +450,7 @@ public class ProxyConnection extends PulsarHandler {
         this.hasProxyToBrokerUrl = connect.hasProxyToBrokerUrl();
         this.protocolVersionToAdvertise = getProtocolVersionToAdvertise(connect);
         this.proxyToBrokerUrl = connect.hasProxyToBrokerUrl() ? connect.getProxyToBrokerUrl() : "null";
+        this.clientVersion = connect.getClientVersion();
 
         if (LOG.isDebugEnabled()) {
             LOG.debug("Received CONNECT from {} proxyToBroker={}", remoteAddress, proxyToBrokerUrl);
@@ -538,7 +539,7 @@ public class ProxyConnection extends PulsarHandler {
                     if (authResponse.hasClientVersion()) {
                         clientVersion = authResponse.getClientVersion();
                     } else {
-                        clientVersion = PulsarVersion.getVersion();
+                        clientVersion = this.clientVersion;
                     }
                     int protocolVersion;
                     if (authResponse.hasProtocolVersion()) {
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
index 728dfb20478..d1f30c13a93 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
@@ -35,6 +35,7 @@ import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.ToString;
 import org.apache.avro.reflect.Nullable;
+import org.apache.pulsar.PulsarVersion;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.client.api.Consumer;
@@ -310,6 +311,28 @@ public class ProxyTest extends MockedPulsarServiceBaseTest {
         }
     }
 
+    @Test
+    public void testGetClientVersion() throws Exception {
+        @Cleanup
+        PulsarClient client = PulsarClient.builder().serviceUrl(proxyService.getServiceUrl())
+                .build();
+
+        String topic = "persistent://sample/test/local/testGetClientVersion";
+        String subName = "test-sub";
+
+        @Cleanup
+        Consumer<byte[]> consumer = client.newConsumer()
+                .topic(topic)
+                .subscriptionName(subName)
+                .subscribe();
+
+        consumer.receiveAsync();
+
+
+        Assert.assertEquals(admin.topics().getStats(topic).getSubscriptions().get(subName).getConsumers()
+                .get(0).getClientVersion(), PulsarVersion.getVersion());
+    }
+
     private static PulsarClient getClientActiveConsumerChangeNotSupported(ClientConfigurationData conf)
             throws Exception {
         ThreadFactory threadFactory = new DefaultThreadFactory("pulsar-client-io", Thread.currentThread().isDaemon());

Reply via email to