Re: [PR] [improve][client] Add connectionsPerHost and connectionMaxIdleSeconds to PulsarAdminBuilder [pulsar]
lhotari commented on code in PR #22541: URL: https://github.com/apache/pulsar/pull/22541#discussion_r1574216061 ## pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java: ## @@ -47,6 +47,7 @@ public PulsarAdmin build() throws PulsarClientException { public PulsarAdminBuilderImpl() { this.conf = new ClientConfigurationData(); +this.conf.setConnectionsPerBroker(16); Review Comment: > Couldn't the default be part of `ClientConfigurationData` constructor? Not really. ClientConfigurationData is designed for PulsarClient, but it's also used in PulsarAdmin client. The current PulsarAdminBuilderImpl is a bit of a hack around ClientConfigurationData. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
(pulsar-dotpulsar) branch master updated: Add sample using send channel (#215)
This is an automated email from the ASF dual-hosted git repository. blankensteiner pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git The following commit(s) were added to refs/heads/master by this push: new 7a6d825 Add sample using send channel (#215) 7a6d825 is described below commit 7a6d825da5c31f662a1db35846143c0606bd824f Author: Kristian Andersen AuthorDate: Mon Apr 22 08:45:09 2024 +0200 Add sample using send channel (#215) --- DotPulsar.sln | 7 +++ samples/SendChannel/Program.cs | 82 ++ samples/SendChannel/SendChannel.csproj | 12 + 3 files changed, 101 insertions(+) diff --git a/DotPulsar.sln b/DotPulsar.sln index 1706858..ed8376d 100644 --- a/DotPulsar.sln +++ b/DotPulsar.sln @@ -30,6 +30,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Compression", "benchmarks\C EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Benchmarks", "Benchmarks", "{2C57AF4B-0D23-42D7-86FE-80277FD52875}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SendChannel", "samples\SendChannel\SendChannel.csproj", "{366ACA6A-7DA2-4E51-AC27-7B570DFFE5D2}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -64,6 +66,10 @@ Global {040F8253-074D-4977-BDB1-0D9798B52CE2}.Debug|Any CPU.Build.0 = Debug|Any CPU {040F8253-074D-4977-BDB1-0D9798B52CE2}.Release|Any CPU.ActiveCfg = Release|Any CPU {040F8253-074D-4977-BDB1-0D9798B52CE2}.Release|Any CPU.Build.0 = Release|Any CPU + {366ACA6A-7DA2-4E51-AC27-7B570DFFE5D2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {366ACA6A-7DA2-4E51-AC27-7B570DFFE5D2}.Debug|Any CPU.Build.0 = Debug|Any CPU + {366ACA6A-7DA2-4E51-AC27-7B570DFFE5D2}.Release|Any CPU.ActiveCfg = Release|Any CPU + {366ACA6A-7DA2-4E51-AC27-7B570DFFE5D2}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -75,6 +81,7 @@ Global {6D44683B-865C-4D15-9F0A-1A8441354589} = {E7106D0F-B255-4631-9FB8-734FC5748FA9} {CC1494FA-4EB5-4DB9-8BE9-0A6E8D0D963E} = {E7106D0F-B255-4631-9FB8-734FC5748FA9} {040F8253-074D-4977-BDB1-0D9798B52CE2} = {2C57AF4B-0D23-42D7-86FE-80277FD52875} + {366ACA6A-7DA2-4E51-AC27-7B570DFFE5D2} = {E7106D0F-B255-4631-9FB8-734FC5748FA9} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {88355922-E70A-4B73-B7F8-ABF8F2B59789} diff --git a/samples/SendChannel/Program.cs b/samples/SendChannel/Program.cs new file mode 100644 index 000..d79f038 --- /dev/null +++ b/samples/SendChannel/Program.cs @@ -0,0 +1,82 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace SendChannel; + +using DotPulsar; +using DotPulsar.Abstractions; +using DotPulsar.Extensions; +using System; +using System.Threading; +using System.Threading.Tasks; + +internal static class Program +{ +private static async Task Main() +{ +var cts = new CancellationTokenSource(); + +Console.CancelKeyPress += (sender, args) => +{ +cts.Cancel(); +args.Cancel = true; +}; + +await using var client = PulsarClient.Builder().Build(); // Connecting to pulsar://localhost:6650 + +await using var producer = client.NewProducer(Schema.String) +.StateChangedHandler(Monitor) +.Topic("persistent://public/default/mytopic") +.Create(); + +Console.WriteLine("Press Ctrl+C to exit"); + +var sendChannel = producer.SendChannel; +await ProduceMessages(sendChannel, 1000, cts.Token); +sendChannel.Complete(); + +var shutdownCts = new CancellationTokenSource(); +shutdownCts.CancelAfter(TimeSpan.FromSeconds(30)); +await sendChannel.Completion(shutdownCts.Token); +} + +private static async Task ProduceMessages(ISendChannel sendChannel, int messages, CancellationToken cancellationToken) +{ +try +{ +int i = 0; +while (++i <= messages && !cancellationToken.IsCancellationRequested) +{ +var data = DateTime.UtcNow.ToLong
Re: [PR] Add sample using send channel [pulsar-dotpulsar]
blankensteiner merged PR #215: URL: https://github.com/apache/pulsar-dotpulsar/pull/215 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
(pulsar) branch branch-2.11 updated: [improve][broker] Support X-Forwarded-For and HA Proxy Protocol for resolving original client IP of http/https requests (#22524)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-2.11 by this push: new 2e272b9ae64 [improve][broker] Support X-Forwarded-For and HA Proxy Protocol for resolving original client IP of http/https requests (#22524) 2e272b9ae64 is described below commit 2e272b9ae6442aa39987546aed9bfcabbbc20534 Author: Lari Hotari AuthorDate: Mon Apr 22 07:49:34 2024 +0300 [improve][broker] Support X-Forwarded-For and HA Proxy Protocol for resolving original client IP of http/https requests (#22524) (cherry picked from commit 4a887217d835629cafb393ddf331441b484d4e2c) --- conf/broker.conf | 10 ++ conf/functions_worker.yml | 10 ++ conf/proxy.conf| 10 ++ conf/standalone.conf | 10 ++ conf/websocket.conf| 10 ++ pom.xml| 1 + .../apache/pulsar/broker/ServiceConfiguration.java | 16 ++ .../pulsar/broker/web/JettyRequestLogFactory.java | 195 - pulsar-broker/pom.xml | 7 + .../org/apache/pulsar/broker/web/WebService.java | 40 - .../broker/web/WebServiceOriginalClientIPTest.java | 174 ++ pulsar-broker/src/test/resources/log4j2.xml| 5 +- .../pulsar/functions/worker/WorkerConfig.java | 16 ++ .../pulsar/functions/worker/rest/WorkerServer.java | 38 +++- pulsar-proxy/pom.xml | 6 + .../pulsar/proxy/server/ProxyConfiguration.java| 16 ++ .../pulsar/proxy/server/ProxyServiceStarter.java | 31 +++- .../org/apache/pulsar/proxy/server/WebServer.java | 34 +++- .../proxy/server/ProxyOriginalClientIPTest.java| 173 ++ .../ProxyServiceStarterDisableZeroCopyTest.java| 2 +- .../proxy/server/ProxyServiceStarterTest.java | 2 +- .../proxy/server/ProxyServiceTlsStarterTest.java | 2 +- .../src/test/resources/log4j2.xml | 9 +- .../pulsar/websocket/service/ProxyServer.java | 39 - .../service/WebSocketProxyConfiguration.java | 14 ++ 25 files changed, 842 insertions(+), 28 deletions(-) diff --git a/conf/broker.conf b/conf/broker.conf index 1b3bdd1f91f..cab021246f5 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -84,6 +84,16 @@ advertisedAddress= # Enable or disable the HAProxy protocol. haProxyProtocolEnabled=false +# Enable or disable the use of HA proxy protocol for resolving the client IP for http/https requests. +webServiceHaProxyProtocolEnabled=false + +# Trust X-Forwarded-For header for resolving the client IP for http/https requests. Default is false. +webServiceTrustXForwardedFor=false + +# Add detailed client/remote and server/local addresses and ports to http/https request logging. +# Defaults to true when either webServiceHaProxyProtocolEnabled or webServiceTrustXForwardedFor is enabled. +webServiceLogDetailedAddresses= + # Number of threads to config Netty Acceptor. Default is 1 numAcceptorThreads= diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml index 1a0330eb6f9..662f943ca0d 100644 --- a/conf/functions_worker.yml +++ b/conf/functions_worker.yml @@ -27,6 +27,16 @@ workerHostname: localhost workerPort: 6750 workerPortTls: 6751 +# Enable or disable the use of HA proxy protocol for resolving the client IP for http/https requests. +webServiceHaProxyProtocolEnabled: false + +# Trust X-Forwarded-For header for resolving the client IP for http/https requests. Default is false. +webServiceTrustXForwardedFor: false + +# Add detailed client/remote and server/local addresses and ports to http/https request logging. +# Defaults to true when either webServiceHaProxyProtocolEnabled or webServiceTrustXForwardedFor is enabled. +webServiceLogDetailedAddresses: null + # The Configuration metadata store url # Examples: # * zk:my-zk-1:2181,my-zk-2:2181,my-zk-3:2181 diff --git a/conf/proxy.conf b/conf/proxy.conf index a63a61f7ca6..80d6c713dc2 100644 --- a/conf/proxy.conf +++ b/conf/proxy.conf @@ -60,6 +60,16 @@ advertisedAddress= # Enable or disable the HAProxy protocol. haProxyProtocolEnabled=false +# Enable or disable the use of HA proxy protocol for resolving the client IP for http/https requests. +webServiceHaProxyProtocolEnabled=false + +# Trust X-Forwarded-For header for resolving the client IP for http/https requests. Default is false. +webServiceTrustXForwardedFor=false + +# Add detailed client/remote and server/local addresses and ports to http/https request logging. +# Defaults to true when either webServiceHaProxyProtocolEnabled or webServiceTrustXForwardedFor is enabled. +webServiceLogDetailedAddresses= + # Enables zero-copy transport of data across network interfaces
[PR] Add sample using send channel [pulsar-dotpulsar]
kandersen82 opened a new pull request, #215: URL: https://github.com/apache/pulsar-dotpulsar/pull/215 Add sample for using send channel when producing messages # Description Add sample for using send channel when producing messages -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
(pulsar) branch branch-3.0 updated: [improve][broker] Support X-Forwarded-For and HA Proxy Protocol for resolving original client IP of http/https requests (#22524)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.0 by this push: new 7d52dd71331 [improve][broker] Support X-Forwarded-For and HA Proxy Protocol for resolving original client IP of http/https requests (#22524) 7d52dd71331 is described below commit 7d52dd7133106a7c39459af15e5ae395218cf6eb Author: Lari Hotari AuthorDate: Mon Apr 22 07:49:34 2024 +0300 [improve][broker] Support X-Forwarded-For and HA Proxy Protocol for resolving original client IP of http/https requests (#22524) (cherry picked from commit 4a887217d835629cafb393ddf331441b484d4e2c) --- conf/broker.conf | 10 ++ conf/functions_worker.yml | 10 ++ conf/proxy.conf| 10 ++ conf/standalone.conf | 10 ++ conf/websocket.conf| 10 ++ pom.xml| 1 + .../apache/pulsar/broker/ServiceConfiguration.java | 16 ++ .../pulsar/broker/web/JettyRequestLogFactory.java | 195 - pulsar-broker/pom.xml | 7 + .../org/apache/pulsar/broker/web/WebService.java | 37 +++- .../broker/web/WebServiceOriginalClientIPTest.java | 155 pulsar-broker/src/test/resources/log4j2.xml| 40 + .../pulsar/functions/worker/WorkerConfig.java | 16 ++ .../pulsar/functions/worker/rest/WorkerServer.java | 38 +++- pulsar-proxy/pom.xml | 6 + .../pulsar/proxy/server/ProxyConfiguration.java| 16 ++ .../pulsar/proxy/server/ProxyServiceStarter.java | 31 +++- .../org/apache/pulsar/proxy/server/WebServer.java | 34 +++- .../proxy/server/ProxyOriginalClientIPTest.java| 157 + .../ProxyServiceStarterDisableZeroCopyTest.java| 2 +- .../proxy/server/ProxyServiceStarterTest.java | 2 +- .../proxy/server/ProxyServiceTlsStarterTest.java | 2 +- pulsar-proxy/src/test/resources/log4j2.xml | 36 .../pulsar/websocket/service/ProxyServer.java | 39 - .../service/WebSocketProxyConfiguration.java | 14 ++ 25 files changed, 873 insertions(+), 21 deletions(-) diff --git a/conf/broker.conf b/conf/broker.conf index 461211bf37f..ec0974dca20 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -87,6 +87,16 @@ advertisedAddress= # Enable or disable the HAProxy protocol. haProxyProtocolEnabled=false +# Enable or disable the use of HA proxy protocol for resolving the client IP for http/https requests. +webServiceHaProxyProtocolEnabled=false + +# Trust X-Forwarded-For header for resolving the client IP for http/https requests. Default is false. +webServiceTrustXForwardedFor=false + +# Add detailed client/remote and server/local addresses and ports to http/https request logging. +# Defaults to true when either webServiceHaProxyProtocolEnabled or webServiceTrustXForwardedFor is enabled. +webServiceLogDetailedAddresses= + # Number of threads to config Netty Acceptor. Default is 1 numAcceptorThreads= diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml index 460becd9a94..e1c162c2f25 100644 --- a/conf/functions_worker.yml +++ b/conf/functions_worker.yml @@ -27,6 +27,16 @@ workerHostname: localhost workerPort: 6750 workerPortTls: 6751 +# Enable or disable the use of HA proxy protocol for resolving the client IP for http/https requests. +webServiceHaProxyProtocolEnabled: false + +# Trust X-Forwarded-For header for resolving the client IP for http/https requests. Default is false. +webServiceTrustXForwardedFor: false + +# Add detailed client/remote and server/local addresses and ports to http/https request logging. +# Defaults to true when either webServiceHaProxyProtocolEnabled or webServiceTrustXForwardedFor is enabled. +webServiceLogDetailedAddresses: null + # The Configuration metadata store url # Examples: # * zk:my-zk-1:2181,my-zk-2:2181,my-zk-3:2181 diff --git a/conf/proxy.conf b/conf/proxy.conf index ef0836ac1ac..2607900175b 100644 --- a/conf/proxy.conf +++ b/conf/proxy.conf @@ -62,6 +62,16 @@ advertisedAddress= # Enable or disable the HAProxy protocol. haProxyProtocolEnabled=false +# Enable or disable the use of HA proxy protocol for resolving the client IP for http/https requests. +webServiceHaProxyProtocolEnabled=false + +# Trust X-Forwarded-For header for resolving the client IP for http/https requests. Default is false. +webServiceTrustXForwardedFor=false + +# Add detailed client/remote and server/local addresses and ports to http/https request logging. +# Defaults to true when either webServiceHaProxyProtocolEnabled or webServiceTrustXForwardedFor is enabled. +webServiceLogDetailedAddresses= + # Enables zero-copy transport of data across network interfaces u
(pulsar) branch branch-3.2 updated: [improve][broker] Support X-Forwarded-For and HA Proxy Protocol for resolving original client IP of http/https requests (#22524)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new 28bce5022a6 [improve][broker] Support X-Forwarded-For and HA Proxy Protocol for resolving original client IP of http/https requests (#22524) 28bce5022a6 is described below commit 28bce5022a608a6072b9489ee0a2b5d95b36d1ae Author: Lari Hotari AuthorDate: Mon Apr 22 07:49:34 2024 +0300 [improve][broker] Support X-Forwarded-For and HA Proxy Protocol for resolving original client IP of http/https requests (#22524) (cherry picked from commit 4a887217d835629cafb393ddf331441b484d4e2c) --- conf/broker.conf | 10 ++ conf/functions_worker.yml | 10 ++ conf/proxy.conf| 10 ++ conf/standalone.conf | 10 ++ conf/websocket.conf| 10 ++ pom.xml| 1 + .../apache/pulsar/broker/ServiceConfiguration.java | 16 ++ .../pulsar/broker/web/JettyRequestLogFactory.java | 195 - pulsar-broker/pom.xml | 7 + .../org/apache/pulsar/broker/web/WebService.java | 37 +++- .../broker/web/WebServiceOriginalClientIPTest.java | 155 pulsar-broker/src/test/resources/log4j2.xml| 40 + .../pulsar/functions/worker/WorkerConfig.java | 16 ++ .../pulsar/functions/worker/rest/WorkerServer.java | 38 +++- pulsar-proxy/pom.xml | 6 + .../pulsar/proxy/server/ProxyConfiguration.java| 16 ++ .../pulsar/proxy/server/ProxyServiceStarter.java | 31 +++- .../org/apache/pulsar/proxy/server/WebServer.java | 34 +++- .../proxy/server/ProxyOriginalClientIPTest.java| 157 + .../ProxyServiceStarterDisableZeroCopyTest.java| 2 +- .../proxy/server/ProxyServiceStarterTest.java | 2 +- .../proxy/server/ProxyServiceTlsStarterTest.java | 2 +- pulsar-proxy/src/test/resources/log4j2.xml | 36 .../pulsar/websocket/service/ProxyServer.java | 39 - .../service/WebSocketProxyConfiguration.java | 14 ++ 25 files changed, 873 insertions(+), 21 deletions(-) diff --git a/conf/broker.conf b/conf/broker.conf index 4f5c4cac565..2d5221c65c4 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -88,6 +88,16 @@ advertisedAddress= # If true, the real IP addresses of consumers and producers can be obtained when getting topic statistics data. haProxyProtocolEnabled=false +# Enable or disable the use of HA proxy protocol for resolving the client IP for http/https requests. +webServiceHaProxyProtocolEnabled=false + +# Trust X-Forwarded-For header for resolving the client IP for http/https requests. Default is false. +webServiceTrustXForwardedFor=false + +# Add detailed client/remote and server/local addresses and ports to http/https request logging. +# Defaults to true when either webServiceHaProxyProtocolEnabled or webServiceTrustXForwardedFor is enabled. +webServiceLogDetailedAddresses= + # Number of threads to config Netty Acceptor. Default is 1 numAcceptorThreads= diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml index 3871c74a887..6f995576ebd 100644 --- a/conf/functions_worker.yml +++ b/conf/functions_worker.yml @@ -27,6 +27,16 @@ workerHostname: localhost workerPort: 6750 workerPortTls: 6751 +# Enable or disable the use of HA proxy protocol for resolving the client IP for http/https requests. +webServiceHaProxyProtocolEnabled: false + +# Trust X-Forwarded-For header for resolving the client IP for http/https requests. Default is false. +webServiceTrustXForwardedFor: false + +# Add detailed client/remote and server/local addresses and ports to http/https request logging. +# Defaults to true when either webServiceHaProxyProtocolEnabled or webServiceTrustXForwardedFor is enabled. +webServiceLogDetailedAddresses: null + # The Configuration metadata store url # Examples: # * zk:my-zk-1:2181,my-zk-2:2181,my-zk-3:2181 diff --git a/conf/proxy.conf b/conf/proxy.conf index 5a9d433f39c..6e6c960e800 100644 --- a/conf/proxy.conf +++ b/conf/proxy.conf @@ -63,6 +63,16 @@ advertisedAddress= # If true, the real IP addresses of consumers and producers can be obtained when getting topic statistics data. haProxyProtocolEnabled=false +# Enable or disable the use of HA proxy protocol for resolving the client IP for http/https requests. +webServiceHaProxyProtocolEnabled=false + +# Trust X-Forwarded-For header for resolving the client IP for http/https requests. Default is false. +webServiceTrustXForwardedFor=false + +# Add detailed client/remote and server/local addresses and ports to http/https request logging. +# Defaults to true when either webServiceHaProxyProtocolEnabled or
(pulsar) 10/10: [fix][broker] Fix broken topic policy implementation compatibility with old pulsar version (#22535)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git commit e7ff0678e56f605a8fb2344c7162f5dcd7e31072 Author: Rajan Dhabalia AuthorDate: Fri Apr 19 10:30:55 2024 -0700 [fix][broker] Fix broken topic policy implementation compatibility with old pulsar version (#22535) (cherry picked from commit 59daac64c210f539e733f883edad09d08333aa62) --- .../pulsar/broker/service/AbstractTopic.java | 52 +- ...kerInternalClientConfigurationOverrideTest.java | 42 - 2 files changed, 72 insertions(+), 22 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 39c3cbf13b2..6ddcfc9ef4f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -218,13 +218,16 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener this.topicPolicies.getBackLogQuotaMap().get(type).updateTopicValue( data.getBackLogQuotaMap() == null ? null : data.getBackLogQuotaMap().get(type.toString(; - topicPolicies.getTopicMaxMessageSize().updateTopicValue(data.getMaxMessageSize()); - topicPolicies.getMessageTTLInSeconds().updateTopicValue(data.getMessageTTLInSeconds()); + topicPolicies.getTopicMaxMessageSize().updateTopicValue(normalizeValue(data.getMaxMessageSize())); + topicPolicies.getMessageTTLInSeconds().updateTopicValue(normalizeValue(data.getMessageTTLInSeconds())); topicPolicies.getPublishRate().updateTopicValue(PublishRate.normalize(data.getPublishRate())); topicPolicies.getDelayedDeliveryEnabled().updateTopicValue(data.getDelayedDeliveryEnabled()); topicPolicies.getReplicatorDispatchRate().updateTopicValue( @@ -263,15 +266,19 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener(CollectionUtils.emptyIfNull(namespacePolicies.replication_clusters))); topicPolicies.getMaxUnackedMessagesOnConsumer() - .updateNamespaceValue(namespacePolicies.max_unacked_messages_per_consumer); + .updateNamespaceValue(normalizeValue(namespacePolicies.max_unacked_messages_per_consumer)); topicPolicies.getMaxUnackedMessagesOnSubscription() - .updateNamespaceValue(namespacePolicies.max_unacked_messages_per_subscription); - topicPolicies.getMessageTTLInSeconds().updateNamespaceValue(namespacePolicies.message_ttl_in_seconds); - topicPolicies.getMaxSubscriptionsPerTopic().updateNamespaceValue(namespacePolicies.max_subscriptions_per_topic); - topicPolicies.getMaxProducersPerTopic().updateNamespaceValue(namespacePolicies.max_producers_per_topic); - topicPolicies.getMaxConsumerPerTopic().updateNamespaceValue(namespacePolicies.max_consumers_per_topic); + .updateNamespaceValue(normalizeValue(namespacePolicies.max_unacked_messages_per_subscription)); +topicPolicies.getMessageTTLInSeconds() + .updateNamespaceValue(normalizeValue(namespacePolicies.message_ttl_in_seconds)); +topicPolicies.getMaxSubscriptionsPerTopic() + .updateNamespaceValue(normalizeValue(namespacePolicies.max_subscriptions_per_topic)); +topicPolicies.getMaxProducersPerTopic() + .updateNamespaceValue(normalizeValue(namespacePolicies.max_producers_per_topic)); +topicPolicies.getMaxConsumerPerTopic() + .updateNamespaceValue(normalizeValue(namespacePolicies.max_consumers_per_topic)); topicPolicies.getMaxConsumersPerSubscription() - .updateNamespaceValue(namespacePolicies.max_consumers_per_subscription); + .updateNamespaceValue(normalizeValue(namespacePolicies.max_consumers_per_subscription)); topicPolicies.getInactiveTopicPolicies().updateNamespaceValue(namespacePolicies.inactive_topic_policies); topicPolicies.getDeduplicationEnabled().updateNamespaceValue(namespacePolicies.deduplicationEnabled); topicPolicies.getDeduplicationSnapshotIntervalSeconds().updateNamespaceValue( @@ -301,6 +308,10 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener producer = pulsarClient.newProducer() +.topic(topic).create(); +PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic).get(); + assertEquals(topicRef.topicPolicies.getMaxUnackedMessagesOnSubscription().get(), +conf.getMaxUnackedMessagesPerSubscription()); + assertEquals(topicRef.topicPolicies.getMaxConsumersPerSubscription().get(), +conf.getMaxConsumersPerSubscription()); +
(pulsar) 05/10: [improve] Make the config `metricsBufferResponse` description more effective (#22490)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git commit f97a0bdb973a1ed4dd13986e84a1ba55459dd4fe Author: 道君 AuthorDate: Wed Apr 17 03:07:30 2024 +0800 [improve] Make the config `metricsBufferResponse` description more effective (#22490) (cherry picked from commit 4ca4e2855267e3b36ee1a27f7144b89ba9194821) --- .../main/java/org/apache/pulsar/broker/ServiceConfiguration.java| 6 -- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 2e7d4d582d6..0fce252694f 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -2908,8 +2908,10 @@ public class ServiceConfiguration implements PulsarConfiguration { private boolean exposeTopicLevelMetricsInPrometheus = true; @FieldContext( category = CATEGORY_METRICS, -doc = "If true, export buffered metrics" -) +doc = "Set to true to enable the broker to cache the metrics response; the default is false. " ++ "The caching period is defined by `managedLedgerStatsPeriodSeconds`. " ++ "The broker returns the same response for subsequent requests within the same period. " ++ "Ensure that the scrape interval of your monitoring system matches the caching period.") private boolean metricsBufferResponse = false; @FieldContext( category = CATEGORY_METRICS,
(pulsar) 03/10: [improve][broker] backlog quota exceed limit log replaced with `debug` (#22488)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git commit 8d0a412eb93ed11dc3fd8cf12ac1d69d1ef25fc5 Author: Mukesh Kumar <65598381+mukesh...@users.noreply.github.com> AuthorDate: Fri Apr 12 22:07:28 2024 +0530 [improve][broker] backlog quota exceed limit log replaced with `debug` (#22488) (cherry picked from commit b85730069ee4c5f96406a075e354d0592fdab434) --- .../org/apache/pulsar/broker/service/persistent/PersistentTopic.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 29df6e78cd1..a76ac548e0c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -3150,14 +3150,14 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal if ((retentionPolicy == BacklogQuota.RetentionPolicy.producer_request_hold || retentionPolicy == BacklogQuota.RetentionPolicy.producer_exception)) { if (backlogQuotaType == BacklogQuotaType.destination_storage && isSizeBacklogExceeded()) { -log.info("[{}] Size backlog quota exceeded. Cannot create producer [{}]", this.getName(), +log.debug("[{}] Size backlog quota exceeded. Cannot create producer [{}]", this.getName(), producerName); return FutureUtil.failedFuture(new TopicBacklogQuotaExceededException(retentionPolicy)); } if (backlogQuotaType == BacklogQuotaType.message_age) { return checkTimeBacklogExceeded().thenCompose(isExceeded -> { if (isExceeded) { -log.info("[{}] Time backlog quota exceeded. Cannot create producer [{}]", this.getName(), +log.debug("[{}] Time backlog quota exceeded. Cannot create producer [{}]", this.getName(), producerName); return FutureUtil.failedFuture(new TopicBacklogQuotaExceededException(retentionPolicy)); } else {
(pulsar) branch branch-3.2 updated (14841768466 -> e7ff0678e56)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a change to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git from 14841768466 [improve][offload] Apply autoSkipNonRecoverableData configuration to tiered storage (#22531) new 49bcbb6c3da [fix][broker] Update topic partition failed when config maxNumPartitionsPerPartitionedTopic<0 (#22397) new 72d290abd49 [fix][broker] Fix message drop record in producer stat (#22458) new 8d0a412eb93 [improve][broker] backlog quota exceed limit log replaced with `debug` (#22488) new bf89f080744 [fix][test] SchemaMap in AutoConsumeSchema has been reused (#22500) new f97a0bdb973 [improve] Make the config `metricsBufferResponse` description more effective (#22490) new 5d474657869 [improve][broker] Add topic name to emitted error messages. (#22506) new becbaf198c5 [improve][broker] Repeat the handleMetadataChanges callback when configurationMetadataStore equals localMetadataStore (#22519) new 50293b75654 [improve][test] Move ShadowManagedLedgerImplTest to flaky tests (#22526) new 91ce98d4eb6 [fix][broker] Fix typos in Consumer class (#22532) new e7ff0678e56 [fix][broker] Fix broken topic policy implementation compatibility with old pulsar version (#22535) The 10 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../mledger/impl/ShadowManagedLedgerImplTest.java | 2 +- .../apache/pulsar/broker/ServiceConfiguration.java | 9 ++- .../broker/admin/impl/PersistentTopicsBase.java| 2 +- .../pulsar/broker/service/AbstractTopic.java | 69 +- .../pulsar/broker/service/BrokerService.java | 4 +- .../org/apache/pulsar/broker/service/Consumer.java | 6 +- .../org/apache/pulsar/broker/service/Producer.java | 2 +- .../broker/service/persistent/PersistentTopic.java | 4 +- .../apache/pulsar/broker/admin/AdminApi2Test.java | 6 +- .../pulsar/broker/admin/PersistentTopicsTest.java | 45 ++ ...kerInternalClientConfigurationOverrideTest.java | 42 - .../client/api/SimpleProducerConsumerTest.java | 66 - 12 files changed, 185 insertions(+), 72 deletions(-)
(pulsar) 07/10: [improve][broker] Repeat the handleMetadataChanges callback when configurationMetadataStore equals localMetadataStore (#22519)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git commit becbaf198c5d399375c6149d3493391c382232e0 Author: hanmz AuthorDate: Wed Apr 17 18:14:38 2024 +0800 [improve][broker] Repeat the handleMetadataChanges callback when configurationMetadataStore equals localMetadataStore (#22519) (cherry picked from commit 1dd82a0affd6ec3686fa85d444c354e9ce12) --- .../src/main/java/org/apache/pulsar/broker/service/BrokerService.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index f19b3436f7b..dc66750fd96 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -336,7 +336,9 @@ public class BrokerService implements Closeable { this.entryFilterProvider = new EntryFilterProvider(pulsar.getConfiguration()); pulsar.getLocalMetadataStore().registerListener(this::handleMetadataChanges); - pulsar.getConfigurationMetadataStore().registerListener(this::handleMetadataChanges); +if (pulsar.getConfigurationMetadataStore() != pulsar.getLocalMetadataStore()) { + pulsar.getConfigurationMetadataStore().registerListener(this::handleMetadataChanges); +} this.inactivityMonitor = OrderedScheduler.newSchedulerBuilder()
(pulsar) 08/10: [improve][test] Move ShadowManagedLedgerImplTest to flaky tests (#22526)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git commit 50293b7565483f5b074ca43d9b6df0836d10cb0e Author: Lari Hotari AuthorDate: Wed Apr 17 12:46:43 2024 -0700 [improve][test] Move ShadowManagedLedgerImplTest to flaky tests (#22526) (cherry picked from commit 56970b714f5adb606b02d12a99db1ceec3fa7832) --- .../org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImplTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImplTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImplTest.java index cc4b3f24811..2aa04197ab9 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImplTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImplTest.java @@ -51,7 +51,7 @@ public class ShadowManagedLedgerImplTest extends MockedBookKeeperTestCase { return (ShadowManagedLedgerImpl) shadowML; } -@Test +@Test(groups = "flaky") public void testShadowWrites() throws Exception { ManagedLedgerImpl sourceML = (ManagedLedgerImpl) factory.open("source_ML", new ManagedLedgerConfig() .setMaxEntriesPerLedger(2)
(pulsar) 09/10: [fix][broker] Fix typos in Consumer class (#22532)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git commit 91ce98d4eb63978dc7a4924fc92d327da9dffc67 Author: hanmz AuthorDate: Fri Apr 19 06:49:18 2024 +0800 [fix][broker] Fix typos in Consumer class (#22532) (cherry picked from commit 7aedb6b20c120ec0a7cc096e33e6305caca26786) --- .../src/main/java/org/apache/pulsar/broker/service/Consumer.java| 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index 4cd54420200..6b2028095e2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -147,7 +147,7 @@ public class Consumer { @Setter private volatile long consumerEpoch; -private long negtiveUnackedMsgsTimestamp; +private long negativeUnackedMsgsTimestamp; @Getter private final SchemaType schemaType; @@ -1102,8 +1102,8 @@ public class Consumer { subscription.addUnAckedMessages(ackedMessages); unackedMsgs = UNACKED_MESSAGES_UPDATER.addAndGet(consumer, ackedMessages); } -if (unackedMsgs < 0 && System.currentTimeMillis() - negtiveUnackedMsgsTimestamp >= 10_000) { -negtiveUnackedMsgsTimestamp = System.currentTimeMillis(); +if (unackedMsgs < 0 && System.currentTimeMillis() - negativeUnackedMsgsTimestamp >= 10_000) { +negativeUnackedMsgsTimestamp = System.currentTimeMillis(); log.warn("unackedMsgs is : {}, ackedMessages : {}, consumer : {}", unackedMsgs, ackedMessages, consumer); } return unackedMsgs;
(pulsar) 01/10: [fix][broker] Update topic partition failed when config maxNumPartitionsPerPartitionedTopic<0 (#22397)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git commit 49bcbb6c3da2246bd887605fd68c10e1d3e9af28 Author: hanmz AuthorDate: Wed Apr 10 04:27:22 2024 +0800 [fix][broker] Update topic partition failed when config maxNumPartitionsPerPartitionedTopic<0 (#22397) (cherry picked from commit fb5caeb2cd3353db0499e32e9ec79390741b809c) --- .../apache/pulsar/broker/ServiceConfiguration.java | 3 +- .../broker/admin/impl/PersistentTopicsBase.java| 2 +- .../pulsar/broker/admin/PersistentTopicsTest.java | 45 ++ 3 files changed, 48 insertions(+), 2 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 8b99501f296..2e7d4d582d6 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -1351,7 +1351,8 @@ public class ServiceConfiguration implements PulsarConfiguration { category = CATEGORY_SERVER, dynamic = true, doc = "The number of partitions per partitioned topic.\n" -+ "If try to create or update partitioned topics by exceeded number of partitions, then fail." ++ "If try to create or update partitioned topics by exceeded number of partitions, then fail.\n" ++ "Use 0 or negative number to disable the check." ) private int maxNumPartitionsPerPartitionedTopic = 0; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 318e2bc2cde..b0968f494ee 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -348,7 +348,7 @@ public class PersistentTopicsBase extends AdminResource { } int brokerMaximumPartitionsPerTopic = pulsarService.getConfiguration() .getMaxNumPartitionsPerPartitionedTopic(); -if (brokerMaximumPartitionsPerTopic != 0 && expectPartitions > brokerMaximumPartitionsPerTopic) { +if (brokerMaximumPartitionsPerTopic > 0 && expectPartitions > brokerMaximumPartitionsPerTopic) { throw new RestException(422 /* Unprocessable entity*/, String.format("Desired partitions %s can't be greater than the maximum partitions per" + " topic %s.", diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java index 8e1375303ce..c588051a0fe 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java @@ -1742,6 +1742,51 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest { verify(response, timeout(5000).times(1)).resume(metaCaptor.capture()); partitionedTopicMetadata = metaCaptor.getValue(); Assert.assertEquals(partitionedTopicMetadata.partitions, 4); + +// test for configuration maxNumPartitionsPerPartitionedTopic +conf.setMaxNumPartitionsPerPartitionedTopic(4); +response = mock(AsyncResponse.class); +throwableCaptor = ArgumentCaptor.forClass(Throwable.class); +persistentTopics.updatePartitionedTopic(response, testTenant, testNamespaceLocal, topicName, false, true, +true, 5); +verify(response, timeout(5000).times(1)).resume(throwableCaptor.capture()); +Assert.assertEquals(throwableCaptor.getValue().getMessage(), +"Desired partitions 5 can't be greater than the maximum partitions per topic 4."); + +response = mock(AsyncResponse.class); +metaCaptor = ArgumentCaptor.forClass(PartitionedTopicMetadata.class); +persistentTopics.getPartitionedMetadata(response, testTenant, testNamespaceLocal, topicName, true, false); +verify(response, timeout(5000).times(1)).resume(metaCaptor.capture()); +partitionedTopicMetadata = metaCaptor.getValue(); +Assert.assertEquals(partitionedTopicMetadata.partitions, 4); + +conf.setMaxNumPartitionsPerPartitionedTopic(-1); +response = mock(AsyncResponse.class); +responseCaptor = ArgumentCaptor.forClass(Response.class); +persistentTopics.updatePartitionedTopic(response, testTenant, testNamespaceLocal, topicName, false
(pulsar) 06/10: [improve][broker] Add topic name to emitted error messages. (#22506)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git commit 5d4746578695f828e0901cb14fd1e83837fe4b1a Author: 道君 AuthorDate: Wed Apr 17 03:12:34 2024 +0800 [improve][broker] Add topic name to emitted error messages. (#22506) (cherry picked from commit d5b36da9a2e0d4f17bea8e033180e494e93dc442) --- .../org/apache/pulsar/broker/service/AbstractTopic.java | 17 + .../org/apache/pulsar/broker/admin/AdminApi2Test.java | 6 -- 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 837f073b00d..39c3cbf13b2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -855,7 +855,7 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener internalAddProducer(Producer producer) { if (isProducersExceeded(producer)) { log.warn("[{}] Attempting to add producer to topic which reached max producers limit", topic); -return CompletableFuture.failedFuture( -new BrokerServiceException.ProducerBusyException("Topic reached max producers limit")); +return CompletableFuture.failedFuture(new BrokerServiceException.ProducerBusyException( +"Topic '" + topic + "' reached max producers limit")); } if (isSameAddressProducersExceeded(producer)) { log.warn("[{}] Attempting to add producer to topic which reached max same address producers limit", topic); -return CompletableFuture.failedFuture( -new BrokerServiceException.ProducerBusyException("Topic reached max same address producers limit")); +return CompletableFuture.failedFuture(new BrokerServiceException.ProducerBusyException( +"Topic '" + topic + "' reached max same address producers limit")); } if (log.isDebugEnabled()) { @@ -971,7 +971,7 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener producer = pulsarClient.newProducer().topic(topic).create(); fail("should fail"); } catch (PulsarClientException e) { -assertTrue(e.getMessage().contains("Topic reached max producers limit")); +String expectMsg = "Topic '" + topic + "' reached max producers limit"; +assertTrue(e.getMessage().contains(expectMsg)); } //set the limit to 3 admin.namespaces().setMaxProducersPerTopic(myNamespace, 3); @@ -2901,7 +2902,8 @@ public class AdminApi2Test extends MockedPulsarServiceBaseTest { Producer producer1 = pulsarClient.newProducer().topic(topic).create(); fail("should fail"); } catch (PulsarClientException e) { -assertTrue(e.getMessage().contains("Topic reached max producers limit")); +String expectMsg = "Topic '" + topic + "' reached max producers limit"; +assertTrue(e.getMessage().contains(expectMsg)); } //clean up
(pulsar) 04/10: [fix][test] SchemaMap in AutoConsumeSchema has been reused (#22500)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git commit bf89f0807446f6e65d9db4cff24cce974e34eb01 Author: sinan liu AuthorDate: Tue Apr 16 21:19:44 2024 +0800 [fix][test] SchemaMap in AutoConsumeSchema has been reused (#22500) (cherry picked from commit ffdfc0c4e0881c682132e79c3cbf9768b1ab4f89) --- .../client/api/SimpleProducerConsumerTest.java | 66 +- 1 file changed, 38 insertions(+), 28 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java index 7552b84a1c5..691f501777e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java @@ -4329,6 +4329,10 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase { public void testAccessAvroSchemaMetadata(Schema schema) throws Exception { log.info("-- Starting {} test --", methodName); +if (pulsarClient == null) { +pulsarClient = newPulsarClient(lookupUrl.toString(), 0); +} + final String topic = "persistent://my-property/my-ns/accessSchema"; Consumer consumer = pulsarClient.newConsumer(Schema.AUTO_CONSUME()) .topic(topic) @@ -4344,37 +4348,43 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase { producer.send(payload); producer.close(); -GenericRecord res = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS).getValue(); -consumer.close(); -assertEquals(schema.getSchemaInfo().getType(), res.getSchemaType()); -org.apache.avro.generic.GenericRecord nativeAvroRecord = null; -JsonNode nativeJsonRecord = null; -if (schema.getSchemaInfo().getType() == SchemaType.AVRO) { -nativeAvroRecord = (org.apache.avro.generic.GenericRecord) res.getNativeObject(); -assertNotNull(nativeAvroRecord); -} else { -nativeJsonRecord = (JsonNode) res.getNativeObject(); -assertNotNull(nativeJsonRecord); -} -for (org.apache.pulsar.client.api.schema.Field f : res.getFields()) { -log.info("field {} {}", f.getName(), res.getField(f)); -assertEquals("field", f.getName()); -assertEquals("a", res.getField(f)); - -if (nativeAvroRecord != null) { -// test that the native schema is accessible -org.apache.avro.Schema.Field fieldDetails = nativeAvroRecord.getSchema().getField(f.getName()); -// a nullable string is an UNION -assertEquals(org.apache.avro.Schema.Type.UNION, fieldDetails.schema().getType()); - assertTrue(fieldDetails.schema().getTypes().stream().anyMatch(s -> s.getType() == org.apache.avro.Schema.Type.STRING)); - assertTrue(fieldDetails.schema().getTypes().stream().anyMatch(s -> s.getType() == org.apache.avro.Schema.Type.NULL)); +try { +GenericRecord res = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS).getValue(); +consumer.close(); +assertEquals(schema.getSchemaInfo().getType(), res.getSchemaType()); +org.apache.avro.generic.GenericRecord nativeAvroRecord = null; +JsonNode nativeJsonRecord = null; +if (schema.getSchemaInfo().getType() == SchemaType.AVRO) { +nativeAvroRecord = (org.apache.avro.generic.GenericRecord) res.getNativeObject(); +assertNotNull(nativeAvroRecord); } else { -assertEquals(JsonNodeType.STRING, nativeJsonRecord.get("field").getNodeType()); +nativeJsonRecord = (JsonNode) res.getNativeObject(); +assertNotNull(nativeJsonRecord); +} +for (org.apache.pulsar.client.api.schema.Field f : res.getFields()) { +log.info("field {} {}", f.getName(), res.getField(f)); +assertEquals("field", f.getName()); +assertEquals("a", res.getField(f)); + +if (nativeAvroRecord != null) { +// test that the native schema is accessible +org.apache.avro.Schema.Field fieldDetails = nativeAvroRecord.getSchema().getField(f.getName()); +// a nullable string is an UNION +assertEquals(org.apache.avro.Schema.Type.UNION, fieldDetails.schema().getType()); + assertTrue(fieldDetails.schema().getTypes().stream().anyMatch(s -> s.getType() == org.apache.avro.Schema.Type.STRING)); + assertTrue(fieldDetails.schema(
(pulsar) 02/10: [fix][broker] Fix message drop record in producer stat (#22458)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git commit 72d290abd49743bfb94173e831b907e5780b180f Author: zhangqian <503837...@qq.com> AuthorDate: Wed Apr 10 16:51:26 2024 +0800 [fix][broker] Fix message drop record in producer stat (#22458) Co-authored-by: ceceezhang (cherry picked from commit cea1a9ba9b576bf43f0a45ff8d65369b0f2bbb36) --- .../src/main/java/org/apache/pulsar/broker/service/Producer.java| 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java index 7e4459505a5..9cfde67802b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java @@ -749,7 +749,7 @@ public class Producer { } if (this.isNonPersistentTopic) { msgDrop.calculateRate(); -((NonPersistentPublisherStatsImpl) stats).msgDropRate = msgDrop.getRate(); +((NonPersistentPublisherStatsImpl) stats).msgDropRate = msgDrop.getValueRate(); } }
(pulsar) branch branch-3.0 updated: [fix][broker] Fix broken topic policy implementation compatibility with old pulsar version (#22535)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.0 by this push: new 8439082f79c [fix][broker] Fix broken topic policy implementation compatibility with old pulsar version (#22535) 8439082f79c is described below commit 8439082f79c5480b58be93fb360ed07b68016631 Author: Rajan Dhabalia AuthorDate: Fri Apr 19 10:30:55 2024 -0700 [fix][broker] Fix broken topic policy implementation compatibility with old pulsar version (#22535) (cherry picked from commit 59daac64c210f539e733f883edad09d08333aa62) --- .../pulsar/broker/service/AbstractTopic.java | 52 +- ...kerInternalClientConfigurationOverrideTest.java | 42 - 2 files changed, 72 insertions(+), 22 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 5bbc30f7ed0..16aeabbcc75 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -216,13 +216,16 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener this.topicPolicies.getBackLogQuotaMap().get(type).updateTopicValue( data.getBackLogQuotaMap() == null ? null : data.getBackLogQuotaMap().get(type.toString(; - topicPolicies.getTopicMaxMessageSize().updateTopicValue(data.getMaxMessageSize()); - topicPolicies.getMessageTTLInSeconds().updateTopicValue(data.getMessageTTLInSeconds()); + topicPolicies.getTopicMaxMessageSize().updateTopicValue(normalizeValue(data.getMaxMessageSize())); + topicPolicies.getMessageTTLInSeconds().updateTopicValue(normalizeValue(data.getMessageTTLInSeconds())); topicPolicies.getPublishRate().updateTopicValue(PublishRate.normalize(data.getPublishRate())); topicPolicies.getDelayedDeliveryEnabled().updateTopicValue(data.getDelayedDeliveryEnabled()); topicPolicies.getReplicatorDispatchRate().updateTopicValue( @@ -261,15 +264,19 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener(CollectionUtils.emptyIfNull(namespacePolicies.replication_clusters))); topicPolicies.getMaxUnackedMessagesOnConsumer() - .updateNamespaceValue(namespacePolicies.max_unacked_messages_per_consumer); + .updateNamespaceValue(normalizeValue(namespacePolicies.max_unacked_messages_per_consumer)); topicPolicies.getMaxUnackedMessagesOnSubscription() - .updateNamespaceValue(namespacePolicies.max_unacked_messages_per_subscription); - topicPolicies.getMessageTTLInSeconds().updateNamespaceValue(namespacePolicies.message_ttl_in_seconds); - topicPolicies.getMaxSubscriptionsPerTopic().updateNamespaceValue(namespacePolicies.max_subscriptions_per_topic); - topicPolicies.getMaxProducersPerTopic().updateNamespaceValue(namespacePolicies.max_producers_per_topic); - topicPolicies.getMaxConsumerPerTopic().updateNamespaceValue(namespacePolicies.max_consumers_per_topic); + .updateNamespaceValue(normalizeValue(namespacePolicies.max_unacked_messages_per_subscription)); +topicPolicies.getMessageTTLInSeconds() + .updateNamespaceValue(normalizeValue(namespacePolicies.message_ttl_in_seconds)); +topicPolicies.getMaxSubscriptionsPerTopic() + .updateNamespaceValue(normalizeValue(namespacePolicies.max_subscriptions_per_topic)); +topicPolicies.getMaxProducersPerTopic() + .updateNamespaceValue(normalizeValue(namespacePolicies.max_producers_per_topic)); +topicPolicies.getMaxConsumerPerTopic() + .updateNamespaceValue(normalizeValue(namespacePolicies.max_consumers_per_topic)); topicPolicies.getMaxConsumersPerSubscription() - .updateNamespaceValue(namespacePolicies.max_consumers_per_subscription); + .updateNamespaceValue(normalizeValue(namespacePolicies.max_consumers_per_subscription)); topicPolicies.getInactiveTopicPolicies().updateNamespaceValue(namespacePolicies.inactive_topic_policies); topicPolicies.getDeduplicationEnabled().updateNamespaceValue(namespacePolicies.deduplicationEnabled); topicPolicies.getDeduplicationSnapshotIntervalSeconds().updateNamespaceValue( @@ -299,6 +306,10 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener producer = pulsarClient.newProducer() +.topic(topic).create(); +PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic).get(); + assertEquals(topicRef.topicPolicies.getMaxUnackedMessagesOnSubs
(pulsar) 03/04: [improve][test] Move ShadowManagedLedgerImplTest to flaky tests (#22526)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git commit 155f3cbf9f63993a4b6e84b107d17b556927b7c7 Author: Lari Hotari AuthorDate: Wed Apr 17 12:46:43 2024 -0700 [improve][test] Move ShadowManagedLedgerImplTest to flaky tests (#22526) (cherry picked from commit 56970b714f5adb606b02d12a99db1ceec3fa7832) --- .../org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImplTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImplTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImplTest.java index cc4b3f24811..2aa04197ab9 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImplTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImplTest.java @@ -51,7 +51,7 @@ public class ShadowManagedLedgerImplTest extends MockedBookKeeperTestCase { return (ShadowManagedLedgerImpl) shadowML; } -@Test +@Test(groups = "flaky") public void testShadowWrites() throws Exception { ManagedLedgerImpl sourceML = (ManagedLedgerImpl) factory.open("source_ML", new ManagedLedgerConfig() .setMaxEntriesPerLedger(2)
(pulsar) 04/04: [fix][broker] Fix typos in Consumer class (#22532)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git commit beb147c701728f9a2c0fcc9b56d4efb53f94a9bc Author: hanmz AuthorDate: Fri Apr 19 06:49:18 2024 +0800 [fix][broker] Fix typos in Consumer class (#22532) (cherry picked from commit 7aedb6b20c120ec0a7cc096e33e6305caca26786) --- .../src/main/java/org/apache/pulsar/broker/service/Consumer.java| 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index d8ed99bb874..a98fc86c03e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -146,7 +146,7 @@ public class Consumer { @Setter private volatile long consumerEpoch; -private long negtiveUnackedMsgsTimestamp; +private long negativeUnackedMsgsTimestamp; @Getter private final SchemaType schemaType; @@ -1086,8 +1086,8 @@ public class Consumer { subscription.addUnAckedMessages(ackedMessages); unackedMsgs = UNACKED_MESSAGES_UPDATER.addAndGet(consumer, ackedMessages); } -if (unackedMsgs < 0 && System.currentTimeMillis() - negtiveUnackedMsgsTimestamp >= 10_000) { -negtiveUnackedMsgsTimestamp = System.currentTimeMillis(); +if (unackedMsgs < 0 && System.currentTimeMillis() - negativeUnackedMsgsTimestamp >= 10_000) { +negativeUnackedMsgsTimestamp = System.currentTimeMillis(); log.warn("unackedMsgs is : {}, ackedMessages : {}, consumer : {}", unackedMsgs, ackedMessages, consumer); } return unackedMsgs;
(pulsar) branch branch-3.0 updated (94f12543a9f -> beb147c7017)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a change to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git from 94f12543a9f [improve] Make the config `metricsBufferResponse` description more effective (#22490) new 1d202f93c81 [improve][broker] Add topic name to emitted error messages. (#22506) new 8f5b825b7a9 [improve][broker] Repeat the handleMetadataChanges callback when configurationMetadataStore equals localMetadataStore (#22519) new 155f3cbf9f6 [improve][test] Move ShadowManagedLedgerImplTest to flaky tests (#22526) new beb147c7017 [fix][broker] Fix typos in Consumer class (#22532) The 4 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../mledger/impl/ShadowManagedLedgerImplTest.java | 2 +- .../org/apache/pulsar/broker/service/AbstractTopic.java | 15 --- .../org/apache/pulsar/broker/service/BrokerService.java | 4 +++- .../java/org/apache/pulsar/broker/service/Consumer.java | 6 +++--- .../org/apache/pulsar/broker/admin/AdminApi2Test.java | 6 -- 5 files changed, 19 insertions(+), 14 deletions(-)
(pulsar) 01/04: [improve][broker] Add topic name to emitted error messages. (#22506)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git commit 1d202f93c8108eb8e293c70df9a932292329e822 Author: 道君 AuthorDate: Wed Apr 17 03:12:34 2024 +0800 [improve][broker] Add topic name to emitted error messages. (#22506) (cherry picked from commit d5b36da9a2e0d4f17bea8e033180e494e93dc442) --- .../org/apache/pulsar/broker/service/AbstractTopic.java | 15 --- .../org/apache/pulsar/broker/admin/AdminApi2Test.java | 6 -- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 507a5af5270..5bbc30f7ed0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -863,7 +863,7 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener internalAddProducer(Producer producer) { if (isProducersExceeded(producer)) { log.warn("[{}] Attempting to add producer to topic which reached max producers limit", topic); -return CompletableFuture.failedFuture( -new BrokerServiceException.ProducerBusyException("Topic reached max producers limit")); +return CompletableFuture.failedFuture(new BrokerServiceException.ProducerBusyException( +"Topic '" + topic + "' reached max producers limit")); } if (isSameAddressProducersExceeded(producer)) { log.warn("[{}] Attempting to add producer to topic which reached max same address producers limit", topic); -return CompletableFuture.failedFuture( -new BrokerServiceException.ProducerBusyException("Topic reached max same address producers limit")); +return CompletableFuture.failedFuture(new BrokerServiceException.ProducerBusyException( +"Topic '" + topic + "' reached max same address producers limit")); } if (log.isDebugEnabled()) { @@ -1001,7 +1001,7 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener producer = pulsarClient.newProducer().topic(topic).create(); fail("should fail"); } catch (PulsarClientException e) { -assertTrue(e.getMessage().contains("Topic reached max producers limit")); +String expectMsg = "Topic '" + topic + "' reached max producers limit"; +assertTrue(e.getMessage().contains(expectMsg)); } //set the limit to 3 admin.namespaces().setMaxProducersPerTopic(myNamespace, 3); @@ -2900,7 +2901,8 @@ public class AdminApi2Test extends MockedPulsarServiceBaseTest { Producer producer1 = pulsarClient.newProducer().topic(topic).create(); fail("should fail"); } catch (PulsarClientException e) { -assertTrue(e.getMessage().contains("Topic reached max producers limit")); +String expectMsg = "Topic '" + topic + "' reached max producers limit"; +assertTrue(e.getMessage().contains(expectMsg)); } //clean up
(pulsar) 02/04: [improve][broker] Repeat the handleMetadataChanges callback when configurationMetadataStore equals localMetadataStore (#22519)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git commit 8f5b825b7a979acf80ad88ad2a939357d3f1a970 Author: hanmz AuthorDate: Wed Apr 17 18:14:38 2024 +0800 [improve][broker] Repeat the handleMetadataChanges callback when configurationMetadataStore equals localMetadataStore (#22519) (cherry picked from commit 1dd82a0affd6ec3686fa85d444c354e9ce12) --- .../src/main/java/org/apache/pulsar/broker/service/BrokerService.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 20de00370e1..fdc9e5b33fa 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -336,7 +336,9 @@ public class BrokerService implements Closeable { this.entryFilterProvider = new EntryFilterProvider(pulsar.getConfiguration()); pulsar.getLocalMetadataStore().registerListener(this::handleMetadataChanges); - pulsar.getConfigurationMetadataStore().registerListener(this::handleMetadataChanges); +if (pulsar.getConfigurationMetadataStore() != pulsar.getLocalMetadataStore()) { + pulsar.getConfigurationMetadataStore().registerListener(this::handleMetadataChanges); +} this.inactivityMonitor = OrderedScheduler.newSchedulerBuilder()
(pulsar) 06/06: [improve] Make the config `metricsBufferResponse` description more effective (#22490)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git commit 94f12543a9ffe7e96f9af7ef9108d1b849ca3280 Author: 道君 AuthorDate: Wed Apr 17 03:07:30 2024 +0800 [improve] Make the config `metricsBufferResponse` description more effective (#22490) (cherry picked from commit 4ca4e2855267e3b36ee1a27f7144b89ba9194821) --- .../main/java/org/apache/pulsar/broker/ServiceConfiguration.java| 6 -- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index ae6928d2b32..0dc5d38c191 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -2852,8 +2852,10 @@ public class ServiceConfiguration implements PulsarConfiguration { private boolean exposeTopicLevelMetricsInPrometheus = true; @FieldContext( category = CATEGORY_METRICS, -doc = "If true, export buffered metrics" -) +doc = "Set to true to enable the broker to cache the metrics response; the default is false. " ++ "The caching period is defined by `managedLedgerStatsPeriodSeconds`. " ++ "The broker returns the same response for subsequent requests within the same period. " ++ "Ensure that the scrape interval of your monitoring system matches the caching period.") private boolean metricsBufferResponse = false; @FieldContext( category = CATEGORY_METRICS,
(pulsar) 01/06: [improve][build] Upgrade Lombok to 1.18.32 for Java 22 support (#22425)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git commit a9e815d464ca8d4620ba6d676d74831a730d533c Author: Lari Hotari AuthorDate: Thu Apr 4 06:39:53 2024 -0700 [improve][build] Upgrade Lombok to 1.18.32 for Java 22 support (#22425) (cherry picked from commit 5b6f91bc0f839c467bdc1af35c8eac7b14aa8822) --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 50e96b4d35f..0f561597f19 100644 --- a/pom.xml +++ b/pom.xml @@ -228,7 +228,7 @@ flexible messaging model and an intuitive client API. 0.9.1 2.1.0 3.24.2 -1.18.30 +1.18.32 1.3.2 2.3.1 1.2.0
(pulsar) 04/06: [improve][broker] backlog quota exceed limit log replaced with `debug` (#22488)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git commit eb1b55ec6815375d751ff764ebe29455101a3b61 Author: Mukesh Kumar <65598381+mukesh...@users.noreply.github.com> AuthorDate: Fri Apr 12 22:07:28 2024 +0530 [improve][broker] backlog quota exceed limit log replaced with `debug` (#22488) (cherry picked from commit b85730069ee4c5f96406a075e354d0592fdab434) --- .../org/apache/pulsar/broker/service/persistent/PersistentTopic.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 1363ca0945d..85fdc182ccc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -2925,14 +2925,14 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal if ((retentionPolicy == BacklogQuota.RetentionPolicy.producer_request_hold || retentionPolicy == BacklogQuota.RetentionPolicy.producer_exception)) { if (backlogQuotaType == BacklogQuotaType.destination_storage && isSizeBacklogExceeded()) { -log.info("[{}] Size backlog quota exceeded. Cannot create producer [{}]", this.getName(), +log.debug("[{}] Size backlog quota exceeded. Cannot create producer [{}]", this.getName(), producerName); return FutureUtil.failedFuture(new TopicBacklogQuotaExceededException(retentionPolicy)); } if (backlogQuotaType == BacklogQuotaType.message_age) { return checkTimeBacklogExceeded().thenCompose(isExceeded -> { if (isExceeded) { -log.info("[{}] Time backlog quota exceeded. Cannot create producer [{}]", this.getName(), +log.debug("[{}] Time backlog quota exceeded. Cannot create producer [{}]", this.getName(), producerName); return FutureUtil.failedFuture(new TopicBacklogQuotaExceededException(retentionPolicy)); } else {
(pulsar) 05/06: [fix][test] SchemaMap in AutoConsumeSchema has been reused (#22500)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git commit 42ae91ae31bc1ca27a13deb4b48a90ac06c89388 Author: sinan liu AuthorDate: Tue Apr 16 21:19:44 2024 +0800 [fix][test] SchemaMap in AutoConsumeSchema has been reused (#22500) (cherry picked from commit ffdfc0c4e0881c682132e79c3cbf9768b1ab4f89) --- .../client/api/SimpleProducerConsumerTest.java | 66 +- 1 file changed, 38 insertions(+), 28 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java index e76d6d6962c..68158dd69a4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java @@ -4329,6 +4329,10 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase { public void testAccessAvroSchemaMetadata(Schema schema) throws Exception { log.info("-- Starting {} test --", methodName); +if (pulsarClient == null) { +pulsarClient = newPulsarClient(lookupUrl.toString(), 0); +} + final String topic = "persistent://my-property/my-ns/accessSchema"; Consumer consumer = pulsarClient.newConsumer(Schema.AUTO_CONSUME()) .topic(topic) @@ -4344,37 +4348,43 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase { producer.send(payload); producer.close(); -GenericRecord res = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS).getValue(); -consumer.close(); -assertEquals(schema.getSchemaInfo().getType(), res.getSchemaType()); -org.apache.avro.generic.GenericRecord nativeAvroRecord = null; -JsonNode nativeJsonRecord = null; -if (schema.getSchemaInfo().getType() == SchemaType.AVRO) { -nativeAvroRecord = (org.apache.avro.generic.GenericRecord) res.getNativeObject(); -assertNotNull(nativeAvroRecord); -} else { -nativeJsonRecord = (JsonNode) res.getNativeObject(); -assertNotNull(nativeJsonRecord); -} -for (org.apache.pulsar.client.api.schema.Field f : res.getFields()) { -log.info("field {} {}", f.getName(), res.getField(f)); -assertEquals("field", f.getName()); -assertEquals("a", res.getField(f)); - -if (nativeAvroRecord != null) { -// test that the native schema is accessible -org.apache.avro.Schema.Field fieldDetails = nativeAvroRecord.getSchema().getField(f.getName()); -// a nullable string is an UNION -assertEquals(org.apache.avro.Schema.Type.UNION, fieldDetails.schema().getType()); - assertTrue(fieldDetails.schema().getTypes().stream().anyMatch(s -> s.getType() == org.apache.avro.Schema.Type.STRING)); - assertTrue(fieldDetails.schema().getTypes().stream().anyMatch(s -> s.getType() == org.apache.avro.Schema.Type.NULL)); +try { +GenericRecord res = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS).getValue(); +consumer.close(); +assertEquals(schema.getSchemaInfo().getType(), res.getSchemaType()); +org.apache.avro.generic.GenericRecord nativeAvroRecord = null; +JsonNode nativeJsonRecord = null; +if (schema.getSchemaInfo().getType() == SchemaType.AVRO) { +nativeAvroRecord = (org.apache.avro.generic.GenericRecord) res.getNativeObject(); +assertNotNull(nativeAvroRecord); } else { -assertEquals(JsonNodeType.STRING, nativeJsonRecord.get("field").getNodeType()); +nativeJsonRecord = (JsonNode) res.getNativeObject(); +assertNotNull(nativeJsonRecord); +} +for (org.apache.pulsar.client.api.schema.Field f : res.getFields()) { +log.info("field {} {}", f.getName(), res.getField(f)); +assertEquals("field", f.getName()); +assertEquals("a", res.getField(f)); + +if (nativeAvroRecord != null) { +// test that the native schema is accessible +org.apache.avro.Schema.Field fieldDetails = nativeAvroRecord.getSchema().getField(f.getName()); +// a nullable string is an UNION +assertEquals(org.apache.avro.Schema.Type.UNION, fieldDetails.schema().getType()); + assertTrue(fieldDetails.schema().getTypes().stream().anyMatch(s -> s.getType() == org.apache.avro.Schema.Type.STRING)); + assertTrue(fieldDetails.schema(
(pulsar) 03/06: [fix][broker] Fix message drop record in producer stat (#22458)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git commit b41e7527158da8e12dcd491dca4698c4a74d07ba Author: zhangqian <503837...@qq.com> AuthorDate: Wed Apr 10 16:51:26 2024 +0800 [fix][broker] Fix message drop record in producer stat (#22458) Co-authored-by: ceceezhang (cherry picked from commit cea1a9ba9b576bf43f0a45ff8d65369b0f2bbb36) --- .../src/main/java/org/apache/pulsar/broker/service/Producer.java| 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java index 53b79f06e8e..b077ae17a45 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java @@ -741,7 +741,7 @@ public class Producer { } if (this.isNonPersistentTopic) { msgDrop.calculateRate(); -((NonPersistentPublisherStatsImpl) stats).msgDropRate = msgDrop.getRate(); +((NonPersistentPublisherStatsImpl) stats).msgDropRate = msgDrop.getValueRate(); } }
(pulsar) 02/06: [fix][broker] Update topic partition failed when config maxNumPartitionsPerPartitionedTopic<0 (#22397)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git commit 386f6f0bf73d01f7b23160e9e4642f4f7c01cac5 Author: hanmz AuthorDate: Wed Apr 10 04:27:22 2024 +0800 [fix][broker] Update topic partition failed when config maxNumPartitionsPerPartitionedTopic<0 (#22397) (cherry picked from commit fb5caeb2cd3353db0499e32e9ec79390741b809c) --- .../apache/pulsar/broker/ServiceConfiguration.java | 3 +- .../broker/admin/impl/PersistentTopicsBase.java| 2 +- .../pulsar/broker/admin/PersistentTopicsTest.java | 45 ++ 3 files changed, 48 insertions(+), 2 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index c674a035653..ae6928d2b32 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -1348,7 +1348,8 @@ public class ServiceConfiguration implements PulsarConfiguration { category = CATEGORY_SERVER, dynamic = true, doc = "The number of partitions per partitioned topic.\n" -+ "If try to create or update partitioned topics by exceeded number of partitions, then fail." ++ "If try to create or update partitioned topics by exceeded number of partitions, then fail.\n" ++ "Use 0 or negative number to disable the check." ) private int maxNumPartitionsPerPartitionedTopic = 0; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 602cd47e595..dac3e7cd55d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -433,7 +433,7 @@ public class PersistentTopicsBase extends AdminResource { } int brokerMaximumPartitionsPerTopic = pulsarService.getConfiguration() .getMaxNumPartitionsPerPartitionedTopic(); -if (brokerMaximumPartitionsPerTopic != 0 && expectPartitions > brokerMaximumPartitionsPerTopic) { +if (brokerMaximumPartitionsPerTopic > 0 && expectPartitions > brokerMaximumPartitionsPerTopic) { throw new RestException(422 /* Unprocessable entity*/, String.format("Expect partitions %s grater than maximum partitions per topic %s", expectPartitions, brokerMaximumPartitionsPerTopic)); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java index d7ffa656bdb..0eddbf1fea1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java @@ -1658,6 +1658,51 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest { verify(response, timeout(5000).times(1)).resume(metaCaptor.capture()); partitionedTopicMetadata = metaCaptor.getValue(); Assert.assertEquals(partitionedTopicMetadata.partitions, 4); + +// test for configuration maxNumPartitionsPerPartitionedTopic +conf.setMaxNumPartitionsPerPartitionedTopic(4); +response = mock(AsyncResponse.class); +throwableCaptor = ArgumentCaptor.forClass(Throwable.class); +persistentTopics.updatePartitionedTopic(response, testTenant, testNamespaceLocal, topicName, false, true, +true, 5); +verify(response, timeout(5000).times(1)).resume(throwableCaptor.capture()); +Assert.assertEquals(throwableCaptor.getValue().getMessage(), +"Desired partitions 5 can't be greater than the maximum partitions per topic 4."); + +response = mock(AsyncResponse.class); +metaCaptor = ArgumentCaptor.forClass(PartitionedTopicMetadata.class); +persistentTopics.getPartitionedMetadata(response, testTenant, testNamespaceLocal, topicName, true, false); +verify(response, timeout(5000).times(1)).resume(metaCaptor.capture()); +partitionedTopicMetadata = metaCaptor.getValue(); +Assert.assertEquals(partitionedTopicMetadata.partitions, 4); + +conf.setMaxNumPartitionsPerPartitionedTopic(-1); +response = mock(AsyncResponse.class); +responseCaptor = ArgumentCaptor.forClass(Response.class); +persistentTopics.updatePartitionedTopic(response, testTenant, testNamespa
(pulsar) branch branch-3.0 updated (ff8d3b73437 -> 94f12543a9f)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a change to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git from ff8d3b73437 [improve][offload] Apply autoSkipNonRecoverableData configuration to tiered storage (#22531) new a9e815d464c [improve][build] Upgrade Lombok to 1.18.32 for Java 22 support (#22425) new 386f6f0bf73 [fix][broker] Update topic partition failed when config maxNumPartitionsPerPartitionedTopic<0 (#22397) new b41e7527158 [fix][broker] Fix message drop record in producer stat (#22458) new eb1b55ec681 [improve][broker] backlog quota exceed limit log replaced with `debug` (#22488) new 42ae91ae31b [fix][test] SchemaMap in AutoConsumeSchema has been reused (#22500) new 94f12543a9f [improve] Make the config `metricsBufferResponse` description more effective (#22490) The 6 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: pom.xml| 2 +- .../apache/pulsar/broker/ServiceConfiguration.java | 9 ++- .../broker/admin/impl/PersistentTopicsBase.java| 2 +- .../org/apache/pulsar/broker/service/Producer.java | 2 +- .../broker/service/persistent/PersistentTopic.java | 4 +- .../pulsar/broker/admin/PersistentTopicsTest.java | 45 +++ .../client/api/SimpleProducerConsumerTest.java | 66 +- 7 files changed, 94 insertions(+), 36 deletions(-)
(pulsar) branch master updated: [improve][broker] Support X-Forwarded-For and HA Proxy Protocol for resolving original client IP of http/https requests (#22524)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 4a887217d83 [improve][broker] Support X-Forwarded-For and HA Proxy Protocol for resolving original client IP of http/https requests (#22524) 4a887217d83 is described below commit 4a887217d835629cafb393ddf331441b484d4e2c Author: Lari Hotari AuthorDate: Mon Apr 22 07:49:34 2024 +0300 [improve][broker] Support X-Forwarded-For and HA Proxy Protocol for resolving original client IP of http/https requests (#22524) --- conf/broker.conf | 10 ++ conf/functions_worker.yml | 10 ++ conf/proxy.conf| 10 ++ conf/standalone.conf | 10 ++ conf/websocket.conf| 10 ++ pom.xml| 1 + .../apache/pulsar/broker/ServiceConfiguration.java | 16 ++ .../pulsar/broker/web/JettyRequestLogFactory.java | 195 - pulsar-broker/pom.xml | 7 + .../org/apache/pulsar/broker/web/WebService.java | 37 +++- .../broker/web/WebServiceOriginalClientIPTest.java | 155 pulsar-broker/src/test/resources/log4j2.xml| 3 +- .../pulsar/functions/worker/WorkerConfig.java | 16 ++ .../pulsar/functions/worker/rest/WorkerServer.java | 38 +++- pulsar-proxy/pom.xml | 6 + .../pulsar/proxy/server/ProxyConfiguration.java| 16 ++ .../pulsar/proxy/server/ProxyServiceStarter.java | 31 +++- .../org/apache/pulsar/proxy/server/WebServer.java | 34 +++- .../proxy/server/ProxyOriginalClientIPTest.java| 157 + .../ProxyServiceStarterDisableZeroCopyTest.java| 2 +- .../proxy/server/ProxyServiceStarterTest.java | 2 +- .../proxy/server/ProxyServiceTlsStarterTest.java | 2 +- .../src/test/resources/log4j2.xml | 7 +- .../pulsar/websocket/service/ProxyServer.java | 39 - .../service/WebSocketProxyConfiguration.java | 14 ++ 25 files changed, 801 insertions(+), 27 deletions(-) diff --git a/conf/broker.conf b/conf/broker.conf index fd6bba0f45d..d482f77da7c 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -88,6 +88,16 @@ advertisedAddress= # If true, the real IP addresses of consumers and producers can be obtained when getting topic statistics data. haProxyProtocolEnabled=false +# Enable or disable the use of HA proxy protocol for resolving the client IP for http/https requests. +webServiceHaProxyProtocolEnabled=false + +# Trust X-Forwarded-For header for resolving the client IP for http/https requests. Default is false. +webServiceTrustXForwardedFor=false + +# Add detailed client/remote and server/local addresses and ports to http/https request logging. +# Defaults to true when either webServiceHaProxyProtocolEnabled or webServiceTrustXForwardedFor is enabled. +webServiceLogDetailedAddresses= + # Number of threads to config Netty Acceptor. Default is 1 numAcceptorThreads= diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml index 3871c74a887..6f995576ebd 100644 --- a/conf/functions_worker.yml +++ b/conf/functions_worker.yml @@ -27,6 +27,16 @@ workerHostname: localhost workerPort: 6750 workerPortTls: 6751 +# Enable or disable the use of HA proxy protocol for resolving the client IP for http/https requests. +webServiceHaProxyProtocolEnabled: false + +# Trust X-Forwarded-For header for resolving the client IP for http/https requests. Default is false. +webServiceTrustXForwardedFor: false + +# Add detailed client/remote and server/local addresses and ports to http/https request logging. +# Defaults to true when either webServiceHaProxyProtocolEnabled or webServiceTrustXForwardedFor is enabled. +webServiceLogDetailedAddresses: null + # The Configuration metadata store url # Examples: # * zk:my-zk-1:2181,my-zk-2:2181,my-zk-3:2181 diff --git a/conf/proxy.conf b/conf/proxy.conf index 5a9d433f39c..6e6c960e800 100644 --- a/conf/proxy.conf +++ b/conf/proxy.conf @@ -63,6 +63,16 @@ advertisedAddress= # If true, the real IP addresses of consumers and producers can be obtained when getting topic statistics data. haProxyProtocolEnabled=false +# Enable or disable the use of HA proxy protocol for resolving the client IP for http/https requests. +webServiceHaProxyProtocolEnabled=false + +# Trust X-Forwarded-For header for resolving the client IP for http/https requests. Default is false. +webServiceTrustXForwardedFor=false + +# Add detailed client/remote and server/local addresses and ports to http/https request logging. +# Defaults to true when either webServiceHaProxyProtocolEnabled or webServiceTrustXForwardedFor is enabled. +webServiceLogDetailedAddresses= + # Enables zero
Re: [PR] [improve][broker] Support X-Forwarded-For and HA Proxy Protocol for resolving original client IP of http/https requests [pulsar]
lhotari merged PR #22524: URL: https://github.com/apache/pulsar/pull/22524 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [improve] Update Oxia client to 0.1.6 [pulsar]
nodece commented on PR #22525: URL: https://github.com/apache/pulsar/pull/22525#issuecomment-2068395912 ``` LockManagerTest.acquireLocks:66 » NoClassDefFound Could not initialize class io.streamnative.oxia.proto.ListResponse ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [improve] [broker] Create partitioned topics automatically when enable topic level replication [pulsar]
gaoran10 commented on code in PR #22537: URL: https://github.com/apache/pulsar/pull/22537#discussion_r1574055874 ## pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java: ## @@ -621,35 +625,82 @@ protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int n private void internalCreatePartitionedTopicToReplicatedClustersInBackground(int numPartitions) { getNamespaceReplicatedClustersAsync(namespaceName) -.thenAccept(clusters -> { -for (String cluster : clusters) { -if (!cluster.equals(pulsar().getConfiguration().getClusterName())) { -// this call happens in the background without async composition. completion is logged. -pulsar().getPulsarResources().getClusterResources() -.getClusterAsync(cluster) -.thenCompose(clusterDataOp -> -((TopicsImpl) pulsar().getBrokerService() - .getClusterPulsarAdmin(cluster, - clusterDataOp).topics()) - .createPartitionedTopicAsync( - topicName.getPartitionedTopicName(), -numPartitions, -true, null)) -.whenComplete((__, ex) -> { -if (ex != null) { -log.error( -"[{}] Failed to create partitioned topic {} in cluster {}.", -clientAppId(), topicName, cluster, ex); -} else { -log.info( -"[{}] Successfully created partitioned topic {} in " -+ "cluster {}", -clientAppId(), topicName, cluster); -} -}); -} +.thenAccept(clusters -> { +// this call happens in the background without async composition. completion is logged. + internalCreatePartitionedTopicToReplicatedClustersInBackground(clusters, numPartitions); +}); +} + +protected Map> internalCreatePartitionedTopicToReplicatedClustersInBackground( +Set clusters, int numPartitions) { +final String shortTopicName = topicName.getPartitionedTopicName(); +Map> tasksForAllClusters = new HashMap<>(); +for (String cluster : clusters) { +if (cluster.equals(pulsar().getConfiguration().getClusterName())) { +continue; +} +ClusterResources clusterResources = pulsar().getPulsarResources().getClusterResources(); +CompletableFuture createRemoteTopicFuture = new CompletableFuture<>(); +tasksForAllClusters.put(cluster, createRemoteTopicFuture); + clusterResources.getClusterAsync(cluster).whenComplete((clusterData, ex1) -> { +if (ex1 != null) { +// Unexpected error, such as NPE. Catch all error to avoid the "createRemoteTopicFuture" stuck. +log.error("[{}] An un-expected error occurs when trying to create partitioned topic {} in cluster" ++ " {}.", clientAppId(), topicName, cluster, ex1); +createRemoteTopicFuture.completeExceptionally(new RestException(ex1)); +return; +} +// Get cluster data success. +TopicsImpl topics = +(TopicsImpl) pulsar().getBrokerService().getClusterPulsarAdmin(cluster, clusterData).topics(); +topics.createPartitionedTopicAsync(shortTopicName, numPartitions, true, null) +.whenComplete((ignore, ex2) -> { +if (ex2 == null) { +// Create success. +log.info("[{}] Successfully created partitioned topic {} in cluster {}", +clientAppId(), topicName, cluster); +createRemoteTopicFuture.complete(null); +return; +} +// Create topic on the remote cluster error. +Throwable unwrapEx2 = FutureUtil.unwrapCompletionException(ex2); +// The topic has been created before, check
Re: [D] broker healthcheck endpoint reports No such ledger exists on Metadata Server - ledger endlessly [pulsar]
GitHub user wallacepeng edited a discussion: broker healthcheck endpoint reports No such ledger exists on Metadata Server - ledger endlessly 2024-04-21T03:14:20,474+ [pulsar-io-4-7] ERROR org.apache.pulsar.client.impl.ProducerImpl - [persistent://pulsar/pulsar/pulsar-broker-0.pulsar-broker.pulsar.svc.cluster.local:8080/healthcheck] [null] Failed to create producer: {"errorMsg":"org.apache.pulsar.broker.service.schema.exceptions.SchemaException: No such ledger exists on Metadata Server - ledger=3596046 - operation=Failed to open ledger caused by org.apache.pulsar.broker.service.schema.exceptions.SchemaException: No such ledger exists on Metadata Server - ledger=3596046 - operation=Failed to open ledger","reqId":3493733038286036253, "remote":"pulsar-broker-0.pulsar-broker.pulsar.svc.cluster.local/172.20.203.179:6650", "local":"/172.20.203.179:58124"} 2024-04-21 03:14:20.474 2024-04-21T03:14:20,474+ [pulsar-io-4-7] WARN org.apache.pulsar.client.impl.ClientCnx - [id: 0xdce5010a, L:/172.20.203.179:58124 - R:pulsar-broker-0.pulsar-broker.pulsar.svc.cluster.local/172.20.203.179:6650] Received error from server: org.apache.pulsar.broker.service.schema.exceptions.SchemaException: No such ledger exists on Metadata Server - ledger=3596046 - operation=Failed to open ledger caused by org.apache.pulsar.broker.service.schema.exceptions.SchemaException: No such ledger exists on Metadata Server - ledger=3596046 - operation=Failed to open ledger this is caused by that internal healthcheck cannot create producer for topic (it seems have null producerName) persistent://pulsar/pulsar/pulsar-broker-0.pulsar-broker.pulsar.svc.cluster.local:8080/healthcheck . it reports No such ledger exists on Metadata Server . All the others work except the healthcheck . i have bookkeeper clusters, i decommission the default one then it got this error . does it still try to create ledger on decommisisoned bookie ? ``` bin/pulsar-admin topics stats-internal persistent://pulsar/pulsar/pulsar-broker-0.pulsar-broker.pulsar.svc.cluster.local:8080/healthcheck No such ledger exists on Metadata Server Reason: No such ledger exists on Metadata Server https://github.com/apache/pulsar/assets/894641/9eb33a0f-48f4-4ff4-8f80-88ea5c7cc1d7";> GitHub link: https://github.com/apache/pulsar/discussions/22545 This is an automatically sent email for commits@pulsar.apache.org. To unsubscribe, please send an email to: commits-unsubscr...@pulsar.apache.org
Re: [D] broker healthcheck endpoint reports No such ledger exists on Metadata Server - ledger endlessly [pulsar]
GitHub user wallacepeng edited a discussion: broker healthcheck endpoint reports No such ledger exists on Metadata Server - ledger endlessly 2024-04-21T03:14:20,474+ [pulsar-io-4-7] ERROR org.apache.pulsar.client.impl.ProducerImpl - [persistent://pulsar/pulsar/pulsar-broker-0.pulsar-broker.pulsar.svc.cluster.local:8080/healthcheck] [null] Failed to create producer: {"errorMsg":"org.apache.pulsar.broker.service.schema.exceptions.SchemaException: No such ledger exists on Metadata Server - ledger=3596046 - operation=Failed to open ledger caused by org.apache.pulsar.broker.service.schema.exceptions.SchemaException: No such ledger exists on Metadata Server - ledger=3596046 - operation=Failed to open ledger","reqId":3493733038286036253, "remote":"pulsar-broker-0.pulsar-broker.pulsar.svc.cluster.local/172.20.203.179:6650", "local":"/172.20.203.179:58124"} 2024-04-21 03:14:20.474 2024-04-21T03:14:20,474+ [pulsar-io-4-7] WARN org.apache.pulsar.client.impl.ClientCnx - [id: 0xdce5010a, L:/172.20.203.179:58124 - R:pulsar-broker-0.pulsar-broker.pulsar.svc.cluster.local/172.20.203.179:6650] Received error from server: org.apache.pulsar.broker.service.schema.exceptions.SchemaException: No such ledger exists on Metadata Server - ledger=3596046 - operation=Failed to open ledger caused by org.apache.pulsar.broker.service.schema.exceptions.SchemaException: No such ledger exists on Metadata Server - ledger=3596046 - operation=Failed to open ledger this is caused by that internal healthcheck cannot create producer for topic persistent://pulsar/pulsar/pulsar-broker-0.pulsar-broker.pulsar.svc.cluster.local:8080/healthcheck . it reports No such ledger exists on Metadata Server . All the others work except the healthcheck . i have bookkeeper clusters, i decommission the default one then it got this error . does it still try to create ledger on decommisisoned bookie ? ``` bin/pulsar-admin topics stats-internal persistent://pulsar/pulsar/pulsar-broker-0.pulsar-broker.pulsar.svc.cluster.local:8080/healthcheck No such ledger exists on Metadata Server Reason: No such ledger exists on Metadata Server https://github.com/apache/pulsar/assets/894641/9eb33a0f-48f4-4ff4-8f80-88ea5c7cc1d7";> GitHub link: https://github.com/apache/pulsar/discussions/22545 This is an automatically sent email for commits@pulsar.apache.org. To unsubscribe, please send an email to: commits-unsubscr...@pulsar.apache.org
Re: [D] broker healthcheck endpoint reports No such ledger exists on Metadata Server - ledger endlessly [pulsar]
GitHub user wallacepeng edited a discussion: broker healthcheck endpoint reports No such ledger exists on Metadata Server - ledger endlessly 2024-04-21T03:14:20,474+ [pulsar-io-4-7] ERROR org.apache.pulsar.client.impl.ProducerImpl - [persistent://pulsar/pulsar/pulsar-broker-0.pulsar-broker.pulsar.svc.cluster.local:8080/healthcheck] [null] Failed to create producer: {"errorMsg":"org.apache.pulsar.broker.service.schema.exceptions.SchemaException: No such ledger exists on Metadata Server - ledger=3596046 - operation=Failed to open ledger caused by org.apache.pulsar.broker.service.schema.exceptions.SchemaException: No such ledger exists on Metadata Server - ledger=3596046 - operation=Failed to open ledger","reqId":3493733038286036253, "remote":"pulsar-broker-0.pulsar-broker.pulsar.svc.cluster.local/172.20.203.179:6650", "local":"/172.20.203.179:58124"} 2024-04-21 03:14:20.474 2024-04-21T03:14:20,474+ [pulsar-io-4-7] WARN org.apache.pulsar.client.impl.ClientCnx - [id: 0xdce5010a, L:/172.20.203.179:58124 - R:pulsar-broker-0.pulsar-broker.pulsar.svc.cluster.local/172.20.203.179:6650] Received error from server: org.apache.pulsar.broker.service.schema.exceptions.SchemaException: No such ledger exists on Metadata Server - ledger=3596046 - operation=Failed to open ledger caused by org.apache.pulsar.broker.service.schema.exceptions.SchemaException: No such ledger exists on Metadata Server - ledger=3596046 - operation=Failed to open ledger this is caused by that internal healthcheck cannot create producer for topic persistent://pulsar/pulsar/pulsar-broker-0.pulsar-broker.pulsar.svc.cluster.local:8080/healthcheck . it reports No such ledger exists on Metadata Server . i have bookkeeper clusters, i decommission the default one then it got this error . does it still try to create ledger on decommisisoned bookie ? ``` bin/pulsar-admin topics stats-internal persistent://pulsar/pulsar/pulsar-broker-0.pulsar-broker.pulsar.svc.cluster.local:8080/healthcheck No such ledger exists on Metadata Server Reason: No such ledger exists on Metadata Server https://github.com/apache/pulsar/assets/894641/9eb33a0f-48f4-4ff4-8f80-88ea5c7cc1d7";> GitHub link: https://github.com/apache/pulsar/discussions/22545 This is an automatically sent email for commits@pulsar.apache.org. To unsubscribe, please send an email to: commits-unsubscr...@pulsar.apache.org
Re: [PR] [improve]Improve batching message doc [pulsar-site]
dao-jun commented on code in PR #878: URL: https://github.com/apache/pulsar-site/pull/878#discussion_r1573879880 ## docs/concepts-messaging.md: ## @@ -427,6 +427,13 @@ Consumer consumer = pulsarClient.newConsumer() .subscribe(); ``` +:::note + +Send messages by synchronous API `send` will disable batching, and the message will be sent individually. Review Comment: Yes I know it, it's just that my statement is not accurate. How about change it to: ```txt Send messages by synchronous API `send` will trigger the batch to be sent immediately, even the batch is not full. It is for the purpose of reducing the latency of sending messages and preventing blocking of the caller's thread. ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [improve] [broker] Create partitioned topics automatically when enable topic level replication [pulsar]
poorbarcode commented on code in PR #22537: URL: https://github.com/apache/pulsar/pull/22537#discussion_r1573843227 ## pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java: ## @@ -153,4 +172,90 @@ public void testTopicCloseWhenInternalProducerCloseErrorOnce() throws Exception admin2.topics().delete(topicName); }); } + +@Test +public void testPartitionedTopicLevelReplication() throws Exception { +final String topicName = BrokerTestUtil.newUniqueName("persistent://" + nonReplicatedNamespace + "/tp_"); +final String partition0 = TopicName.get(topicName).getPartition(0).toString(); +final String partition1 = TopicName.get(topicName).getPartition(1).toString(); +admin1.topics().createPartitionedTopic(topicName, 2); +admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1, cluster2)); +// Check the partitioned topic has been created at the remote cluster. +PartitionedTopicMetadata topicMetadata2 = admin2.topics().getPartitionedTopicMetadata(topicName); +assertEquals(topicMetadata2.partitions, 2); +// cleanup. +admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1)); +waitReplicatorStopped(partition0); +waitReplicatorStopped(partition1); +admin1.topics().deletePartitionedTopic(topicName); +admin2.topics().deletePartitionedTopic(topicName); +} + +@Test +public void testPartitionedTopicLevelReplicationRemoteTopicExist() throws Exception { +final String topicName = BrokerTestUtil.newUniqueName("persistent://" + nonReplicatedNamespace + "/tp_"); +final String partition0 = TopicName.get(topicName).getPartition(0).toString(); +final String partition1 = TopicName.get(topicName).getPartition(1).toString(); +admin1.topics().createPartitionedTopic(topicName, 2); +admin2.topics().createPartitionedTopic(topicName, 2); +admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1, cluster2)); +// Check the partitioned topic has been created at the remote cluster. +PartitionedTopicMetadata topicMetadata2 = admin2.topics().getPartitionedTopicMetadata(topicName); +assertEquals(topicMetadata2.partitions, 2); +// cleanup. +admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1)); +waitReplicatorStopped(partition0); +waitReplicatorStopped(partition1); +admin1.topics().deletePartitionedTopic(topicName); +admin2.topics().deletePartitionedTopic(topicName); +} + +@Test +public void testPartitionedTopicLevelReplicationRemoteConflictTopicExist() throws Exception { Review Comment: No, it breaks the order of consumption on the remote cluster if they are using Key_Shared mode, letting users determine how to fix it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [improve] [broker] Create partitioned topics automatically when enable topic level replication [pulsar]
poorbarcode commented on code in PR #22537: URL: https://github.com/apache/pulsar/pull/22537#discussion_r1573843227 ## pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java: ## @@ -153,4 +172,90 @@ public void testTopicCloseWhenInternalProducerCloseErrorOnce() throws Exception admin2.topics().delete(topicName); }); } + +@Test +public void testPartitionedTopicLevelReplication() throws Exception { +final String topicName = BrokerTestUtil.newUniqueName("persistent://" + nonReplicatedNamespace + "/tp_"); +final String partition0 = TopicName.get(topicName).getPartition(0).toString(); +final String partition1 = TopicName.get(topicName).getPartition(1).toString(); +admin1.topics().createPartitionedTopic(topicName, 2); +admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1, cluster2)); +// Check the partitioned topic has been created at the remote cluster. +PartitionedTopicMetadata topicMetadata2 = admin2.topics().getPartitionedTopicMetadata(topicName); +assertEquals(topicMetadata2.partitions, 2); +// cleanup. +admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1)); +waitReplicatorStopped(partition0); +waitReplicatorStopped(partition1); +admin1.topics().deletePartitionedTopic(topicName); +admin2.topics().deletePartitionedTopic(topicName); +} + +@Test +public void testPartitionedTopicLevelReplicationRemoteTopicExist() throws Exception { +final String topicName = BrokerTestUtil.newUniqueName("persistent://" + nonReplicatedNamespace + "/tp_"); +final String partition0 = TopicName.get(topicName).getPartition(0).toString(); +final String partition1 = TopicName.get(topicName).getPartition(1).toString(); +admin1.topics().createPartitionedTopic(topicName, 2); +admin2.topics().createPartitionedTopic(topicName, 2); +admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1, cluster2)); +// Check the partitioned topic has been created at the remote cluster. +PartitionedTopicMetadata topicMetadata2 = admin2.topics().getPartitionedTopicMetadata(topicName); +assertEquals(topicMetadata2.partitions, 2); +// cleanup. +admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1)); +waitReplicatorStopped(partition0); +waitReplicatorStopped(partition1); +admin1.topics().deletePartitionedTopic(topicName); +admin2.topics().deletePartitionedTopic(topicName); +} + +@Test +public void testPartitionedTopicLevelReplicationRemoteConflictTopicExist() throws Exception { Review Comment: No, it breaks the order of consumption on the remote cluster if they are using Key_Shared mode, just ask users to determine how to fix it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [improve] [broker] Create partitioned topics automatically when enable topic level replication [pulsar]
poorbarcode commented on code in PR #22537: URL: https://github.com/apache/pulsar/pull/22537#discussion_r1573842742 ## pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java: ## @@ -153,4 +172,90 @@ public void testTopicCloseWhenInternalProducerCloseErrorOnce() throws Exception admin2.topics().delete(topicName); }); } + +@Test +public void testPartitionedTopicLevelReplication() throws Exception { +final String topicName = BrokerTestUtil.newUniqueName("persistent://" + nonReplicatedNamespace + "/tp_"); +final String partition0 = TopicName.get(topicName).getPartition(0).toString(); +final String partition1 = TopicName.get(topicName).getPartition(1).toString(); +admin1.topics().createPartitionedTopic(topicName, 2); +admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1, cluster2)); +// Check the partitioned topic has been created at the remote cluster. +PartitionedTopicMetadata topicMetadata2 = admin2.topics().getPartitionedTopicMetadata(topicName); +assertEquals(topicMetadata2.partitions, 2); +// cleanup. +admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1)); +waitReplicatorStopped(partition0); +waitReplicatorStopped(partition1); +admin1.topics().deletePartitionedTopic(topicName); +admin2.topics().deletePartitionedTopic(topicName); +} + +@Test +public void testPartitionedTopicLevelReplicationRemoteTopicExist() throws Exception { +final String topicName = BrokerTestUtil.newUniqueName("persistent://" + nonReplicatedNamespace + "/tp_"); +final String partition0 = TopicName.get(topicName).getPartition(0).toString(); +final String partition1 = TopicName.get(topicName).getPartition(1).toString(); +admin1.topics().createPartitionedTopic(topicName, 2); +admin2.topics().createPartitionedTopic(topicName, 2); +admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1, cluster2)); +// Check the partitioned topic has been created at the remote cluster. +PartitionedTopicMetadata topicMetadata2 = admin2.topics().getPartitionedTopicMetadata(topicName); +assertEquals(topicMetadata2.partitions, 2); +// cleanup. +admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1)); +waitReplicatorStopped(partition0); +waitReplicatorStopped(partition1); +admin1.topics().deletePartitionedTopic(topicName); +admin2.topics().deletePartitionedTopic(topicName); +} + +@Test +public void testPartitionedTopicLevelReplicationRemoteConflictTopicExist() throws Exception { +final String topicName = BrokerTestUtil.newUniqueName("persistent://" + nonReplicatedNamespace + "/tp_"); +admin2.topics().createPartitionedTopic(topicName, 3); +admin1.topics().createPartitionedTopic(topicName, 2); +try { +admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1, cluster2)); +fail("Expected error due to a conflict partitioned topic already exists."); +} catch (Exception ex) { +Throwable unWrapEx = FutureUtil.unwrapCompletionException(ex); +assertTrue(unWrapEx.getMessage().contains("with different partitions")); +} +// Check nothing changed. +PartitionedTopicMetadata topicMetadata2 = admin2.topics().getPartitionedTopicMetadata(topicName); +assertEquals(topicMetadata2.partitions, 3); +assertEquals(admin1.topics().getReplicationClusters(topicName, true).size(), 1); +// cleanup. +admin1.topics().deletePartitionedTopic(topicName); +admin2.topics().deletePartitionedTopic(topicName); +} + +/** + * TODO next PR will correct the behavior below, just left this test here. + */ +// @Test +private void testNamespaceLevelReplicationRemoteConflictTopicExist() throws Exception { Review Comment: Removed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [improve][test] Add topic policy test for topic API [pulsar]
Technoboy- opened a new pull request, #22546: URL: https://github.com/apache/pulsar/pull/22546 ### Motivation Add test for topic policies. ### Modification - Add test for topic policy. - Separate transaction and schema test to a new class ### Documentation - [ ] `doc` - [ ] `doc-required` - [x] `doc-not-needed` - [ ] `doc-complete` ### Matching PR in forked repository PR in forked repository: -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [I] Consumer able to receive message which is not matching the regex pattern [pulsar]
visortelle commented on issue #22529: URL: https://github.com/apache/pulsar/issues/22529#issuecomment-2067943011 Interesting. My observation is that after you create a pattern consumer, for non-persistent topics it doesn't "immediately" create the underlying subscription and consumers if there are no connected producers at this moment. But it will eventually be created after a short time. TIP: you can display the list of the underlying consumers by casting your consumer to `PatternMultiTopicsConsumerImpl` and calling the `.getConsumers()` method. ```java List> consumers = ((PatternMultiTopicsConsumerImpl) allTopicsConsumer).getConsumers(); for (ConsumerImpl consumer : consumers) { System.out.println("consumer: " + consumer.getTopic()); } ``` If you modify your code to send a lot of messages asynchronously, you'll start to receive them after a short time. ```java for (int i = 0; i < 10; i++) { producer.sendAsync("=from topic non-persistent://tenant-1/name/topic-1 " + i); } ``` ``` ... Received message from topic non-persistent://tenant-1/name/topic-1: =from topic non-persistent://tenant-1/name/topic-1 10732 Received message from topic non-persistent://tenant-1/name/topic-1: =from topic non-persistent://tenant-1/name/topic-1 10733 Received message from topic non-persistent://tenant-1/name/topic-1: =from topic non-persistent://tenant-1/name/topic-1 10734 Received message from topic non-persistent://tenant-1/name/topic-1: =from topic non-persistent://tenant-1/name/topic-1 10735 Received message from topic non-persistent://tenant-1/name/topic-1: =from topic non-persistent://tenant-1/name/topic-1 10736 Received message from topic non-persistent://tenant-1/name/topic-1: =from topic non-persistent://tenant-1/name/topic-1 10737 Received message from topic non-persistent://tenant-1/name/topic-1: =from topic non-persistent://tenant-1/name/topic-1 10738 Received message from topic non-persistent://tenant-1/name/topic-1: =from topic non-persistent://tenant-1/name/topic-1 10739 Received message from topic non-persistent://tenant-1/name/topic-1: =from topic non-persistent://tenant-1/name/topic-1 10740 Received message from topic non-persistent://tenant-1/name/topic-1: =from topic non-persistent://tenant-1/name/topic-1 10741 Received message from topic non-persistent://tenant-1/name/topic-1: =from topic non-persistent://tenant-1/name/topic-1 10742 Received message from topic non-persistent://tenant-1/name/topic-1: =from topic non-persistent://tenant-1/name/topic-1 10743 Received message from topic non-persistent://tenant-1/name/topic-1: =from topic non-persistent://tenant-1/name/topic-1 10744 Received message from topic non-persistent://tenant-1/name/topic-1: =from topic non-persistent://tenant-1/name/topic-1 10745 Received message from topic non-persistent://tenant-1/name/topic-1: =from topic non-persistent://tenant-1/name/topic-1 10746 ``` I don't know if it can qualify as a bug. cc @lhotari I wouldn't rely on non-persistent topics if losing a non-significant amount of messages could affect my application. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org