This is an automated email from the ASF dual-hosted git repository.
zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new f1765be36f0 [fix][proxy] Fix using wrong client version in pulsar
proxy (#19540)
f1765be36f0 is described below
commit f1765be36f050bb9771a2aa7f4404d1d4824e1bf
Author: Zike Yang <[email protected]>
AuthorDate: Fri Feb 17 18:47:57 2023 +0800
[fix][proxy] Fix using wrong client version in pulsar proxy (#19540)
### Motivations
Currently, if we connect the client to the proxy, the `clientVersion` won't
be sent to the broker, so we can't get the client version using PulsarAdmin.
For example, there is no `clientVersion` field shown in the output of topic
stats:
```
"publishers" : [ {
"accessMode" : "Shared",
"msgRateIn" : 0.0,
"msgThroughputIn" : 0.0,
"averageMsgSize" : 0.0,
"chunkedMessageRate" : 0.0,
"producerId" : 0,
"metadata" : { },
"address" : "/127.0.0.1:65385",
"producerName" : "AlvaroProducer",
"connectedSince" : "2023-02-16T11:34:30.384548+08:00"
} ],
```
It works fine when directly connecting to the broker.
The root cause is that the pulsar proxy doesn't pass the clientVersion from
the client to the broker. It sets it to `Pulsar proxy` instead, and thus it is
ignored due to the code here:
https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java#L693-L695
### Modifications
* Use the correct clientVersion from the client
Signed-off-by: Zike Yang <[email protected]>
---
.../pulsar/proxy/server/DirectProxyHandler.java | 2 +-
.../apache/pulsar/proxy/server/ProxyClientCnx.java | 3 +--
.../pulsar/proxy/server/ProxyConnection.java | 5 +++--
.../org/apache/pulsar/proxy/server/ProxyTest.java | 23 ++++++++++++++++++++++
4 files changed, 28 insertions(+), 5 deletions(-)
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
index 4b5fef3a994..1e9fd676573 100644
---
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
+++
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
@@ -325,7 +325,7 @@ public class DirectProxyHandler {
AuthData authData =
authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA);
ByteBuf command = Commands.newConnect(
authentication.getAuthMethodName(), authData,
protocolVersion,
- "Pulsar proxy", null /* target broker */,
+ proxyConnection.clientVersion, null /* target broker */,
originalPrincipal, clientAuthData, clientAuthMethod);
writeAndFlush(command);
isTlsOutboundChannel =
ProxyConnection.isTlsChannel(inboundChannel);
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java
index 6985e1f96e0..a1994fb5af4 100644
---
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java
+++
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java
@@ -23,7 +23,6 @@ import io.netty.buffer.ByteBuf;
import io.netty.channel.EventLoopGroup;
import java.util.Arrays;
import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.api.AuthData;
@@ -66,7 +65,7 @@ public class ProxyClientCnx extends ClientCnx {
authenticationDataProvider =
authentication.getAuthData(remoteHostName);
AuthData authData =
authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA);
return Commands.newConnect(authentication.getAuthMethodName(),
authData, protocolVersion,
- PulsarVersion.getVersion(), proxyToTargetBrokerAddress,
clientAuthRole, clientAuthData,
+ proxyConnection.clientVersion, proxyToTargetBrokerAddress,
clientAuthRole, clientAuthData,
clientAuthMethod);
}
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index 5ee79f4ad23..5a53f6ec014 100644
---
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -45,7 +45,6 @@ import java.util.function.Supplier;
import javax.naming.AuthenticationException;
import javax.net.ssl.SSLSession;
import lombok.Getter;
-import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
import org.apache.pulsar.broker.authentication.AuthenticationState;
@@ -99,6 +98,7 @@ public class ProxyConnection extends PulsarHandler {
String clientAuthRole;
AuthData clientAuthData;
String clientAuthMethod;
+ String clientVersion;
private String authMethod = "none";
AuthenticationProvider authenticationProvider;
@@ -475,6 +475,7 @@ public class ProxyConnection extends PulsarHandler {
this.hasProxyToBrokerUrl = connect.hasProxyToBrokerUrl();
this.protocolVersionToAdvertise =
getProtocolVersionToAdvertise(connect);
this.proxyToBrokerUrl = connect.hasProxyToBrokerUrl() ?
connect.getProxyToBrokerUrl() : "null";
+ this.clientVersion = connect.getClientVersion();
if (LOG.isDebugEnabled()) {
LOG.debug("Received CONNECT from {} proxyToBroker={}",
remoteAddress, proxyToBrokerUrl);
@@ -568,7 +569,7 @@ public class ProxyConnection extends PulsarHandler {
if (authResponse.hasClientVersion()) {
clientVersion = authResponse.getClientVersion();
} else {
- clientVersion = PulsarVersion.getVersion();
+ clientVersion = this.clientVersion;
}
int protocolVersion;
if (authResponse.hasProtocolVersion()) {
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
index 6c9a834bb04..af128ce036f 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
@@ -35,6 +35,7 @@ import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
import org.apache.avro.reflect.Nullable;
+import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.client.api.Consumer;
@@ -311,6 +312,28 @@ public class ProxyTest extends MockedPulsarServiceBaseTest
{
}
}
+ @Test
+ public void testGetClientVersion() throws Exception {
+ @Cleanup
+ PulsarClient client =
PulsarClient.builder().serviceUrl(proxyService.getServiceUrl())
+ .build();
+
+ String topic = "persistent://sample/test/local/testGetClientVersion";
+ String subName = "test-sub";
+
+ @Cleanup
+ Consumer<byte[]> consumer = client.newConsumer()
+ .topic(topic)
+ .subscriptionName(subName)
+ .subscribe();
+
+ consumer.receiveAsync();
+
+
+
Assert.assertEquals(admin.topics().getStats(topic).getSubscriptions().get(subName).getConsumers()
+ .get(0).getClientVersion(), PulsarVersion.getVersion());
+ }
+
private static PulsarClient
getClientActiveConsumerChangeNotSupported(ClientConfigurationData conf)
throws Exception {
ThreadFactory threadFactory = new
DefaultThreadFactory("pulsar-client-io", Thread.currentThread().isDaemon());