This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 5bdcdc5 Make proxy advertise protocol version of client to broker (#2845) 5bdcdc5 is described below commit 5bdcdc584834714b7c660cbad84d748ec7b98fee Author: massakam <massa...@yahoo-corp.jp> AuthorDate: Fri Oct 26 15:34:52 2018 +0900 Make proxy advertise protocol version of client to broker (#2845) * Make proxy advertise protocol version of client to broker * Revert incorrect change --- .../org/apache/pulsar/client/impl/ClientCnx.java | 10 +++- .../org/apache/pulsar/common/api/Commands.java | 1 - .../pulsar/proxy/server/DirectProxyHandler.java | 13 +++-- .../apache/pulsar/proxy/server/ProxyClientCnx.java | 40 ++++++++------- .../pulsar/proxy/server/ProxyConnection.java | 28 ++++++++--- .../org/apache/pulsar/proxy/server/ProxyTest.java | 58 ++++++++++++++++++++++ 6 files changed, 117 insertions(+), 33 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java index 9306a82..ccc20b3 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java @@ -110,6 +110,7 @@ public class ClientCnx extends PulsarHandler { private volatile int numberOfRejectRequests = 0; private final int maxNumberOfRejectedRequestPerConnection; private final int rejectedRequestResetTimeSec = 60; + private final int protocolVersion; private final long operationTimeoutMs; protected String proxyToTargetBrokerAddress = null; @@ -123,6 +124,10 @@ public class ClientCnx extends PulsarHandler { } public ClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) { + this(conf, eventLoopGroup, Commands.getCurrentProtocolVersion()); + } + + public ClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, int protocolVersion) { super(conf.getKeepAliveIntervalSeconds(), TimeUnit.SECONDS); checkArgument(conf.getMaxLookupRequest() > conf.getConcurrentLookupRequest()); this.pendingLookupRequestSemaphore = new Semaphore(conf.getConcurrentLookupRequest(), true); @@ -135,6 +140,7 @@ public class ClientCnx extends PulsarHandler { this.state = State.None; this.isTlsHostnameVerificationEnable = conf.isTlsHostnameVerificationEnable(); this.hostnameVerifier = new DefaultHostnameVerifier(); + this.protocolVersion = protocolVersion; } @Override @@ -167,8 +173,8 @@ public class ClientCnx extends PulsarHandler { if (authentication.getAuthData().hasDataFromCommand()) { authData = authentication.getAuthData().getCommandData(); } - return Commands.newConnect(authentication.getAuthMethodName(), authData, - getPulsarClientVersion(), proxyToTargetBrokerAddress); + return Commands.newConnect(authentication.getAuthMethodName(), authData, this.protocolVersion, + getPulsarClientVersion(), proxyToTargetBrokerAddress, null, null, null); } @Override diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java index 16e47a0..31dcac1 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java @@ -1116,7 +1116,6 @@ public class Commands { return (ByteBufPair) ByteBufPair.get(headers, metadataAndPayload); } - @VisibleForTesting public static int getCurrentProtocolVersion() { // Return the last ProtocolVersion enum value return ProtocolVersion.values()[ProtocolVersion.values().length - 1].getNumber(); diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java index 232db43..c7fa786 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java @@ -59,16 +59,19 @@ public class DirectProxyHandler { private String originalPrincipal; private String clientAuthData; private String clientAuthMethod; + private int protocolVersion; public static final String TLS_HANDLER = "tls"; private final Authentication authentication; - public DirectProxyHandler(ProxyService service, ProxyConnection proxyConnection, String targetBrokerUrl) { + public DirectProxyHandler(ProxyService service, ProxyConnection proxyConnection, String targetBrokerUrl, + int protocolVersion) { this.authentication = proxyConnection.getClientAuthentication(); this.inboundChannel = proxyConnection.ctx().channel(); this.originalPrincipal = proxyConnection.clientAuthRole; this.clientAuthData = proxyConnection.clientAuthData; this.clientAuthMethod = proxyConnection.clientAuthMethod; + this.protocolVersion = protocolVersion; ProxyConfiguration config = service.getConfiguration(); // Start the connection attempt. @@ -97,7 +100,7 @@ public class DirectProxyHandler { } ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(PulsarDecoder.MaxFrameSize, 0, 4, 0, 4)); - ch.pipeline().addLast("proxyOutboundHandler", new ProxyBackendHandler(config)); + ch.pipeline().addLast("proxyOutboundHandler", new ProxyBackendHandler(config, protocolVersion)); } }); @@ -136,9 +139,11 @@ public class DirectProxyHandler { private String remoteHostName; protected ChannelHandlerContext ctx; private ProxyConfiguration config; + private int protocolVersion; - public ProxyBackendHandler(ProxyConfiguration config) { + public ProxyBackendHandler(ProxyConfiguration config, int protocolVersion) { this.config = config; + this.protocolVersion = protocolVersion; } @Override @@ -150,7 +155,7 @@ public class DirectProxyHandler { authData = authentication.getAuthData().getCommandData(); } ByteBuf command = null; - command = Commands.newConnect(authentication.getAuthMethodName(), authData, "Pulsar proxy", + command = Commands.newConnect(authentication.getAuthMethodName(), authData, protocolVersion, "Pulsar proxy", null /* target broker */, originalPrincipal, clientAuthData, clientAuthMethod); outboundChannel.writeAndFlush(command); outboundChannel.read(); diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java index 9eb1fe7..a075840 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java @@ -32,32 +32,34 @@ import io.netty.channel.EventLoopGroup; public class ProxyClientCnx extends ClientCnx { - String clientAuthRole; - String clientAuthData; - String clientAuthMethod; - - public ProxyClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, String clientAuthRole, - String clientAuthData, String clientAuthMethod) { - super(conf, eventLoopGroup); - this.clientAuthRole = clientAuthRole; - this.clientAuthData = clientAuthData; - this.clientAuthMethod = clientAuthMethod; - } - - @Override - protected ByteBuf newConnectCommand() throws PulsarClientException { - if (log.isDebugEnabled()) { + String clientAuthRole; + String clientAuthData; + String clientAuthMethod; + int protocolVersion; + + public ProxyClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, String clientAuthRole, + String clientAuthData, String clientAuthMethod, int protocolVersion) { + super(conf, eventLoopGroup); + this.clientAuthRole = clientAuthRole; + this.clientAuthData = clientAuthData; + this.clientAuthMethod = clientAuthMethod; + this.protocolVersion = protocolVersion; + } + + @Override + protected ByteBuf newConnectCommand() throws PulsarClientException { + if (log.isDebugEnabled()) { log.debug( "New Connection opened via ProxyClientCnx with params clientAuthRole = {}, clientAuthData = {}, clientAuthMethod = {}", clientAuthRole, clientAuthData, clientAuthMethod); - } - String authData = null; + } + String authData = null; if (authentication.getAuthData().hasDataFromCommand()) { authData = authentication.getAuthData().getCommandData(); } - return Commands.newConnect(authentication.getAuthMethodName(), authData, + return Commands.newConnect(authentication.getAuthMethodName(), authData, protocolVersion, getPulsarClientVersion(), proxyToTargetBrokerAddress, clientAuthRole, clientAuthData, clientAuthMethod); } - + private static final Logger log = LoggerFactory.getLogger(ProxyClientCnx.class); } diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java index 2ebea8a..d1aa9dd 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java @@ -202,6 +202,14 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi return; } + int protocolVersionToAdvertise = getProtocolVersionToAdvertise(connect); + if (LOG.isDebugEnabled()) { + LOG.debug( + "[{}] Protocol version to advertise to broker is {}, clientProtocolVersion={}, proxyProtocolVersion={}", + remoteAddress, protocolVersionToAdvertise, remoteEndpointProtocolVersion, + Commands.getCurrentProtocolVersion()); + } + if (!authenticateAndCreateClient(connect)) { ctx.writeAndFlush(Commands.newError(-1, ServerError.AuthenticationError, "Failed to authenticate")); close(); @@ -213,7 +221,8 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi // connection // there and just pass bytes in both directions state = State.ProxyConnectionToBroker; - directProxyHandler = new DirectProxyHandler(service, this, connect.getProxyToBrokerUrl()); + directProxyHandler = new DirectProxyHandler(service, this, connect.getProxyToBrokerUrl(), + protocolVersionToAdvertise); cancelKeepAliveTask(); } else { // Client is doing a lookup, we can consider the handshake complete @@ -221,7 +230,7 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi // partitions metadata lookups state = State.ProxyLookupRequests; lookupProxyHandler = new LookupProxyHandler(service, this); - ctx.writeAndFlush(Commands.newConnected(connect.getProtocolVersion())); + ctx.writeAndFlush(Commands.newConnected(protocolVersionToAdvertise)); } } @@ -279,10 +288,11 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi ClientConfigurationData clientConf = createClientConfiguration(); this.clientAuthentication = clientConf.getAuthentication(); + final int protocolVersion = getProtocolVersionToAdvertise(connect); if (!service.getConfiguration().isAuthenticationEnabled()) { this.client = new PulsarClientImpl(clientConf, service.getWorkerGroup(), - new ProxyConnectionPool(clientConf, service.getWorkerGroup(), () -> new ClientCnx(clientConf, - service.getWorkerGroup()))); + new ProxyConnectionPool(clientConf, service.getWorkerGroup(), + () -> new ClientCnx(clientConf, service.getWorkerGroup(), protocolVersion))); return true; } @@ -307,7 +317,7 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi this.clientAuthData = authData; this.clientAuthMethod = authMethod; } - this.client = createClient(clientConf, this.clientAuthData, this.clientAuthMethod); + this.client = createClient(clientConf, this.clientAuthData, this.clientAuthMethod, protocolVersion); return true; } catch (Exception e) { @@ -317,10 +327,14 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi } private PulsarClientImpl createClient(final ClientConfigurationData clientConf, final String clientAuthData, - final String clientAuthMethod) throws PulsarClientException { + final String clientAuthMethod, final int protocolVersion) throws PulsarClientException { return new PulsarClientImpl(clientConf, service.getWorkerGroup(), new ProxyConnectionPool(clientConf, service.getWorkerGroup(), () -> new ProxyClientCnx(clientConf, - service.getWorkerGroup(), clientAuthRole, clientAuthData, clientAuthMethod))); + service.getWorkerGroup(), clientAuthRole, clientAuthData, clientAuthMethod, protocolVersion))); + } + + private static int getProtocolVersionToAdvertise(CommandConnect connect) { + return Math.min(connect.getProtocolVersion(), Commands.getCurrentProtocolVersion()); } long newRequestId() { diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java index 99d79fb..09fdbdb 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java @@ -24,8 +24,12 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static org.mockito.Mockito.doReturn; import static org.testng.Assert.assertEquals; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import io.netty.channel.EventLoopGroup; +import io.netty.util.concurrent.DefaultThreadFactory; + import org.apache.bookkeeper.test.PortManager; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.authentication.AuthenticationService; @@ -35,8 +39,16 @@ import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.impl.ClientCnx; +import org.apache.pulsar.client.impl.ConnectionPool; +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange; +import org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion; import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; import org.apache.pulsar.common.policies.data.TenantInfo; +import org.apache.pulsar.common.util.netty.EventLoopUtil; import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -193,4 +205,50 @@ public class ProxyTest extends MockedPulsarServiceBaseTest { } } + @Test + private void testProtocolVersionAdvertisement() throws Exception { + final String url = "pulsar://localhost:" + proxyConfig.getServicePort(); + final String topic = "persistent://sample/test/local/protocol-version-advertisement"; + final String sub = "my-sub"; + + ClientConfigurationData conf = new ClientConfigurationData(); + conf.setServiceUrl(url); + PulsarClient client = getClientActiveConsumerChangeNotSupported(conf); + + Producer<byte[]> producer = client.newProducer().topic(topic).create(); + Consumer<byte[]> consumer = client.newConsumer().topic(topic).subscriptionName(sub) + .subscriptionType(SubscriptionType.Failover).subscribe(); + + for (int i = 0; i < 10; i++) { + producer.send("test-msg".getBytes()); + } + + for (int i = 0; i < 10; i++) { + Message<byte[]> msg = consumer.receive(10, TimeUnit.SECONDS); + checkNotNull(msg); + consumer.acknowledge(msg); + } + + producer.close(); + consumer.close(); + client.close(); + } + + private static PulsarClient getClientActiveConsumerChangeNotSupported(ClientConfigurationData conf) + throws Exception { + ThreadFactory threadFactory = new DefaultThreadFactory("pulsar-client-io", Thread.currentThread().isDaemon()); + EventLoopGroup eventLoopGroup = EventLoopUtil.newEventLoopGroup(conf.getNumIoThreads(), threadFactory); + + ConnectionPool cnxPool = new ConnectionPool(conf, eventLoopGroup, () -> { + return new ClientCnx(conf, eventLoopGroup, ProtocolVersion.v11_VALUE) { + @Override + protected void handleActiveConsumerChange(CommandActiveConsumerChange change) { + throw new UnsupportedOperationException(); + } + }; + }); + + return new PulsarClientImpl(conf, eventLoopGroup, cnxPool); + } + }