This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 2a89b7f1bea [fix][proxy] Propagate client connection feature flags
through Pulsar Proxy to Broker (#24158)
2a89b7f1bea is described below
commit 2a89b7f1bea4c8dfcf48798346e3b02683fa87a2
Author: fengyubiao <[email protected]>
AuthorDate: Wed Apr 9 11:14:55 2025 +0800
[fix][proxy] Propagate client connection feature flags through Pulsar Proxy
to Broker (#24158)
Co-authored-by: Lari Hotari <[email protected]>
---
.../service/OneWayReplicatorDeduplicationTest.java | 3 +-
.../pulsar/broker/service/ServerCnxTest.java | 4 +-
.../apache/pulsar/common/protocol/Commands.java | 14 +++--
.../pulsar/proxy/server/DirectProxyHandler.java | 13 +++--
.../apache/pulsar/proxy/server/ProxyClientCnx.java | 2 +-
.../pulsar/proxy/server/ProxyConnection.java | 2 +-
.../org/apache/pulsar/proxy/server/ProxyTest.java | 66 ++++++++++++++++++++++
7 files changed, 90 insertions(+), 14 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorDeduplicationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorDeduplicationTest.java
index d4c2de05a28..48b9adb52a7 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorDeduplicationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorDeduplicationTest.java
@@ -221,7 +221,8 @@ public class OneWayReplicatorDeduplicationTest extends
OneWayReplicatorTestBase
authenticationDataProvider =
authentication.getAuthData(remoteHostName);
AuthData authData =
authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA);
BaseCommand cmd =
Commands.newConnectWithoutSerialize(authentication.getAuthMethodName(),
authData,
- this.protocolVersion, clientVersion,
proxyToTargetBrokerAddress, null, null, null, null);
+ this.protocolVersion, clientVersion,
proxyToTargetBrokerAddress,
+ null, null, null, null, null);
cmd.getConnect().getFeatureFlags().setSupportsReplDedupByLidAndEid(false);
return Commands.serializeWithSize(cmd);
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index b1c99940827..15060dceb2a 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -327,7 +327,7 @@ public class ServerCnxTest {
assertEquals(serverCnx.getState(), State.Start);
ByteBuf clientCommand = Commands.newConnect("none", null, 1, null,
null, null, null, null,
- "my-pulsar-proxy");
+ "my-pulsar-proxy", null);
channel.writeInbound(clientCommand);
assertEquals(serverCnx.getState(), State.Connected);
@@ -601,7 +601,7 @@ public class ServerCnxTest {
assertEquals(serverCnx.getState(), State.Start);
ByteBuf clientCommand = Commands.newConnect(authMethodName,
AuthData.of("pass.pass".getBytes()),
- 1, null, null, null, null, null, "my-pulsar-proxy");
+ 1, null, null, null, null, null, "my-pulsar-proxy", null);
channel.writeInbound(clientCommand);
Object response = getResponse();
assertTrue(response instanceof CommandError);
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index 2cb4f9a40e3..58d58a3acef 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -241,21 +241,21 @@ public class Commands {
String targetBroker, String
originalPrincipal, AuthData originalAuthData,
String originalAuthMethod) {
return newConnect(authMethodName, authData, protocolVersion,
libVersion, targetBroker, originalPrincipal,
- originalAuthData, originalAuthMethod, null);
+ originalAuthData, originalAuthMethod, null, null);
}
public static ByteBuf newConnect(String authMethodName, AuthData authData,
int protocolVersion, String libVersion,
String targetBroker, String
originalPrincipal, AuthData originalAuthData,
- String originalAuthMethod, String
proxyVersion) {
+ String originalAuthMethod, String
proxyVersion, FeatureFlags featureFlags) {
BaseCommand cmd = newConnectWithoutSerialize(authMethodName, authData,
protocolVersion, libVersion,
- targetBroker, originalPrincipal, originalAuthData,
originalAuthMethod, proxyVersion);
+ targetBroker, originalPrincipal, originalAuthData,
originalAuthMethod, proxyVersion, featureFlags);
return serializeWithSize(cmd);
}
public static BaseCommand newConnectWithoutSerialize(String
authMethodName, AuthData authData,
int protocolVersion, String libVersion,
String targetBroker, String
originalPrincipal, AuthData originalAuthData,
- String originalAuthMethod, String
proxyVersion) {
+ String originalAuthMethod, String
proxyVersion, FeatureFlags featureFlags) {
BaseCommand cmd = localCmd(Type.CONNECT);
CommandConnect connect = cmd.setConnect()
.setClientVersion(libVersion != null ? libVersion : "Pulsar
Client")
@@ -286,7 +286,11 @@ public class Commands {
connect.setOriginalAuthMethod(originalAuthMethod);
}
connect.setProtocolVersion(protocolVersion);
- setFeatureFlags(connect.setFeatureFlags());
+ if (featureFlags != null) {
+ connect.setFeatureFlags().copyFrom(featureFlags);
+ } else {
+ setFeatureFlags(connect.setFeatureFlags());
+ }
return cmd;
}
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
index 681aa553c48..a0b699ff0c1 100644
---
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
+++
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
@@ -58,6 +58,7 @@ import
org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.AuthData;
import org.apache.pulsar.common.api.proto.CommandAuthChallenge;
import org.apache.pulsar.common.api.proto.CommandConnected;
+import org.apache.pulsar.common.api.proto.FeatureFlags;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.PulsarDecoder;
import org.apache.pulsar.common.stats.Rate;
@@ -107,7 +108,8 @@ public class DirectProxyHandler {
this.pulsarSslFactoryMap = new ConcurrentHashMap<>();
}
- public void connect(String brokerHostAndPort, InetSocketAddress
targetBrokerAddress, int protocolVersion) {
+ public void connect(String brokerHostAndPort, InetSocketAddress
targetBrokerAddress, int protocolVersion,
+ final FeatureFlags featureFlags) {
String remoteHost;
try {
remoteHost = parseHost(brokerHostAndPort);
@@ -182,7 +184,7 @@ public class DirectProxyHandler {
service.getConfiguration().getMaxMessageSize() +
Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0,
4));
ch.pipeline().addLast("proxyOutboundHandler",
- (ChannelHandler) new ProxyBackendHandler(config,
protocolVersion, remoteHost));
+ (ChannelHandler) new ProxyBackendHandler(config,
protocolVersion, remoteHost, featureFlags));
}
});
@@ -276,11 +278,14 @@ public class DirectProxyHandler {
protected ChannelHandlerContext ctx;
private final ProxyConfiguration config;
private final int protocolVersion;
+ private final FeatureFlags featureFlags;
- public ProxyBackendHandler(ProxyConfiguration config, int
protocolVersion, String remoteHostName) {
+ public ProxyBackendHandler(ProxyConfiguration config, int
protocolVersion, String remoteHostName,
+ FeatureFlags featureFlags) {
this.config = config;
this.protocolVersion = protocolVersion;
this.remoteHostName = remoteHostName;
+ this.featureFlags = featureFlags;
}
@Override
@@ -297,7 +302,7 @@ public class DirectProxyHandler {
ByteBuf command = Commands.newConnect(
authentication.getAuthMethodName(), authData,
protocolVersion,
proxyConnection.clientVersion, null /* target broker */,
- originalPrincipal, clientAuthData, clientAuthMethod,
PulsarVersion.getVersion());
+ originalPrincipal, clientAuthData, clientAuthMethod,
PulsarVersion.getVersion(), featureFlags);
writeAndFlush(command);
isTlsOutboundChannel =
ProxyConnection.isTlsChannel(inboundChannel);
}
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java
index d15d48b9209..1e8e2fb55e4 100644
---
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java
+++
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java
@@ -73,7 +73,7 @@ public class ProxyClientCnx extends ClientCnx {
AuthData authData =
authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA);
return Commands.newConnect(authentication.getAuthMethodName(),
authData, protocolVersion,
proxyConnection.clientVersion, proxyToTargetBrokerAddress,
clientAuthRole, clientAuthData,
- clientAuthMethod, PulsarVersion.getVersion());
+ clientAuthMethod, PulsarVersion.getVersion(), null);
}
@Override
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index 540771c86fb..e1179be115a 100644
---
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -444,7 +444,7 @@ public class ProxyConnection extends PulsarHandler {
private void connectToBroker(InetSocketAddress brokerAddress) {
assert ctx.executor().inEventLoop();
DirectProxyHandler directProxyHandler = new
DirectProxyHandler(service, this);
- directProxyHandler.connect(proxyToBrokerUrl, brokerAddress,
protocolVersionToAdvertise);
+ directProxyHandler.connect(proxyToBrokerUrl, brokerAddress,
protocolVersionToAdvertise, features);
}
public void brokerConnected(DirectProxyHandler directProxyHandler,
CommandConnected connected) {
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
index ee770e6edee..8f0fe98be27 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java
@@ -24,6 +24,7 @@ import static java.util.Objects.requireNonNull;
import static org.mockito.Mockito.doReturn;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
+import io.netty.buffer.ByteBuf;
import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.ArrayList;
@@ -42,9 +43,11 @@ import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.InjectedClientCnxClientBuilder;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
@@ -52,12 +55,16 @@ import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.ClientBuilderImpl;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
+import org.apache.pulsar.common.api.AuthData;
+import org.apache.pulsar.common.api.proto.BaseCommand;
import org.apache.pulsar.common.api.proto.CommandActiveConsumerChange;
+import org.apache.pulsar.common.api.proto.FeatureFlags;
import org.apache.pulsar.common.api.proto.ProtocolVersion;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.naming.TopicName;
@@ -66,6 +73,7 @@ import
org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicType;
+import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
@@ -429,6 +437,64 @@ public class ProxyTest extends MockedPulsarServiceBaseTest
{
.get(0).getClientVersion(), String.format("Pulsar-Java-v%s",
PulsarVersion.getVersion()));
}
+ @DataProvider
+ public Object[][] booleanValues() {
+ return new Object[][]{
+ {true},
+ {false}
+ };
+ }
+
+ @Test(dataProvider = "booleanValues")
+ public void testConnectedWithClientSideFeatures(boolean supported) throws
Exception {
+ final String topic =
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+ admin.topics().createNonPartitionedTopic(topic);
+
+ // Create a client whose CONNECT command sets the feature flags below
to the value of "supported" (true or false, per the data provider).
+ ClientBuilderImpl clientBuilder2 =
+ (ClientBuilderImpl)
PulsarClient.builder().serviceUrl(proxyService.getServiceUrl());
+ PulsarClientImpl injectedClient =
InjectedClientCnxClientBuilder.create(clientBuilder2,
+ (conf, eventLoopGroup) -> {
+ return new ClientCnx(InstrumentProvider.NOOP, conf,
eventLoopGroup) {
+
+ @Override
+ protected ByteBuf newConnectCommand() throws Exception {
+ authenticationDataProvider =
authentication.getAuthData(remoteHostName);
+ AuthData authData =
authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA);
+ BaseCommand cmd =
+
Commands.newConnectWithoutSerialize(authentication.getAuthMethodName(),
authData,
+ this.protocolVersion, clientVersion,
proxyToTargetBrokerAddress,
+ null, null, null, null, null);
+ FeatureFlags featureFlags =
cmd.getConnect().getFeatureFlags();
+ featureFlags.setSupportsAuthRefresh(supported);
+ featureFlags.setSupportsBrokerEntryMetadata(supported);
+ featureFlags.setSupportsPartialProducer(supported);
+ featureFlags.setSupportsTopicWatchers(supported);
+
featureFlags.setSupportsReplDedupByLidAndEid(supported);
+
featureFlags.setSupportsGetPartitionedMetadataWithoutAutoCreation(supported);
+ return Commands.serializeWithSize(cmd);
+ }
+ };
+ });
+
+ // Verify: the broker-side connection reports the same feature flag
values that the client sent through the proxy.
+ Producer<byte[]> producer =
injectedClient.newProducer().topic(topic).create();
+ ServerCnx serverCnx = (ServerCnx)
pulsar.getBrokerService().getTopic(topic, false).get().get()
+ .getProducers().values().iterator().next().getCnx();
+ FeatureFlags featureFlags = serverCnx.getFeatures();
+ assertEquals(featureFlags.isSupportsAuthRefresh(), supported);
+ assertEquals(featureFlags.isSupportsBrokerEntryMetadata(), supported);
+ assertEquals(featureFlags.isSupportsPartialProducer(), supported);
+ assertEquals(featureFlags.isSupportsTopicWatchers(), supported);
+ assertEquals(featureFlags.isSupportsReplDedupByLidAndEid(), supported);
+
assertEquals(featureFlags.isSupportsGetPartitionedMetadataWithoutAutoCreation(),
supported);
+
+ // cleanup.
+ producer.close();
+ injectedClient.close();
+ admin.topics().delete(topic);
+ }
+
private PulsarClient
getClientActiveConsumerChangeNotSupported(ClientConfigurationData conf)
throws Exception {
ThreadFactory threadFactory = new
DefaultThreadFactory("pulsar-client-io", Thread.currentThread().isDaemon());