merlimat closed pull request #2845: Make proxy advertise protocol version of client to broker
URL: https://github.com/apache/pulsar/pull/2845
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request originates from a fork, GitHub will not display
its diff after the merge, so the full diff is reproduced below:

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 9306a8248b..ccc20b3ec3 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -110,6 +110,7 @@
     private volatile int numberOfRejectRequests = 0;
     private final int maxNumberOfRejectedRequestPerConnection;
     private final int rejectedRequestResetTimeSec = 60;
+    private final int protocolVersion;
     private final long operationTimeoutMs;
 
     protected String proxyToTargetBrokerAddress = null;
@@ -123,6 +124,10 @@
     }
 
     public ClientCnx(ClientConfigurationData conf, EventLoopGroup 
eventLoopGroup) {
+        this(conf, eventLoopGroup, Commands.getCurrentProtocolVersion());
+    }
+
+    public ClientCnx(ClientConfigurationData conf, EventLoopGroup 
eventLoopGroup, int protocolVersion) {
         super(conf.getKeepAliveIntervalSeconds(), TimeUnit.SECONDS);
         checkArgument(conf.getMaxLookupRequest() > 
conf.getConcurrentLookupRequest());
         this.pendingLookupRequestSemaphore = new 
Semaphore(conf.getConcurrentLookupRequest(), true);
@@ -135,6 +140,7 @@ public ClientCnx(ClientConfigurationData conf, 
EventLoopGroup eventLoopGroup) {
         this.state = State.None;
         this.isTlsHostnameVerificationEnable = 
conf.isTlsHostnameVerificationEnable();
         this.hostnameVerifier = new DefaultHostnameVerifier();
+        this.protocolVersion = protocolVersion;
     }
 
     @Override
@@ -167,8 +173,8 @@ protected ByteBuf newConnectCommand() throws 
PulsarClientException {
         if (authentication.getAuthData().hasDataFromCommand()) {
             authData = authentication.getAuthData().getCommandData();
         }
-        return Commands.newConnect(authentication.getAuthMethodName(), 
authData,
-                getPulsarClientVersion(), proxyToTargetBrokerAddress);
+        return Commands.newConnect(authentication.getAuthMethodName(), 
authData, this.protocolVersion,
+                getPulsarClientVersion(), proxyToTargetBrokerAddress, null, 
null, null);
     }
 
     @Override
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
index 16e47a0148..31dcac190e 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
@@ -1116,7 +1116,6 @@ private static ByteBufPair 
serializeCommandMessageWithSize(BaseCommand cmd, Byte
         return (ByteBufPair) ByteBufPair.get(headers, metadataAndPayload);
     }
 
-    @VisibleForTesting
     public static int getCurrentProtocolVersion() {
         // Return the last ProtocolVersion enum value
         return ProtocolVersion.values()[ProtocolVersion.values().length - 
1].getNumber();
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
index 232db4307e..c7fa786123 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
@@ -59,16 +59,19 @@
     private String originalPrincipal;
     private String clientAuthData;
     private String clientAuthMethod;
+    private int protocolVersion;
     public static final String TLS_HANDLER = "tls";
 
     private final Authentication authentication;
 
-    public DirectProxyHandler(ProxyService service, ProxyConnection 
proxyConnection, String targetBrokerUrl) {
+    public DirectProxyHandler(ProxyService service, ProxyConnection 
proxyConnection, String targetBrokerUrl,
+            int protocolVersion) {
         this.authentication = proxyConnection.getClientAuthentication();
         this.inboundChannel = proxyConnection.ctx().channel();
         this.originalPrincipal = proxyConnection.clientAuthRole;
         this.clientAuthData = proxyConnection.clientAuthData;
         this.clientAuthMethod = proxyConnection.clientAuthMethod;
+        this.protocolVersion = protocolVersion;
         ProxyConfiguration config = service.getConfiguration();
 
         // Start the connection attempt.
@@ -97,7 +100,7 @@ protected void initChannel(SocketChannel ch) throws 
Exception {
                 }
                 ch.pipeline().addLast("frameDecoder",
                         new 
LengthFieldBasedFrameDecoder(PulsarDecoder.MaxFrameSize, 0, 4, 0, 4));
-                ch.pipeline().addLast("proxyOutboundHandler", new 
ProxyBackendHandler(config));
+                ch.pipeline().addLast("proxyOutboundHandler", new 
ProxyBackendHandler(config, protocolVersion));
             }
         });
 
@@ -136,9 +139,11 @@ protected void initChannel(SocketChannel ch) throws 
Exception {
         private String remoteHostName;
         protected ChannelHandlerContext ctx;
         private ProxyConfiguration config;
+        private int protocolVersion;
 
-        public ProxyBackendHandler(ProxyConfiguration config) {
+        public ProxyBackendHandler(ProxyConfiguration config, int 
protocolVersion) {
             this.config = config;
+            this.protocolVersion = protocolVersion;
         }
 
         @Override
@@ -150,7 +155,7 @@ public void channelActive(ChannelHandlerContext ctx) throws 
Exception {
                 authData = authentication.getAuthData().getCommandData();
             }
             ByteBuf command = null;
-            command = Commands.newConnect(authentication.getAuthMethodName(), 
authData, "Pulsar proxy",
+            command = Commands.newConnect(authentication.getAuthMethodName(), 
authData, protocolVersion, "Pulsar proxy",
                     null /* target broker */, originalPrincipal, 
clientAuthData, clientAuthMethod);
             outboundChannel.writeAndFlush(command);
             outboundChannel.read();
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java
index 9eb1fe75fc..a075840311 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java
@@ -32,32 +32,34 @@
 
 public class ProxyClientCnx extends ClientCnx {
 
-       String clientAuthRole;
-       String clientAuthData;
-       String clientAuthMethod;
-       
-       public ProxyClientCnx(ClientConfigurationData conf, EventLoopGroup 
eventLoopGroup, String clientAuthRole,
-                       String clientAuthData, String clientAuthMethod) {
-               super(conf, eventLoopGroup);
-               this.clientAuthRole = clientAuthRole;
-               this.clientAuthData = clientAuthData;
-               this.clientAuthMethod = clientAuthMethod;
-       }
-    
-       @Override
-       protected ByteBuf newConnectCommand() throws PulsarClientException {
-           if (log.isDebugEnabled()) {
+    String clientAuthRole;
+    String clientAuthData;
+    String clientAuthMethod;
+    int protocolVersion;
+
+    public ProxyClientCnx(ClientConfigurationData conf, EventLoopGroup 
eventLoopGroup, String clientAuthRole,
+            String clientAuthData, String clientAuthMethod, int 
protocolVersion) {
+        super(conf, eventLoopGroup);
+        this.clientAuthRole = clientAuthRole;
+        this.clientAuthData = clientAuthData;
+        this.clientAuthMethod = clientAuthMethod;
+        this.protocolVersion = protocolVersion;
+    }
+
+    @Override
+    protected ByteBuf newConnectCommand() throws PulsarClientException {
+        if (log.isDebugEnabled()) {
             log.debug(
                     "New Connection opened via ProxyClientCnx with params 
clientAuthRole = {}, clientAuthData = {}, clientAuthMethod = {}",
                     clientAuthRole, clientAuthData, clientAuthMethod);
-           }
-           String authData = null;
+        }
+        String authData = null;
         if (authentication.getAuthData().hasDataFromCommand()) {
             authData = authentication.getAuthData().getCommandData();
         }
-        return Commands.newConnect(authentication.getAuthMethodName(), 
authData,
+        return Commands.newConnect(authentication.getAuthMethodName(), 
authData, protocolVersion,
                 getPulsarClientVersion(), proxyToTargetBrokerAddress, 
clientAuthRole, clientAuthData, clientAuthMethod);
     }
-       
+
     private static final Logger log = 
LoggerFactory.getLogger(ProxyClientCnx.class);
 }
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index 2ebea8a774..d1aa9dd0da 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -202,6 +202,14 @@ protected void handleConnect(CommandConnect connect) {
             return;
         }
 
+        int protocolVersionToAdvertise = 
getProtocolVersionToAdvertise(connect);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug(
+                    "[{}] Protocol version to advertise to broker is {}, 
clientProtocolVersion={}, proxyProtocolVersion={}",
+                    remoteAddress, protocolVersionToAdvertise, 
remoteEndpointProtocolVersion,
+                    Commands.getCurrentProtocolVersion());
+        }
+
         if (!authenticateAndCreateClient(connect)) {
             ctx.writeAndFlush(Commands.newError(-1, 
ServerError.AuthenticationError, "Failed to authenticate"));
             close();
@@ -213,7 +221,8 @@ protected void handleConnect(CommandConnect connect) {
             // connection
             // there and just pass bytes in both directions
             state = State.ProxyConnectionToBroker;
-            directProxyHandler = new DirectProxyHandler(service, this, 
connect.getProxyToBrokerUrl());
+            directProxyHandler = new DirectProxyHandler(service, this, 
connect.getProxyToBrokerUrl(),
+                    protocolVersionToAdvertise);
             cancelKeepAliveTask();
         } else {
             // Client is doing a lookup, we can consider the handshake complete
@@ -221,7 +230,7 @@ protected void handleConnect(CommandConnect connect) {
             // partitions metadata lookups
             state = State.ProxyLookupRequests;
             lookupProxyHandler = new LookupProxyHandler(service, this);
-            
ctx.writeAndFlush(Commands.newConnected(connect.getProtocolVersion()));
+            
ctx.writeAndFlush(Commands.newConnected(protocolVersionToAdvertise));
         }
     }
 
@@ -279,10 +288,11 @@ private boolean 
authenticateAndCreateClient(CommandConnect connect) {
             ClientConfigurationData clientConf = createClientConfiguration();
             this.clientAuthentication = clientConf.getAuthentication();
 
+            final int protocolVersion = getProtocolVersionToAdvertise(connect);
             if (!service.getConfiguration().isAuthenticationEnabled()) {
                 this.client = new PulsarClientImpl(clientConf, 
service.getWorkerGroup(),
-                        new ProxyConnectionPool(clientConf, 
service.getWorkerGroup(), () -> new ClientCnx(clientConf,
-                                service.getWorkerGroup())));
+                        new ProxyConnectionPool(clientConf, 
service.getWorkerGroup(),
+                                () -> new ClientCnx(clientConf, 
service.getWorkerGroup(), protocolVersion)));
                 return true;
             }
 
@@ -307,7 +317,7 @@ private boolean authenticateAndCreateClient(CommandConnect 
connect) {
                 this.clientAuthData = authData;
                 this.clientAuthMethod = authMethod;
             }
-            this.client = createClient(clientConf, this.clientAuthData, 
this.clientAuthMethod);
+            this.client = createClient(clientConf, this.clientAuthData, 
this.clientAuthMethod, protocolVersion);
 
             return true;
         } catch (Exception e) {
@@ -317,10 +327,14 @@ private boolean 
authenticateAndCreateClient(CommandConnect connect) {
     }
 
     private PulsarClientImpl createClient(final ClientConfigurationData 
clientConf, final String clientAuthData,
-            final String clientAuthMethod) throws PulsarClientException {
+            final String clientAuthMethod, final int protocolVersion) throws 
PulsarClientException {
         return new PulsarClientImpl(clientConf, service.getWorkerGroup(),
                 new ProxyConnectionPool(clientConf, service.getWorkerGroup(), 
() -> new ProxyClientCnx(clientConf,
-                        service.getWorkerGroup(), clientAuthRole, 
clientAuthData, clientAuthMethod)));
+                        service.getWorkerGroup(), clientAuthRole, 
clientAuthData, clientAuthMethod, protocolVersion)));
+    }
+
+    private static int getProtocolVersionToAdvertise(CommandConnect connect) {
+        return Math.min(connect.getProtocolVersion(), 
Commands.getCurrentProtocolVersion());
     }
 
     long newRequestId() {
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
index 99d79fb639..09fdbdb159 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
@@ -24,8 +24,12 @@
 import static org.mockito.Mockito.doReturn;
 import static org.testng.Assert.assertEquals;
 
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 
+import io.netty.channel.EventLoopGroup;
+import io.netty.util.concurrent.DefaultThreadFactory;
+
 import org.apache.bookkeeper.test.PortManager;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
@@ -35,8 +39,16 @@
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.ClientCnx;
+import org.apache.pulsar.client.impl.ConnectionPool;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange;
+import org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.util.netty.EventLoopUtil;
 import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -193,4 +205,50 @@ public void testRegexSubscription() throws Exception {
         }
     }
 
+    @Test
+    private void testProtocolVersionAdvertisement() throws Exception {
+        final String url = "pulsar://localhost:" + 
proxyConfig.getServicePort();
+        final String topic = 
"persistent://sample/test/local/protocol-version-advertisement";
+        final String sub = "my-sub";
+
+        ClientConfigurationData conf = new ClientConfigurationData();
+        conf.setServiceUrl(url);
+        PulsarClient client = getClientActiveConsumerChangeNotSupported(conf);
+
+        Producer<byte[]> producer = client.newProducer().topic(topic).create();
+        Consumer<byte[]> consumer = 
client.newConsumer().topic(topic).subscriptionName(sub)
+                .subscriptionType(SubscriptionType.Failover).subscribe();
+
+        for (int i = 0; i < 10; i++) {
+            producer.send("test-msg".getBytes());
+        }
+
+        for (int i = 0; i < 10; i++) {
+            Message<byte[]> msg = consumer.receive(10, TimeUnit.SECONDS);
+            checkNotNull(msg);
+            consumer.acknowledge(msg);
+        }
+
+        producer.close();
+        consumer.close();
+        client.close();
+    }
+
+    private static PulsarClient 
getClientActiveConsumerChangeNotSupported(ClientConfigurationData conf)
+            throws Exception {
+        ThreadFactory threadFactory = new 
DefaultThreadFactory("pulsar-client-io", Thread.currentThread().isDaemon());
+        EventLoopGroup eventLoopGroup = 
EventLoopUtil.newEventLoopGroup(conf.getNumIoThreads(), threadFactory);
+
+        ConnectionPool cnxPool = new ConnectionPool(conf, eventLoopGroup, () 
-> {
+            return new ClientCnx(conf, eventLoopGroup, 
ProtocolVersion.v11_VALUE) {
+                @Override
+                protected void 
handleActiveConsumerChange(CommandActiveConsumerChange change) {
+                    throw new UnsupportedOperationException();
+                }
+            };
+        });
+
+        return new PulsarClientImpl(conf, eventLoopGroup, cnxPool);
+    }
+
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to