This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 2a89b7f1bea [fix][proxy] Propagate client connection feature flags through Pulsar Proxy to Broker (#24158)
2a89b7f1bea is described below

commit 2a89b7f1bea4c8dfcf48798346e3b02683fa87a2
Author: fengyubiao <[email protected]>
AuthorDate: Wed Apr 9 11:14:55 2025 +0800

    [fix][proxy] Propagate client connection feature flags through Pulsar Proxy to Broker (#24158)
    
    Co-authored-by: Lari Hotari <[email protected]>
---
 .../service/OneWayReplicatorDeduplicationTest.java |  3 +-
 .../pulsar/broker/service/ServerCnxTest.java       |  4 +-
 .../apache/pulsar/common/protocol/Commands.java    | 14 +++--
 .../pulsar/proxy/server/DirectProxyHandler.java    | 13 +++--
 .../apache/pulsar/proxy/server/ProxyClientCnx.java |  2 +-
 .../pulsar/proxy/server/ProxyConnection.java       |  2 +-
 .../org/apache/pulsar/proxy/server/ProxyTest.java  | 66 ++++++++++++++++++++++
 7 files changed, 90 insertions(+), 14 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorDeduplicationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorDeduplicationTest.java
index d4c2de05a28..48b9adb52a7 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorDeduplicationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorDeduplicationTest.java
@@ -221,7 +221,8 @@ public class OneWayReplicatorDeduplicationTest extends 
OneWayReplicatorTestBase
                     authenticationDataProvider = 
authentication.getAuthData(remoteHostName);
                     AuthData authData = 
authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA);
                     BaseCommand cmd = 
Commands.newConnectWithoutSerialize(authentication.getAuthMethodName(), 
authData,
-                            this.protocolVersion, clientVersion, 
proxyToTargetBrokerAddress, null, null, null, null);
+                            this.protocolVersion, clientVersion, 
proxyToTargetBrokerAddress,
+                            null, null, null, null, null);
                     
cmd.getConnect().getFeatureFlags().setSupportsReplDedupByLidAndEid(false);
                     return Commands.serializeWithSize(cmd);
                 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index b1c99940827..15060dceb2a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -327,7 +327,7 @@ public class ServerCnxTest {
         assertEquals(serverCnx.getState(), State.Start);
 
         ByteBuf clientCommand = Commands.newConnect("none", null, 1, null, 
null, null, null, null,
-                "my-pulsar-proxy");
+                "my-pulsar-proxy", null);
         channel.writeInbound(clientCommand);
 
         assertEquals(serverCnx.getState(), State.Connected);
@@ -601,7 +601,7 @@ public class ServerCnxTest {
         assertEquals(serverCnx.getState(), State.Start);
 
         ByteBuf clientCommand = Commands.newConnect(authMethodName, 
AuthData.of("pass.pass".getBytes()),
-                1, null, null, null, null, null, "my-pulsar-proxy");
+                1, null, null, null, null, null, "my-pulsar-proxy", null);
         channel.writeInbound(clientCommand);
         Object response = getResponse();
         assertTrue(response instanceof CommandError);
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index 2cb4f9a40e3..58d58a3acef 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -241,21 +241,21 @@ public class Commands {
                                      String targetBroker, String 
originalPrincipal, AuthData originalAuthData,
                                      String originalAuthMethod) {
         return newConnect(authMethodName, authData, protocolVersion, 
libVersion, targetBroker, originalPrincipal,
-                originalAuthData, originalAuthMethod, null);
+                originalAuthData, originalAuthMethod, null, null);
     }
 
     public static ByteBuf newConnect(String authMethodName, AuthData authData, 
int protocolVersion, String libVersion,
                                      String targetBroker, String 
originalPrincipal, AuthData originalAuthData,
-                                     String originalAuthMethod, String 
proxyVersion) {
+                                     String originalAuthMethod, String 
proxyVersion, FeatureFlags featureFlags) {
         BaseCommand cmd = newConnectWithoutSerialize(authMethodName, authData, 
protocolVersion, libVersion,
-                targetBroker, originalPrincipal, originalAuthData, 
originalAuthMethod, proxyVersion);
+                targetBroker, originalPrincipal, originalAuthData, 
originalAuthMethod, proxyVersion, featureFlags);
         return serializeWithSize(cmd);
     }
 
     public static BaseCommand newConnectWithoutSerialize(String 
authMethodName, AuthData authData,
                                     int protocolVersion, String libVersion,
                                     String targetBroker, String 
originalPrincipal, AuthData originalAuthData,
-                                    String originalAuthMethod, String 
proxyVersion) {
+                                    String originalAuthMethod, String 
proxyVersion, FeatureFlags featureFlags) {
         BaseCommand cmd = localCmd(Type.CONNECT);
         CommandConnect connect = cmd.setConnect()
                 .setClientVersion(libVersion != null ? libVersion : "Pulsar 
Client")
@@ -286,7 +286,11 @@ public class Commands {
             connect.setOriginalAuthMethod(originalAuthMethod);
         }
         connect.setProtocolVersion(protocolVersion);
-        setFeatureFlags(connect.setFeatureFlags());
+        if (featureFlags != null) {
+            connect.setFeatureFlags().copyFrom(featureFlags);
+        } else {
+            setFeatureFlags(connect.setFeatureFlags());
+        }
 
         return cmd;
     }
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
index 681aa553c48..a0b699ff0c1 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
@@ -58,6 +58,7 @@ import 
org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.api.AuthData;
 import org.apache.pulsar.common.api.proto.CommandAuthChallenge;
 import org.apache.pulsar.common.api.proto.CommandConnected;
+import org.apache.pulsar.common.api.proto.FeatureFlags;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.PulsarDecoder;
 import org.apache.pulsar.common.stats.Rate;
@@ -107,7 +108,8 @@ public class DirectProxyHandler {
         this.pulsarSslFactoryMap = new ConcurrentHashMap<>();
     }
 
-    public void connect(String brokerHostAndPort, InetSocketAddress 
targetBrokerAddress, int protocolVersion) {
+    public void connect(String brokerHostAndPort, InetSocketAddress 
targetBrokerAddress, int protocolVersion,
+                        final FeatureFlags featureFlags) {
         String remoteHost;
         try {
             remoteHost = parseHost(brokerHostAndPort);
@@ -182,7 +184,7 @@ public class DirectProxyHandler {
                         service.getConfiguration().getMaxMessageSize() + 
Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0,
                         4));
                 ch.pipeline().addLast("proxyOutboundHandler",
-                        (ChannelHandler) new ProxyBackendHandler(config, 
protocolVersion, remoteHost));
+                        (ChannelHandler) new ProxyBackendHandler(config, 
protocolVersion, remoteHost, featureFlags));
             }
         });
 
@@ -276,11 +278,14 @@ public class DirectProxyHandler {
         protected ChannelHandlerContext ctx;
         private final ProxyConfiguration config;
         private final int protocolVersion;
+        private final FeatureFlags featureFlags;
 
-        public ProxyBackendHandler(ProxyConfiguration config, int 
protocolVersion, String remoteHostName) {
+        public ProxyBackendHandler(ProxyConfiguration config, int 
protocolVersion, String remoteHostName,
+                                   FeatureFlags featureFlags) {
             this.config = config;
             this.protocolVersion = protocolVersion;
             this.remoteHostName = remoteHostName;
+            this.featureFlags = featureFlags;
         }
 
         @Override
@@ -297,7 +302,7 @@ public class DirectProxyHandler {
             ByteBuf command = Commands.newConnect(
                     authentication.getAuthMethodName(), authData, 
protocolVersion,
                     proxyConnection.clientVersion, null /* target broker */,
-                    originalPrincipal, clientAuthData, clientAuthMethod, 
PulsarVersion.getVersion());
+                    originalPrincipal, clientAuthData, clientAuthMethod, 
PulsarVersion.getVersion(), featureFlags);
             writeAndFlush(command);
             isTlsOutboundChannel = 
ProxyConnection.isTlsChannel(inboundChannel);
         }
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java
index d15d48b9209..1e8e2fb55e4 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java
@@ -73,7 +73,7 @@ public class ProxyClientCnx extends ClientCnx {
         AuthData authData = 
authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA);
         return Commands.newConnect(authentication.getAuthMethodName(), 
authData, protocolVersion,
                 proxyConnection.clientVersion, proxyToTargetBrokerAddress, 
clientAuthRole, clientAuthData,
-                clientAuthMethod, PulsarVersion.getVersion());
+                clientAuthMethod, PulsarVersion.getVersion(), null);
     }
 
     @Override
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index 540771c86fb..e1179be115a 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -444,7 +444,7 @@ public class ProxyConnection extends PulsarHandler {
     private void connectToBroker(InetSocketAddress brokerAddress) {
         assert ctx.executor().inEventLoop();
         DirectProxyHandler directProxyHandler = new 
DirectProxyHandler(service, this);
-        directProxyHandler.connect(proxyToBrokerUrl, brokerAddress, 
protocolVersionToAdvertise);
+        directProxyHandler.connect(proxyToBrokerUrl, brokerAddress, 
protocolVersionToAdvertise, features);
     }
 
     public void brokerConnected(DirectProxyHandler directProxyHandler, 
CommandConnected connected) {
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
index ee770e6edee..8f0fe98be27 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
@@ -24,6 +24,7 @@ import static java.util.Objects.requireNonNull;
 import static org.mockito.Mockito.doReturn;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
+import io.netty.buffer.ByteBuf;
 import io.netty.channel.EventLoopGroup;
 import io.netty.util.concurrent.DefaultThreadFactory;
 import java.util.ArrayList;
@@ -42,9 +43,11 @@ import org.apache.pulsar.PulsarVersion;
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.broker.service.ServerCnx;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.InjectedClientCnxClientBuilder;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
@@ -52,12 +55,16 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.ClientBuilderImpl;
 import org.apache.pulsar.client.impl.ClientCnx;
 import org.apache.pulsar.client.impl.ConnectionPool;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
+import org.apache.pulsar.common.api.AuthData;
+import org.apache.pulsar.common.api.proto.BaseCommand;
 import org.apache.pulsar.common.api.proto.CommandActiveConsumerChange;
+import org.apache.pulsar.common.api.proto.FeatureFlags;
 import org.apache.pulsar.common.api.proto.ProtocolVersion;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.common.naming.TopicName;
@@ -66,6 +73,7 @@ import 
org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.policies.data.TopicType;
+import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.util.netty.EventLoopUtil;
 import org.apache.pulsar.metadata.impl.ZKMetadataStore;
@@ -429,6 +437,64 @@ public class ProxyTest extends MockedPulsarServiceBaseTest 
{
                 .get(0).getClientVersion(), String.format("Pulsar-Java-v%s", 
PulsarVersion.getVersion()));
     }
 
+    @DataProvider
+    public Object[][] booleanValues() {
+        return new Object[][]{
+                {true},
+                {false}
+        };
+    }
+
+    @Test(dataProvider = "booleanValues")
+    public void testConnectedWithClientSideFeatures(boolean supported) throws 
Exception {
+        final String topic = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+        admin.topics().createNonPartitionedTopic(topic);
+
+        // Create a client as an old version, which does not support "supportsReplDedupByLidAndEid".
+        ClientBuilderImpl clientBuilder2 =
+                (ClientBuilderImpl) 
PulsarClient.builder().serviceUrl(proxyService.getServiceUrl());
+        PulsarClientImpl injectedClient = 
InjectedClientCnxClientBuilder.create(clientBuilder2,
+            (conf, eventLoopGroup) -> {
+                return new ClientCnx(InstrumentProvider.NOOP, conf, 
eventLoopGroup) {
+
+                    @Override
+                    protected ByteBuf newConnectCommand() throws Exception {
+                        authenticationDataProvider = 
authentication.getAuthData(remoteHostName);
+                        AuthData authData = 
authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA);
+                        BaseCommand cmd =
+                                
Commands.newConnectWithoutSerialize(authentication.getAuthMethodName(), 
authData,
+                                        this.protocolVersion, clientVersion, 
proxyToTargetBrokerAddress,
+                                        null, null, null, null, null);
+                        FeatureFlags featureFlags = 
cmd.getConnect().getFeatureFlags();
+                        featureFlags.setSupportsAuthRefresh(supported);
+                        featureFlags.setSupportsBrokerEntryMetadata(supported);
+                        featureFlags.setSupportsPartialProducer(supported);
+                        featureFlags.setSupportsTopicWatchers(supported);
+                        
featureFlags.setSupportsReplDedupByLidAndEid(supported);
+                        
featureFlags.setSupportsGetPartitionedMetadataWithoutAutoCreation(supported);
+                        return Commands.serializeWithSize(cmd);
+                    }
+                };
+            });
+
+        // Verify: the broker will create a connection, which has "supportsReplDedupByLidAndEid" disabled.
+        Producer<byte[]> producer = 
injectedClient.newProducer().topic(topic).create();
+        ServerCnx serverCnx = (ServerCnx) 
pulsar.getBrokerService().getTopic(topic, false).get().get()
+                .getProducers().values().iterator().next().getCnx();
+        FeatureFlags featureFlags = serverCnx.getFeatures();
+        assertEquals(featureFlags.isSupportsAuthRefresh(), supported);
+        assertEquals(featureFlags.isSupportsBrokerEntryMetadata(), supported);
+        assertEquals(featureFlags.isSupportsPartialProducer(), supported);
+        assertEquals(featureFlags.isSupportsTopicWatchers(), supported);
+        assertEquals(featureFlags.isSupportsReplDedupByLidAndEid(), supported);
+        
assertEquals(featureFlags.isSupportsGetPartitionedMetadataWithoutAutoCreation(),
 supported);
+
+        // cleanup.
+        producer.close();
+        injectedClient.close();
+        admin.topics().delete(topic);
+    }
+
     private PulsarClient 
getClientActiveConsumerChangeNotSupported(ClientConfigurationData conf)
             throws Exception {
         ThreadFactory threadFactory = new 
DefaultThreadFactory("pulsar-client-io", Thread.currentThread().isDaemon());

Reply via email to