Re: [PR] [improve][client] Add connectionsPerHost and connectionMaxIdleSeconds to PulsarAdminBuilder [pulsar]

2024-04-21 Thread via GitHub


lhotari commented on code in PR #22541:
URL: https://github.com/apache/pulsar/pull/22541#discussion_r1574216061


##
pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java:
##
@@ -47,6 +47,7 @@ public PulsarAdmin build() throws PulsarClientException {
 
 public PulsarAdminBuilderImpl() {
 this.conf = new ClientConfigurationData();
+this.conf.setConnectionsPerBroker(16);

Review Comment:
   > Couldn't the default be part of `ClientConfigurationData` constructor?
   
   Not really. `ClientConfigurationData` is designed for PulsarClient, but it's also used in the PulsarAdmin client. The current `PulsarAdminBuilderImpl` is a bit of a hack around `ClientConfigurationData`.
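
The point above can be sketched as follows. This is only an illustration with hypothetical stand-in classes (`SharedClientConfig`, `AdminBuilderSketch`), not the real Pulsar types; the shared default of 1 is an assumption, and the value 16 is taken from the diff. The idea is that the admin-specific default lives in the admin builder, so the shared configuration class keeps the default that suits the messaging client.

```java
// Hypothetical stand-in for a configuration class shared by two clients.
class SharedClientConfig {
    // Assumed default suited to the messaging client.
    private int connectionsPerBroker = 1;

    int getConnectionsPerBroker() {
        return connectionsPerBroker;
    }

    void setConnectionsPerBroker(int connectionsPerBroker) {
        this.connectionsPerBroker = connectionsPerBroker;
    }
}

// Hypothetical stand-in for the admin builder that reuses the shared class.
class AdminBuilderSketch {
    private final SharedClientConfig conf;

    AdminBuilderSketch() {
        this.conf = new SharedClientConfig();
        // Admin-specific default applied here, leaving the shared default untouched.
        this.conf.setConnectionsPerBroker(16);
    }

    // Callers can still override the admin default explicitly.
    AdminBuilderSketch connectionsPerHost(int connections) {
        this.conf.setConnectionsPerBroker(connections);
        return this;
    }

    SharedClientConfig config() {
        return conf;
    }
}
```

With this layout, code that constructs the shared configuration directly keeps seeing the original default, while anything built through the admin builder starts from 16.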



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



(pulsar-dotpulsar) branch master updated: Add sample using send channel (#215)

2024-04-21 Thread blankensteiner
This is an automated email from the ASF dual-hosted git repository.

blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 7a6d825  Add sample using send channel (#215)
7a6d825 is described below

commit 7a6d825da5c31f662a1db35846143c0606bd824f
Author: Kristian Andersen 
AuthorDate: Mon Apr 22 08:45:09 2024 +0200

Add sample using send channel (#215)
---
 DotPulsar.sln  |  7 +++
 samples/SendChannel/Program.cs | 82 ++
 samples/SendChannel/SendChannel.csproj | 12 +
 3 files changed, 101 insertions(+)

diff --git a/DotPulsar.sln b/DotPulsar.sln
index 1706858..ed8376d 100644
--- a/DotPulsar.sln
+++ b/DotPulsar.sln
@@ -30,6 +30,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Compression", "benchmarks\C
 EndProject
 Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Benchmarks", "Benchmarks", "{2C57AF4B-0D23-42D7-86FE-80277FD52875}"
 EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SendChannel", "samples\SendChannel\SendChannel.csproj", "{366ACA6A-7DA2-4E51-AC27-7B570DFFE5D2}"
+EndProject
 Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -64,6 +66,10 @@ Global
{040F8253-074D-4977-BDB1-0D9798B52CE2}.Debug|Any CPU.Build.0 = Debug|Any CPU
{040F8253-074D-4977-BDB1-0D9798B52CE2}.Release|Any CPU.ActiveCfg = Release|Any CPU
{040F8253-074D-4977-BDB1-0D9798B52CE2}.Release|Any CPU.Build.0 = Release|Any CPU
+   {366ACA6A-7DA2-4E51-AC27-7B570DFFE5D2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+   {366ACA6A-7DA2-4E51-AC27-7B570DFFE5D2}.Debug|Any CPU.Build.0 = Debug|Any CPU
+   {366ACA6A-7DA2-4E51-AC27-7B570DFFE5D2}.Release|Any CPU.ActiveCfg = Release|Any CPU
+   {366ACA6A-7DA2-4E51-AC27-7B570DFFE5D2}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -75,6 +81,7 @@ Global
{6D44683B-865C-4D15-9F0A-1A8441354589} = {E7106D0F-B255-4631-9FB8-734FC5748FA9}
{CC1494FA-4EB5-4DB9-8BE9-0A6E8D0D963E} = {E7106D0F-B255-4631-9FB8-734FC5748FA9}
{040F8253-074D-4977-BDB1-0D9798B52CE2} = {2C57AF4B-0D23-42D7-86FE-80277FD52875}
+   {366ACA6A-7DA2-4E51-AC27-7B570DFFE5D2} = {E7106D0F-B255-4631-9FB8-734FC5748FA9}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {88355922-E70A-4B73-B7F8-ABF8F2B59789}
diff --git a/samples/SendChannel/Program.cs b/samples/SendChannel/Program.cs
new file mode 100644
index 000..d79f038
--- /dev/null
+++ b/samples/SendChannel/Program.cs
@@ -0,0 +1,82 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace SendChannel;
+
+using DotPulsar;
+using DotPulsar.Abstractions;
+using DotPulsar.Extensions;
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+
+internal static class Program
+{
+private static async Task Main()
+{
+var cts = new CancellationTokenSource();
+
+Console.CancelKeyPress += (sender, args) =>
+{
+cts.Cancel();
+args.Cancel = true;
+};
+
+await using var client = PulsarClient.Builder().Build(); // Connecting to pulsar://localhost:6650
+
+await using var producer = client.NewProducer(Schema.String)
+.StateChangedHandler(Monitor)
+.Topic("persistent://public/default/mytopic")
+.Create();
+
+Console.WriteLine("Press Ctrl+C to exit");
+
+var sendChannel = producer.SendChannel;
+await ProduceMessages(sendChannel, 1000, cts.Token);
+sendChannel.Complete();
+
+var shutdownCts = new CancellationTokenSource();
+shutdownCts.CancelAfter(TimeSpan.FromSeconds(30));
+await sendChannel.Completion(shutdownCts.Token);
+}
+
+private static async Task ProduceMessages(ISendChannel sendChannel, int messages, CancellationToken cancellationToken)
+{
+try
+{
+int i = 0;
+while (++i <= messages && !cancellationToken.IsCancellationRequested)
+{
+var data = DateTime.UtcNow.ToLong

Re: [PR] Add sample using send channel [pulsar-dotpulsar]

2024-04-21 Thread via GitHub


blankensteiner merged PR #215:
URL: https://github.com/apache/pulsar-dotpulsar/pull/215





(pulsar) branch branch-2.11 updated: [improve][broker] Support X-Forwarded-For and HA Proxy Protocol for resolving original client IP of http/https requests (#22524)

2024-04-21 Thread lhotari

lhotari pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
 new 2e272b9ae64 [improve][broker] Support X-Forwarded-For and HA Proxy Protocol for resolving original client IP of http/https requests (#22524)
2e272b9ae64 is described below

commit 2e272b9ae6442aa39987546aed9bfcabbbc20534
Author: Lari Hotari 
AuthorDate: Mon Apr 22 07:49:34 2024 +0300

[improve][broker] Support X-Forwarded-For and HA Proxy Protocol for resolving original client IP of http/https requests (#22524)

(cherry picked from commit 4a887217d835629cafb393ddf331441b484d4e2c)
---
 conf/broker.conf   |  10 ++
 conf/functions_worker.yml  |  10 ++
 conf/proxy.conf|  10 ++
 conf/standalone.conf   |  10 ++
 conf/websocket.conf|  10 ++
 pom.xml|   1 +
 .../apache/pulsar/broker/ServiceConfiguration.java |  16 ++
 .../pulsar/broker/web/JettyRequestLogFactory.java  | 195 -
 pulsar-broker/pom.xml  |   7 +
 .../org/apache/pulsar/broker/web/WebService.java   |  40 -
 .../broker/web/WebServiceOriginalClientIPTest.java | 174 ++
 pulsar-broker/src/test/resources/log4j2.xml|   5 +-
 .../pulsar/functions/worker/WorkerConfig.java  |  16 ++
 .../pulsar/functions/worker/rest/WorkerServer.java |  38 +++-
 pulsar-proxy/pom.xml   |   6 +
 .../pulsar/proxy/server/ProxyConfiguration.java|  16 ++
 .../pulsar/proxy/server/ProxyServiceStarter.java   |  31 +++-
 .../org/apache/pulsar/proxy/server/WebServer.java  |  34 +++-
 .../proxy/server/ProxyOriginalClientIPTest.java| 173 ++
 .../ProxyServiceStarterDisableZeroCopyTest.java|   2 +-
 .../proxy/server/ProxyServiceStarterTest.java  |   2 +-
 .../proxy/server/ProxyServiceTlsStarterTest.java   |   2 +-
 .../src/test/resources/log4j2.xml  |   9 +-
 .../pulsar/websocket/service/ProxyServer.java  |  39 -
 .../service/WebSocketProxyConfiguration.java   |  14 ++
 25 files changed, 842 insertions(+), 28 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 1b3bdd1f91f..cab021246f5 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -84,6 +84,16 @@ advertisedAddress=
 # Enable or disable the HAProxy protocol.
 haProxyProtocolEnabled=false
 
+# Enable or disable the use of HA proxy protocol for resolving the client IP for http/https requests.
+webServiceHaProxyProtocolEnabled=false
+
+# Trust X-Forwarded-For header for resolving the client IP for http/https requests. Default is false.
+webServiceTrustXForwardedFor=false
+
+# Add detailed client/remote and server/local addresses and ports to http/https request logging.
+# Defaults to true when either webServiceHaProxyProtocolEnabled or webServiceTrustXForwardedFor is enabled.
+webServiceLogDetailedAddresses=
+
 # Number of threads to config Netty Acceptor. Default is 1
 numAcceptorThreads=
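
The comment on `webServiceLogDetailedAddresses` describes a setting whose default depends on two other settings. A minimal sketch of that rule is shown here, assuming an unset value is represented as `null`; the class and method names are hypothetical and this is not the code from the commit.

```java
// Hypothetical helper illustrating the documented default rule.
class DetailedAddressLogging {
    /**
     * @param explicit value from the config file, or null when left empty (assumed representation)
     * @param haProxyProtocolEnabled value of webServiceHaProxyProtocolEnabled
     * @param trustXForwardedFor value of webServiceTrustXForwardedFor
     */
    static boolean resolve(Boolean explicit, boolean haProxyProtocolEnabled, boolean trustXForwardedFor) {
        if (explicit != null) {
            // An explicit setting always wins.
            return explicit;
        }
        // Otherwise default to true when either original-client-IP feature is on.
        return haProxyProtocolEnabled || trustXForwardedFor;
    }
}
```

Leaving the key empty therefore yields `false` with both features off and `true` once either feature is enabled, while an explicit `false` keeps detailed logging off in all cases.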
 
diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml
index 1a0330eb6f9..662f943ca0d 100644
--- a/conf/functions_worker.yml
+++ b/conf/functions_worker.yml
@@ -27,6 +27,16 @@ workerHostname: localhost
 workerPort: 6750
 workerPortTls: 6751
 
+# Enable or disable the use of HA proxy protocol for resolving the client IP for http/https requests.
+webServiceHaProxyProtocolEnabled: false
+
+# Trust X-Forwarded-For header for resolving the client IP for http/https requests. Default is false.
+webServiceTrustXForwardedFor: false
+
+# Add detailed client/remote and server/local addresses and ports to http/https request logging.
+# Defaults to true when either webServiceHaProxyProtocolEnabled or webServiceTrustXForwardedFor is enabled.
+webServiceLogDetailedAddresses: null
+
 # The Configuration metadata store url
 # Examples:
 # * zk:my-zk-1:2181,my-zk-2:2181,my-zk-3:2181
diff --git a/conf/proxy.conf b/conf/proxy.conf
index a63a61f7ca6..80d6c713dc2 100644
--- a/conf/proxy.conf
+++ b/conf/proxy.conf
@@ -60,6 +60,16 @@ advertisedAddress=
 # Enable or disable the HAProxy protocol.
 haProxyProtocolEnabled=false
 
+# Enable or disable the use of HA proxy protocol for resolving the client IP for http/https requests.
+webServiceHaProxyProtocolEnabled=false
+
+# Trust X-Forwarded-For header for resolving the client IP for http/https requests. Default is false.
+webServiceTrustXForwardedFor=false
+
+# Add detailed client/remote and server/local addresses and ports to http/https request logging.
+# Defaults to true when either webServiceHaProxyProtocolEnabled or webServiceTrustXForwardedFor is enabled.
+webServiceLogDetailedAddresses=
+
 # Enables zero-copy transport of data across network interfaces 

[PR] Add sample using send channel [pulsar-dotpulsar]

2024-04-21 Thread via GitHub


kandersen82 opened a new pull request, #215:
URL: https://github.com/apache/pulsar-dotpulsar/pull/215

   Add sample for using send channel when producing messages
   
   # Description
   
   Add sample for using send channel when producing messages
   





(pulsar) branch branch-3.0 updated: [improve][broker] Support X-Forwarded-For and HA Proxy Protocol for resolving original client IP of http/https requests (#22524)

2024-04-21 Thread lhotari

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new 7d52dd71331 [improve][broker] Support X-Forwarded-For and HA Proxy Protocol for resolving original client IP of http/https requests (#22524)
7d52dd71331 is described below

commit 7d52dd7133106a7c39459af15e5ae395218cf6eb
Author: Lari Hotari 
AuthorDate: Mon Apr 22 07:49:34 2024 +0300

[improve][broker] Support X-Forwarded-For and HA Proxy Protocol for resolving original client IP of http/https requests (#22524)

(cherry picked from commit 4a887217d835629cafb393ddf331441b484d4e2c)
---
 conf/broker.conf   |  10 ++
 conf/functions_worker.yml  |  10 ++
 conf/proxy.conf|  10 ++
 conf/standalone.conf   |  10 ++
 conf/websocket.conf|  10 ++
 pom.xml|   1 +
 .../apache/pulsar/broker/ServiceConfiguration.java |  16 ++
 .../pulsar/broker/web/JettyRequestLogFactory.java  | 195 -
 pulsar-broker/pom.xml  |   7 +
 .../org/apache/pulsar/broker/web/WebService.java   |  37 +++-
 .../broker/web/WebServiceOriginalClientIPTest.java | 155 
 pulsar-broker/src/test/resources/log4j2.xml|  40 +
 .../pulsar/functions/worker/WorkerConfig.java  |  16 ++
 .../pulsar/functions/worker/rest/WorkerServer.java |  38 +++-
 pulsar-proxy/pom.xml   |   6 +
 .../pulsar/proxy/server/ProxyConfiguration.java|  16 ++
 .../pulsar/proxy/server/ProxyServiceStarter.java   |  31 +++-
 .../org/apache/pulsar/proxy/server/WebServer.java  |  34 +++-
 .../proxy/server/ProxyOriginalClientIPTest.java| 157 +
 .../ProxyServiceStarterDisableZeroCopyTest.java|   2 +-
 .../proxy/server/ProxyServiceStarterTest.java  |   2 +-
 .../proxy/server/ProxyServiceTlsStarterTest.java   |   2 +-
 pulsar-proxy/src/test/resources/log4j2.xml |  36 
 .../pulsar/websocket/service/ProxyServer.java  |  39 -
 .../service/WebSocketProxyConfiguration.java   |  14 ++
 25 files changed, 873 insertions(+), 21 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 461211bf37f..ec0974dca20 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -87,6 +87,16 @@ advertisedAddress=
 # Enable or disable the HAProxy protocol.
 haProxyProtocolEnabled=false
 
+# Enable or disable the use of HA proxy protocol for resolving the client IP for http/https requests.
+webServiceHaProxyProtocolEnabled=false
+
+# Trust X-Forwarded-For header for resolving the client IP for http/https requests. Default is false.
+webServiceTrustXForwardedFor=false
+
+# Add detailed client/remote and server/local addresses and ports to http/https request logging.
+# Defaults to true when either webServiceHaProxyProtocolEnabled or webServiceTrustXForwardedFor is enabled.
+webServiceLogDetailedAddresses=
+
 # Number of threads to config Netty Acceptor. Default is 1
 numAcceptorThreads=
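
To illustrate why `webServiceTrustXForwardedFor` is off by default: the `X-Forwarded-For` header is supplied by whoever sends the request, so it should only be honored when a trusted proxy sits in front of the service. The sketch below is a simplified, hypothetical helper (`OriginalClientIp` is not part of Pulsar, and the commit's actual implementation is not shown in this message); it assumes the leftmost header entry identifies the original client.

```java
// Hypothetical helper; a simplified illustration, not the code from the commit.
class OriginalClientIp {
    /**
     * @param remoteAddr address of the directly connected peer
     * @param xForwardedFor raw X-Forwarded-For header value, may be null
     * @param trustXForwardedFor whether the header is trusted at all
     */
    static String resolve(String remoteAddr, String xForwardedFor, boolean trustXForwardedFor) {
        if (!trustXForwardedFor || xForwardedFor == null || xForwardedFor.isBlank()) {
            // Untrusted or missing header: fall back to the socket peer address.
            return remoteAddr;
        }
        // Assumption: the leftmost entry is the original client; later entries are proxies.
        String first = xForwardedFor.split(",", 2)[0].trim();
        return first.isEmpty() ? remoteAddr : first;
    }
}
```

For example, with trust enabled a header of `203.0.113.7, 10.0.0.2` resolves to `203.0.113.7`, whereas with trust disabled the same request resolves to the peer address of the proxy.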
 
diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml
index 460becd9a94..e1c162c2f25 100644
--- a/conf/functions_worker.yml
+++ b/conf/functions_worker.yml
@@ -27,6 +27,16 @@ workerHostname: localhost
 workerPort: 6750
 workerPortTls: 6751
 
+# Enable or disable the use of HA proxy protocol for resolving the client IP for http/https requests.
+webServiceHaProxyProtocolEnabled: false
+
+# Trust X-Forwarded-For header for resolving the client IP for http/https requests. Default is false.
+webServiceTrustXForwardedFor: false
+
+# Add detailed client/remote and server/local addresses and ports to http/https request logging.
+# Defaults to true when either webServiceHaProxyProtocolEnabled or webServiceTrustXForwardedFor is enabled.
+webServiceLogDetailedAddresses: null
+
 # The Configuration metadata store url
 # Examples:
 # * zk:my-zk-1:2181,my-zk-2:2181,my-zk-3:2181
diff --git a/conf/proxy.conf b/conf/proxy.conf
index ef0836ac1ac..2607900175b 100644
--- a/conf/proxy.conf
+++ b/conf/proxy.conf
@@ -62,6 +62,16 @@ advertisedAddress=
 # Enable or disable the HAProxy protocol.
 haProxyProtocolEnabled=false
 
+# Enable or disable the use of HA proxy protocol for resolving the client IP for http/https requests.
+webServiceHaProxyProtocolEnabled=false
+
+# Trust X-Forwarded-For header for resolving the client IP for http/https requests. Default is false.
+webServiceTrustXForwardedFor=false
+
+# Add detailed client/remote and server/local addresses and ports to http/https request logging.
+# Defaults to true when either webServiceHaProxyProtocolEnabled or webServiceTrustXForwardedFor is enabled.
+webServiceLogDetailedAddresses=
+
 # Enables zero-copy transport of data across network interfaces u

(pulsar) branch branch-3.2 updated: [improve][broker] Support X-Forwarded-For and HA Proxy Protocol for resolving original client IP of http/https requests (#22524)

2024-04-21 Thread lhotari

lhotari pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new 28bce5022a6 [improve][broker] Support X-Forwarded-For and HA Proxy Protocol for resolving original client IP of http/https requests (#22524)
28bce5022a6 is described below

commit 28bce5022a608a6072b9489ee0a2b5d95b36d1ae
Author: Lari Hotari 
AuthorDate: Mon Apr 22 07:49:34 2024 +0300

[improve][broker] Support X-Forwarded-For and HA Proxy Protocol for resolving original client IP of http/https requests (#22524)

(cherry picked from commit 4a887217d835629cafb393ddf331441b484d4e2c)
---
 conf/broker.conf   |  10 ++
 conf/functions_worker.yml  |  10 ++
 conf/proxy.conf|  10 ++
 conf/standalone.conf   |  10 ++
 conf/websocket.conf|  10 ++
 pom.xml|   1 +
 .../apache/pulsar/broker/ServiceConfiguration.java |  16 ++
 .../pulsar/broker/web/JettyRequestLogFactory.java  | 195 -
 pulsar-broker/pom.xml  |   7 +
 .../org/apache/pulsar/broker/web/WebService.java   |  37 +++-
 .../broker/web/WebServiceOriginalClientIPTest.java | 155 
 pulsar-broker/src/test/resources/log4j2.xml|  40 +
 .../pulsar/functions/worker/WorkerConfig.java  |  16 ++
 .../pulsar/functions/worker/rest/WorkerServer.java |  38 +++-
 pulsar-proxy/pom.xml   |   6 +
 .../pulsar/proxy/server/ProxyConfiguration.java|  16 ++
 .../pulsar/proxy/server/ProxyServiceStarter.java   |  31 +++-
 .../org/apache/pulsar/proxy/server/WebServer.java  |  34 +++-
 .../proxy/server/ProxyOriginalClientIPTest.java| 157 +
 .../ProxyServiceStarterDisableZeroCopyTest.java|   2 +-
 .../proxy/server/ProxyServiceStarterTest.java  |   2 +-
 .../proxy/server/ProxyServiceTlsStarterTest.java   |   2 +-
 pulsar-proxy/src/test/resources/log4j2.xml |  36 
 .../pulsar/websocket/service/ProxyServer.java  |  39 -
 .../service/WebSocketProxyConfiguration.java   |  14 ++
 25 files changed, 873 insertions(+), 21 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 4f5c4cac565..2d5221c65c4 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -88,6 +88,16 @@ advertisedAddress=
 # If true, the real IP addresses of consumers and producers can be obtained when getting topic statistics data.
 haProxyProtocolEnabled=false
 
+# Enable or disable the use of HA proxy protocol for resolving the client IP for http/https requests.
+webServiceHaProxyProtocolEnabled=false
+
+# Trust X-Forwarded-For header for resolving the client IP for http/https requests. Default is false.
+webServiceTrustXForwardedFor=false
+
+# Add detailed client/remote and server/local addresses and ports to http/https request logging.
+# Defaults to true when either webServiceHaProxyProtocolEnabled or webServiceTrustXForwardedFor is enabled.
+webServiceLogDetailedAddresses=
+
 # Number of threads to config Netty Acceptor. Default is 1
 numAcceptorThreads=
 
diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml
index 3871c74a887..6f995576ebd 100644
--- a/conf/functions_worker.yml
+++ b/conf/functions_worker.yml
@@ -27,6 +27,16 @@ workerHostname: localhost
 workerPort: 6750
 workerPortTls: 6751
 
+# Enable or disable the use of HA proxy protocol for resolving the client IP for http/https requests.
+webServiceHaProxyProtocolEnabled: false
+
+# Trust X-Forwarded-For header for resolving the client IP for http/https requests. Default is false.
+webServiceTrustXForwardedFor: false
+
+# Add detailed client/remote and server/local addresses and ports to http/https request logging.
+# Defaults to true when either webServiceHaProxyProtocolEnabled or webServiceTrustXForwardedFor is enabled.
+webServiceLogDetailedAddresses: null
+
 # The Configuration metadata store url
 # Examples:
 # * zk:my-zk-1:2181,my-zk-2:2181,my-zk-3:2181
diff --git a/conf/proxy.conf b/conf/proxy.conf
index 5a9d433f39c..6e6c960e800 100644
--- a/conf/proxy.conf
+++ b/conf/proxy.conf
@@ -63,6 +63,16 @@ advertisedAddress=
 # If true, the real IP addresses of consumers and producers can be obtained when getting topic statistics data.
 haProxyProtocolEnabled=false
 
+# Enable or disable the use of HA proxy protocol for resolving the client IP for http/https requests.
+webServiceHaProxyProtocolEnabled=false
+
+# Trust X-Forwarded-For header for resolving the client IP for http/https requests. Default is false.
+webServiceTrustXForwardedFor=false
+
+# Add detailed client/remote and server/local addresses and ports to http/https request logging.
+# Defaults to true when either webServiceHaProxyProtocolEnabled or 

(pulsar) 10/10: [fix][broker] Fix broken topic policy implementation compatibility with old pulsar version (#22535)

2024-04-21 Thread lhotari

lhotari pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit e7ff0678e56f605a8fb2344c7162f5dcd7e31072
Author: Rajan Dhabalia 
AuthorDate: Fri Apr 19 10:30:55 2024 -0700

[fix][broker] Fix broken topic policy implementation compatibility with old pulsar version (#22535)

(cherry picked from commit 59daac64c210f539e733f883edad09d08333aa62)
---
 .../pulsar/broker/service/AbstractTopic.java   | 52 +-
 ...kerInternalClientConfigurationOverrideTest.java | 42 -
 2 files changed, 72 insertions(+), 22 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 39c3cbf13b2..6ddcfc9ef4f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -218,13 +218,16 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener
 this.topicPolicies.getBackLogQuotaMap().get(type).updateTopicValue(
 data.getBackLogQuotaMap() == null ? null : data.getBackLogQuotaMap().get(type.toString(;
-topicPolicies.getTopicMaxMessageSize().updateTopicValue(data.getMaxMessageSize());
-topicPolicies.getMessageTTLInSeconds().updateTopicValue(data.getMessageTTLInSeconds());
+topicPolicies.getTopicMaxMessageSize().updateTopicValue(normalizeValue(data.getMaxMessageSize()));
+topicPolicies.getMessageTTLInSeconds().updateTopicValue(normalizeValue(data.getMessageTTLInSeconds()));
 topicPolicies.getPublishRate().updateTopicValue(PublishRate.normalize(data.getPublishRate()));
 topicPolicies.getDelayedDeliveryEnabled().updateTopicValue(data.getDelayedDeliveryEnabled());
 topicPolicies.getReplicatorDispatchRate().updateTopicValue(
@@ -263,15 +266,19 @@ public abstract class AbstractTopic implements Topic, 
TopicPolicyListener(CollectionUtils.emptyIfNull(namespacePolicies.replication_clusters)));
 topicPolicies.getMaxUnackedMessagesOnConsumer()
-.updateNamespaceValue(namespacePolicies.max_unacked_messages_per_consumer);
+.updateNamespaceValue(normalizeValue(namespacePolicies.max_unacked_messages_per_consumer));
 topicPolicies.getMaxUnackedMessagesOnSubscription()
-.updateNamespaceValue(namespacePolicies.max_unacked_messages_per_subscription);
-topicPolicies.getMessageTTLInSeconds().updateNamespaceValue(namespacePolicies.message_ttl_in_seconds);
-topicPolicies.getMaxSubscriptionsPerTopic().updateNamespaceValue(namespacePolicies.max_subscriptions_per_topic);
-topicPolicies.getMaxProducersPerTopic().updateNamespaceValue(namespacePolicies.max_producers_per_topic);
-topicPolicies.getMaxConsumerPerTopic().updateNamespaceValue(namespacePolicies.max_consumers_per_topic);
+.updateNamespaceValue(normalizeValue(namespacePolicies.max_unacked_messages_per_subscription));
+topicPolicies.getMessageTTLInSeconds()
+.updateNamespaceValue(normalizeValue(namespacePolicies.message_ttl_in_seconds));
+topicPolicies.getMaxSubscriptionsPerTopic()
+.updateNamespaceValue(normalizeValue(namespacePolicies.max_subscriptions_per_topic));
+topicPolicies.getMaxProducersPerTopic()
+.updateNamespaceValue(normalizeValue(namespacePolicies.max_producers_per_topic));
+topicPolicies.getMaxConsumerPerTopic()
+.updateNamespaceValue(normalizeValue(namespacePolicies.max_consumers_per_topic));
 topicPolicies.getMaxConsumersPerSubscription()
-.updateNamespaceValue(namespacePolicies.max_consumers_per_subscription);
+.updateNamespaceValue(normalizeValue(namespacePolicies.max_consumers_per_subscription));
 topicPolicies.getInactiveTopicPolicies().updateNamespaceValue(namespacePolicies.inactive_topic_policies);
 topicPolicies.getDeduplicationEnabled().updateNamespaceValue(namespacePolicies.deduplicationEnabled);
 topicPolicies.getDeduplicationSnapshotIntervalSeconds().updateNamespaceValue(
@@ -301,6 +308,10 @@ public abstract class AbstractTopic implements Topic, 
TopicPolicyListener producer = pulsarClient.newProducer()
+.topic(topic).create();
+PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic).get();
+assertEquals(topicRef.topicPolicies.getMaxUnackedMessagesOnSubscription().get(),
+conf.getMaxUnackedMessagesPerSubscription());
+assertEquals(topicRef.topicPolicies.getMaxConsumersPerSubscription().get(),
+conf.getMaxConsumersPerSubscription());
+
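
The hunk that defines `normalizeValue` is not legible in this message, so the following is only a sketch of what such a helper could look like. It assumes that older versions stored a negative number to mean "not set" and that a `null` value lets a lower-priority level supply the setting; `PolicyValueSketch` and `effective` are hypothetical names.

```java
// Hypothetical sketch; the real normalizeValue body is not shown in the diff above.
class PolicyValueSketch {
    // Assumption: a negative value is a legacy "unset" marker and is mapped to null.
    static Integer normalizeValue(Integer policyValue) {
        return policyValue != null && policyValue < 0 ? null : policyValue;
    }

    // Picks the first configured level: topic, then namespace, then broker.
    static int effective(Integer topicValue, Integer namespaceValue, int brokerValue) {
        if (topicValue != null) {
            return topicValue;
        }
        if (namespaceValue != null) {
            return namespaceValue;
        }
        return brokerValue;
    }
}
```

Under these assumptions, a namespace policy carrying `-1` no longer overrides the broker-level setting: `effective(null, normalizeValue(-1), 5)` evaluates to `5`, while an explicit `0` is preserved.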

(pulsar) 05/10: [improve] Make the config `metricsBufferResponse` description more effective (#22490)

2024-04-21 Thread lhotari

lhotari pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit f97a0bdb973a1ed4dd13986e84a1ba55459dd4fe
Author: 道君 
AuthorDate: Wed Apr 17 03:07:30 2024 +0800

[improve] Make the config `metricsBufferResponse` description more effective (#22490)

(cherry picked from commit 4ca4e2855267e3b36ee1a27f7144b89ba9194821)
---
 .../main/java/org/apache/pulsar/broker/ServiceConfiguration.java| 6 --
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 2e7d4d582d6..0fce252694f 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -2908,8 +2908,10 @@ public class ServiceConfiguration implements PulsarConfiguration {
 private boolean exposeTopicLevelMetricsInPrometheus = true;
 @FieldContext(
 category = CATEGORY_METRICS,
-doc = "If true, export buffered metrics"
-)
+doc = "Set to true to enable the broker to cache the metrics response; the default is false. "
++ "The caching period is defined by `managedLedgerStatsPeriodSeconds`. "
++ "The broker returns the same response for subsequent requests within the same period. "
++ "Ensure that the scrape interval of your monitoring system matches the caching period.")
 private boolean metricsBufferResponse = false;
 @FieldContext(
 category = CATEGORY_METRICS,
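
The behavior described in the new `metricsBufferResponse` text can be pictured with a small time-based cache. This is a sketch under stated assumptions, not the broker's implementation: `BufferedMetricsSketch` is a hypothetical class, the clock is injected so the example can be exercised without waiting, and the period is counted from the moment the response was generated.

```java
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Hypothetical sketch of caching a metrics response for a fixed period.
class BufferedMetricsSketch {
    private final long periodMillis;
    private final LongSupplier clock;         // returns the current time in milliseconds
    private final Supplier<String> generator; // produces a fresh metrics response
    private String cached;
    private long cachedAtMillis;

    BufferedMetricsSketch(long periodMillis, LongSupplier clock, Supplier<String> generator) {
        this.periodMillis = periodMillis;
        this.clock = clock;
        this.generator = generator;
    }

    synchronized String get() {
        long now = clock.getAsLong();
        if (cached == null || now - cachedAtMillis >= periodMillis) {
            // Cache is empty or expired: regenerate and remember when.
            cached = generator.get();
            cachedAtMillis = now;
        }
        // Requests inside the same period receive the identical response.
        return cached;
    }
}
```

This also shows why the scrape interval matters: a scraper polling more often than the caching period simply receives the same response again until the period elapses.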



(pulsar) 03/10: [improve][broker] backlog quota exceed limit log replaced with `debug` (#22488)

2024-04-21 Thread lhotari

lhotari pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 8d0a412eb93ed11dc3fd8cf12ac1d69d1ef25fc5
Author: Mukesh Kumar <65598381+mukesh...@users.noreply.github.com>
AuthorDate: Fri Apr 12 22:07:28 2024 +0530

[improve][broker] backlog quota exceed limit log replaced with `debug` (#22488)

(cherry picked from commit b85730069ee4c5f96406a075e354d0592fdab434)
---
 .../org/apache/pulsar/broker/service/persistent/PersistentTopic.java  | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 29df6e78cd1..a76ac548e0c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -3150,14 +3150,14 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
 if ((retentionPolicy == BacklogQuota.RetentionPolicy.producer_request_hold
 || retentionPolicy == BacklogQuota.RetentionPolicy.producer_exception)) {
 if (backlogQuotaType == BacklogQuotaType.destination_storage && isSizeBacklogExceeded()) {
-log.info("[{}] Size backlog quota exceeded. Cannot create producer [{}]", this.getName(),
+log.debug("[{}] Size backlog quota exceeded. Cannot create producer [{}]", this.getName(),
 producerName);
 return FutureUtil.failedFuture(new TopicBacklogQuotaExceededException(retentionPolicy));
 }
 if (backlogQuotaType == BacklogQuotaType.message_age) {
 return checkTimeBacklogExceeded().thenCompose(isExceeded -> {
 if (isExceeded) {
-log.info("[{}] Time backlog quota exceeded. Cannot create producer [{}]", this.getName(),
+log.debug("[{}] Time backlog quota exceeded. Cannot create producer [{}]", this.getName(),
 producerName);
 return FutureUtil.failedFuture(new TopicBacklogQuotaExceededException(retentionPolicy));
 } else {



(pulsar) branch branch-3.2 updated (14841768466 -> e7ff0678e56)

2024-04-21 Thread lhotari

lhotari pushed a change to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


from 14841768466 [improve][offload] Apply autoSkipNonRecoverableData configuration to tiered storage (#22531)
 new 49bcbb6c3da [fix][broker] Update topic partition failed when config maxNumPartitionsPerPartitionedTopic<0 (#22397)
 new 72d290abd49 [fix][broker] Fix message drop record in producer stat (#22458)
 new 8d0a412eb93 [improve][broker] backlog quota exceed limit log replaced with `debug` (#22488)
 new bf89f080744 [fix][test] SchemaMap in AutoConsumeSchema has been reused (#22500)
 new f97a0bdb973 [improve] Make the config `metricsBufferResponse` description more effective (#22490)
 new 5d474657869 [improve][broker] Add topic name to emitted error messages. (#22506)
 new becbaf198c5 [improve][broker] Repeat the handleMetadataChanges callback when configurationMetadataStore equals localMetadataStore (#22519)
 new 50293b75654 [improve][test] Move ShadowManagedLedgerImplTest to flaky tests (#22526)
 new 91ce98d4eb6 [fix][broker] Fix typos in Consumer class (#22532)
 new e7ff0678e56 [fix][broker] Fix broken topic policy implementation compatibility with old pulsar version (#22535)

The 10 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../mledger/impl/ShadowManagedLedgerImplTest.java  |  2 +-
 .../apache/pulsar/broker/ServiceConfiguration.java |  9 ++-
 .../broker/admin/impl/PersistentTopicsBase.java|  2 +-
 .../pulsar/broker/service/AbstractTopic.java   | 69 +-
 .../pulsar/broker/service/BrokerService.java   |  4 +-
 .../org/apache/pulsar/broker/service/Consumer.java |  6 +-
 .../org/apache/pulsar/broker/service/Producer.java |  2 +-
 .../broker/service/persistent/PersistentTopic.java |  4 +-
 .../apache/pulsar/broker/admin/AdminApi2Test.java  |  6 +-
 .../pulsar/broker/admin/PersistentTopicsTest.java  | 45 ++
 ...kerInternalClientConfigurationOverrideTest.java | 42 -
 .../client/api/SimpleProducerConsumerTest.java | 66 -
 12 files changed, 185 insertions(+), 72 deletions(-)



(pulsar) 07/10: [improve][broker] Repeat the handleMetadataChanges callback when configurationMetadataStore equals localMetadataStore (#22519)

2024-04-21 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit becbaf198c5d399375c6149d3493391c382232e0
Author: hanmz 
AuthorDate: Wed Apr 17 18:14:38 2024 +0800

[improve][broker] Repeat the handleMetadataChanges callback when 
configurationMetadataStore equals localMetadataStore (#22519)

(cherry picked from commit 1dd82a0affd6ec3686fa85d444c354e9ce12)
---
 .../src/main/java/org/apache/pulsar/broker/service/BrokerService.java | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index f19b3436f7b..dc66750fd96 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -336,7 +336,9 @@ public class BrokerService implements Closeable {
 this.entryFilterProvider = new 
EntryFilterProvider(pulsar.getConfiguration());
 
 
pulsar.getLocalMetadataStore().registerListener(this::handleMetadataChanges);
-
pulsar.getConfigurationMetadataStore().registerListener(this::handleMetadataChanges);
+if (pulsar.getConfigurationMetadataStore() != 
pulsar.getLocalMetadataStore()) {
+
pulsar.getConfigurationMetadataStore().registerListener(this::handleMetadataChanges);
+}
 
 
 this.inactivityMonitor = OrderedScheduler.newSchedulerBuilder()
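
Note on the BrokerService change above: the guard skips the second registerListener call when both getters return the same store instance, so handleMetadataChanges should fire once per change instead of twice. Below is a minimal sketch of that effect. SketchStore and ListenerRegistrationSketch are hypothetical stand-ins written for illustration, not Pulsar's MetadataStore API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a metadata store (assumption, not Pulsar's API).
final class SketchStore {
    private final List<Consumer<String>> listeners = new ArrayList<>();

    void registerListener(Consumer<String> listener) {
        listeners.add(listener);
    }

    // Invokes every registered listener once for the changed path.
    void notifyChange(String path) {
        for (Consumer<String> listener : listeners) {
            listener.accept(path);
        }
    }
}

final class ListenerRegistrationSketch {
    // Registers the handler on both stores, skipping the second registration
    // when both references point at the same instance (the guard from the diff).
    static void register(SketchStore local, SketchStore configuration, Consumer<String> handler) {
        local.registerListener(handler);
        if (configuration != local) {
            configuration.registerListener(handler);
        }
    }

    // Returns how many times the handler fires for one change on the local store.
    static int callbacksForOneChange(SketchStore local, SketchStore configuration) {
        int[] calls = {0};
        register(local, configuration, path -> calls[0]++);
        local.notifyChange("/some/path");
        return calls[0];
    }

    public static void main(String[] args) {
        SketchStore shared = new SketchStore();
        // Same instance used for both roles: the handler is registered only once.
        System.out.println(callbacksForOneChange(shared, shared));
        // Two distinct instances: each store gets its own registration.
        System.out.println(callbacksForOneChange(new SketchStore(), new SketchStore()));
    }
}
```

Without the identity check, the shared-instance case would register the handler twice and report two callbacks for a single change.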



(pulsar) 08/10: [improve][test] Move ShadowManagedLedgerImplTest to flaky tests (#22526)

2024-04-21 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 50293b7565483f5b074ca43d9b6df0836d10cb0e
Author: Lari Hotari 
AuthorDate: Wed Apr 17 12:46:43 2024 -0700

[improve][test] Move ShadowManagedLedgerImplTest to flaky tests (#22526)

(cherry picked from commit 56970b714f5adb606b02d12a99db1ceec3fa7832)
---
 .../org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImplTest.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImplTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImplTest.java
index cc4b3f24811..2aa04197ab9 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImplTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImplTest.java
@@ -51,7 +51,7 @@ public class ShadowManagedLedgerImplTest extends 
MockedBookKeeperTestCase {
 return (ShadowManagedLedgerImpl) shadowML;
 }
 
-@Test
+@Test(groups = "flaky")
 public void testShadowWrites() throws Exception {
 ManagedLedgerImpl sourceML = (ManagedLedgerImpl) 
factory.open("source_ML", new ManagedLedgerConfig()
 .setMaxEntriesPerLedger(2)



(pulsar) 09/10: [fix][broker] Fix typos in Consumer class (#22532)

2024-04-21 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 91ce98d4eb63978dc7a4924fc92d327da9dffc67
Author: hanmz 
AuthorDate: Fri Apr 19 06:49:18 2024 +0800

[fix][broker] Fix typos in Consumer class (#22532)

(cherry picked from commit 7aedb6b20c120ec0a7cc096e33e6305caca26786)
---
 .../src/main/java/org/apache/pulsar/broker/service/Consumer.java| 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 4cd54420200..6b2028095e2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -147,7 +147,7 @@ public class Consumer {
 @Setter
 private volatile long consumerEpoch;
 
-private long negtiveUnackedMsgsTimestamp;
+private long negativeUnackedMsgsTimestamp;
 
 @Getter
 private final SchemaType schemaType;
@@ -1102,8 +1102,8 @@ public class Consumer {
 subscription.addUnAckedMessages(ackedMessages);
 unackedMsgs = UNACKED_MESSAGES_UPDATER.addAndGet(consumer, 
ackedMessages);
 }
-if (unackedMsgs < 0 && System.currentTimeMillis() - 
negtiveUnackedMsgsTimestamp >= 10_000) {
-negtiveUnackedMsgsTimestamp = System.currentTimeMillis();
+if (unackedMsgs < 0 && System.currentTimeMillis() - 
negativeUnackedMsgsTimestamp >= 10_000) {
+negativeUnackedMsgsTimestamp = System.currentTimeMillis();
 log.warn("unackedMsgs is : {}, ackedMessages : {}, consumer : {}", 
unackedMsgs, ackedMessages, consumer);
 }
 return unackedMsgs;
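
Note on the Consumer hunk above: besides the rename, the surrounding code shows a throttled warning, where the log line is emitted only if the counter is negative and at least 10_000 ms have passed since the last warning. A minimal sketch of that pattern follows. ThrottledWarningSketch is a hypothetical class, and the clock is passed in as a parameter (an assumption made here so the behavior can be checked without sleeping).

```java
// Hypothetical helper illustrating the throttle condition from the diff.
final class ThrottledWarningSketch {
    // Timestamp of the last emitted warning, in milliseconds.
    private long negativeUnackedMsgsTimestamp;

    // Returns true when a warning should be logged for this observation.
    boolean shouldWarn(long unackedMsgs, long nowMillis) {
        if (unackedMsgs < 0 && nowMillis - negativeUnackedMsgsTimestamp >= 10_000) {
            negativeUnackedMsgsTimestamp = nowMillis;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        ThrottledWarningSketch sketch = new ThrottledWarningSketch();
        System.out.println(sketch.shouldWarn(-1, 20_000)); // first negative reading
        System.out.println(sketch.shouldWarn(-1, 25_000)); // only 5 s later
        System.out.println(sketch.shouldWarn(-1, 30_000)); // 10 s after the first
    }
}
```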



(pulsar) 01/10: [fix][broker] Update topic partition failed when config maxNumPartitionsPerPartitionedTopic<0 (#22397)

2024-04-21 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 49bcbb6c3da2246bd887605fd68c10e1d3e9af28
Author: hanmz 
AuthorDate: Wed Apr 10 04:27:22 2024 +0800

[fix][broker] Update topic partition failed when config 
maxNumPartitionsPerPartitionedTopic<0 (#22397)

(cherry picked from commit fb5caeb2cd3353db0499e32e9ec79390741b809c)
---
 .../apache/pulsar/broker/ServiceConfiguration.java |  3 +-
 .../broker/admin/impl/PersistentTopicsBase.java|  2 +-
 .../pulsar/broker/admin/PersistentTopicsTest.java  | 45 ++
 3 files changed, 48 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 8b99501f296..2e7d4d582d6 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1351,7 +1351,8 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
 category = CATEGORY_SERVER,
 dynamic = true,
 doc = "The number of partitions per partitioned topic.\n"
-+ "If try to create or update partitioned topics by exceeded 
number of partitions, then fail."
++ "If try to create or update partitioned topics by exceeded 
number of partitions, then fail.\n"
++ "Use 0 or negative number to disable the check."
 )
 private int maxNumPartitionsPerPartitionedTopic = 0;
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 318e2bc2cde..b0968f494ee 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -348,7 +348,7 @@ public class PersistentTopicsBase extends AdminResource {
 }
 int brokerMaximumPartitionsPerTopic = 
pulsarService.getConfiguration()
 .getMaxNumPartitionsPerPartitionedTopic();
-if (brokerMaximumPartitionsPerTopic != 0 && expectPartitions > 
brokerMaximumPartitionsPerTopic) {
+if (brokerMaximumPartitionsPerTopic > 0 && expectPartitions > 
brokerMaximumPartitionsPerTopic) {
 throw new RestException(422 /* Unprocessable entity*/,
 String.format("Desired partitions %s can't be 
greater than the maximum partitions per"
 + " topic %s.",
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index 8e1375303ce..c588051a0fe 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -1742,6 +1742,51 @@ public class PersistentTopicsTest extends 
MockedPulsarServiceBaseTest {
 verify(response, timeout(5000).times(1)).resume(metaCaptor.capture());
 partitionedTopicMetadata = metaCaptor.getValue();
 Assert.assertEquals(partitionedTopicMetadata.partitions, 4);
+
+// test for configuration maxNumPartitionsPerPartitionedTopic
+conf.setMaxNumPartitionsPerPartitionedTopic(4);
+response = mock(AsyncResponse.class);
+throwableCaptor = ArgumentCaptor.forClass(Throwable.class);
+persistentTopics.updatePartitionedTopic(response, testTenant, 
testNamespaceLocal, topicName, false, true,
+true, 5);
+verify(response, 
timeout(5000).times(1)).resume(throwableCaptor.capture());
+Assert.assertEquals(throwableCaptor.getValue().getMessage(),
+"Desired partitions 5 can't be greater than the maximum 
partitions per topic 4.");
+
+response = mock(AsyncResponse.class);
+metaCaptor = ArgumentCaptor.forClass(PartitionedTopicMetadata.class);
+persistentTopics.getPartitionedMetadata(response, testTenant, 
testNamespaceLocal, topicName, true, false);
+verify(response, timeout(5000).times(1)).resume(metaCaptor.capture());
+partitionedTopicMetadata = metaCaptor.getValue();
+Assert.assertEquals(partitionedTopicMetadata.partitions, 4);
+
+conf.setMaxNumPartitionsPerPartitionedTopic(-1);
+response = mock(AsyncResponse.class);
+responseCaptor = ArgumentCaptor.forClass(Response.class);
+persistentTopics.updatePartitionedTopic(response, testTenant, 
testNamespaceLocal, topicName, false
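
Note on the PersistentTopicsBase hunk above: the check changes from `!= 0` to `> 0`, so a negative maxNumPartitionsPerPartitionedTopic disables the limit instead of rejecting every update. The sketch below isolates the two conditions for comparison. PartitionLimitSketch and its method names are hypothetical, and only the boolean expressions are taken from the diff.

```java
// Hypothetical helper that isolates the limit condition from the diff.
final class PartitionLimitSketch {
    // Post-fix condition: a limit of 0 or below disables the check.
    static boolean exceedsLimit(int brokerMaximumPartitionsPerTopic, int expectPartitions) {
        return brokerMaximumPartitionsPerTopic > 0
                && expectPartitions > brokerMaximumPartitionsPerTopic;
    }

    // Pre-fix condition, kept only for comparison.
    static boolean exceedsLimitBeforeFix(int brokerMaximumPartitionsPerTopic, int expectPartitions) {
        return brokerMaximumPartitionsPerTopic != 0
                && expectPartitions > brokerMaximumPartitionsPerTopic;
    }

    public static void main(String[] args) {
        System.out.println(exceedsLimit(4, 5));           // limit 4, request 5
        System.out.println(exceedsLimit(-1, 5));          // negative limit, check disabled
        System.out.println(exceedsLimitBeforeFix(-1, 5)); // old behavior with a negative limit
    }
}
```

With the old condition, any positive partition count compares greater than -1, which is why updates failed when the setting was negative.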

(pulsar) 06/10: [improve][broker] Add topic name to emitted error messages. (#22506)

2024-04-21 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 5d4746578695f828e0901cb14fd1e83837fe4b1a
Author: 道君 
AuthorDate: Wed Apr 17 03:12:34 2024 +0800

[improve][broker] Add topic name to emitted error messages. (#22506)

(cherry picked from commit d5b36da9a2e0d4f17bea8e033180e494e93dc442)
---
 .../org/apache/pulsar/broker/service/AbstractTopic.java | 17 +
 .../org/apache/pulsar/broker/admin/AdminApi2Test.java   |  6 --
 2 files changed, 13 insertions(+), 10 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 837f073b00d..39c3cbf13b2 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -855,7 +855,7 @@ public abstract class AbstractTopic implements Topic, 
TopicPolicyListener internalAddProducer(Producer producer) {
 if (isProducersExceeded(producer)) {
 log.warn("[{}] Attempting to add producer to topic which reached 
max producers limit", topic);
-return CompletableFuture.failedFuture(
-new BrokerServiceException.ProducerBusyException("Topic 
reached max producers limit"));
+return CompletableFuture.failedFuture(new 
BrokerServiceException.ProducerBusyException(
+"Topic '" + topic + "' reached max producers limit"));
 }
 
 if (isSameAddressProducersExceeded(producer)) {
 log.warn("[{}] Attempting to add producer to topic which reached 
max same address producers limit", topic);
-return CompletableFuture.failedFuture(
-new BrokerServiceException.ProducerBusyException("Topic 
reached max same address producers limit"));
+return CompletableFuture.failedFuture(new 
BrokerServiceException.ProducerBusyException(
+"Topic '" + topic + "' reached max same address producers 
limit"));
 }
 
 if (log.isDebugEnabled()) {
@@ -971,7 +971,7 @@ public abstract class AbstractTopic implements Topic, 
TopicPolicyListener producer = 
pulsarClient.newProducer().topic(topic).create();
 fail("should fail");
 } catch (PulsarClientException e) {
-assertTrue(e.getMessage().contains("Topic reached max producers 
limit"));
+String expectMsg = "Topic '" + topic + "' reached max producers 
limit";
+assertTrue(e.getMessage().contains(expectMsg));
 }
 //set the limit to 3
 admin.namespaces().setMaxProducersPerTopic(myNamespace, 3);
@@ -2901,7 +2902,8 @@ public class AdminApi2Test extends 
MockedPulsarServiceBaseTest {
 Producer producer1 = 
pulsarClient.newProducer().topic(topic).create();
 fail("should fail");
 } catch (PulsarClientException e) {
-assertTrue(e.getMessage().contains("Topic reached max producers 
limit"));
+String expectMsg = "Topic '" + topic + "' reached max producers 
limit";
+assertTrue(e.getMessage().contains(expectMsg));
 }
 
 //clean up



(pulsar) 04/10: [fix][test] SchemaMap in AutoConsumeSchema has been reused (#22500)

2024-04-21 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit bf89f0807446f6e65d9db4cff24cce974e34eb01
Author: sinan liu 
AuthorDate: Tue Apr 16 21:19:44 2024 +0800

[fix][test] SchemaMap in AutoConsumeSchema has been reused (#22500)

(cherry picked from commit ffdfc0c4e0881c682132e79c3cbf9768b1ab4f89)
---
 .../client/api/SimpleProducerConsumerTest.java | 66 +-
 1 file changed, 38 insertions(+), 28 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 7552b84a1c5..691f501777e 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -4329,6 +4329,10 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
 public void testAccessAvroSchemaMetadata(Schema schema) throws 
Exception {
 log.info("-- Starting {} test --", methodName);
 
+if (pulsarClient == null) {
+pulsarClient = newPulsarClient(lookupUrl.toString(), 0);
+}
+
 final String topic = "persistent://my-property/my-ns/accessSchema";
 Consumer consumer = 
pulsarClient.newConsumer(Schema.AUTO_CONSUME())
 .topic(topic)
@@ -4344,37 +4348,43 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
 producer.send(payload);
 producer.close();
 
-GenericRecord res = consumer.receive(RECEIVE_TIMEOUT_SECONDS, 
TimeUnit.SECONDS).getValue();
-consumer.close();
-assertEquals(schema.getSchemaInfo().getType(), res.getSchemaType());
-org.apache.avro.generic.GenericRecord nativeAvroRecord = null;
-JsonNode nativeJsonRecord = null;
-if (schema.getSchemaInfo().getType() == SchemaType.AVRO) {
-nativeAvroRecord = (org.apache.avro.generic.GenericRecord) 
res.getNativeObject();
-assertNotNull(nativeAvroRecord);
-} else {
-nativeJsonRecord = (JsonNode) res.getNativeObject();
-assertNotNull(nativeJsonRecord);
-}
-for (org.apache.pulsar.client.api.schema.Field f : res.getFields()) {
-log.info("field {} {}", f.getName(), res.getField(f));
-assertEquals("field", f.getName());
-assertEquals("a", res.getField(f));
-
-if (nativeAvroRecord != null) {
-// test that the native schema is accessible
-org.apache.avro.Schema.Field fieldDetails = 
nativeAvroRecord.getSchema().getField(f.getName());
-// a nullable string is an UNION
-assertEquals(org.apache.avro.Schema.Type.UNION, 
fieldDetails.schema().getType());
-
assertTrue(fieldDetails.schema().getTypes().stream().anyMatch(s -> s.getType() 
== org.apache.avro.Schema.Type.STRING));
-
assertTrue(fieldDetails.schema().getTypes().stream().anyMatch(s -> s.getType() 
== org.apache.avro.Schema.Type.NULL));
+try {
+GenericRecord res = consumer.receive(RECEIVE_TIMEOUT_SECONDS, 
TimeUnit.SECONDS).getValue();
+consumer.close();
+assertEquals(schema.getSchemaInfo().getType(), 
res.getSchemaType());
+org.apache.avro.generic.GenericRecord nativeAvroRecord = null;
+JsonNode nativeJsonRecord = null;
+if (schema.getSchemaInfo().getType() == SchemaType.AVRO) {
+nativeAvroRecord = (org.apache.avro.generic.GenericRecord) 
res.getNativeObject();
+assertNotNull(nativeAvroRecord);
 } else {
-assertEquals(JsonNodeType.STRING, 
nativeJsonRecord.get("field").getNodeType());
+nativeJsonRecord = (JsonNode) res.getNativeObject();
+assertNotNull(nativeJsonRecord);
+}
+for (org.apache.pulsar.client.api.schema.Field f : 
res.getFields()) {
+log.info("field {} {}", f.getName(), res.getField(f));
+assertEquals("field", f.getName());
+assertEquals("a", res.getField(f));
+
+if (nativeAvroRecord != null) {
+// test that the native schema is accessible
+org.apache.avro.Schema.Field fieldDetails = 
nativeAvroRecord.getSchema().getField(f.getName());
+// a nullable string is an UNION
+assertEquals(org.apache.avro.Schema.Type.UNION, 
fieldDetails.schema().getType());
+
assertTrue(fieldDetails.schema().getTypes().stream().anyMatch(s -> s.getType() 
== org.apache.avro.Schema.Type.STRING));
+
assertTrue(fieldDetails.schema(

(pulsar) 02/10: [fix][broker] Fix message drop record in producer stat (#22458)

2024-04-21 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 72d290abd49743bfb94173e831b907e5780b180f
Author: zhangqian <503837...@qq.com>
AuthorDate: Wed Apr 10 16:51:26 2024 +0800

[fix][broker] Fix message drop record in producer stat (#22458)

Co-authored-by: ceceezhang 
(cherry picked from commit cea1a9ba9b576bf43f0a45ff8d65369b0f2bbb36)
---
 .../src/main/java/org/apache/pulsar/broker/service/Producer.java| 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
index 7e4459505a5..9cfde67802b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
@@ -749,7 +749,7 @@ public class Producer {
 }
 if (this.isNonPersistentTopic) {
 msgDrop.calculateRate();
-((NonPersistentPublisherStatsImpl) stats).msgDropRate = 
msgDrop.getRate();
+((NonPersistentPublisherStatsImpl) stats).msgDropRate = 
msgDrop.getValueRate();
 }
 }
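
Note on the Producer hunk above: the fix swaps getRate() for getValueRate() when filling msgDropRate. The excerpt does not show the rate class itself, so the sketch below rests on an assumption: that the tracker keeps an event count and a value sum separately, and that each recorded drop carries the number of dropped messages as its value. RateSketch is a hypothetical class written for illustration, not Pulsar's implementation.

```java
// Hypothetical rate tracker (assumption: events and values are tracked separately).
final class RateSketch {
    private long count;    // number of recorded events
    private long valueSum; // sum of the values attached to those events

    void recordEvent(long value) {
        count++;
        valueSum += value;
    }

    // Events per second over the given period.
    double rate(double periodSeconds) {
        return count / periodSeconds;
    }

    // Summed value per second over the given period.
    double valueRate(double periodSeconds) {
        return valueSum / periodSeconds;
    }

    public static void main(String[] args) {
        RateSketch msgDrop = new RateSketch();
        msgDrop.recordEvent(10); // one drop event covering 10 messages
        msgDrop.recordEvent(20); // one drop event covering 20 messages
        System.out.println(msgDrop.rate(1.0));      // drop events per second
        System.out.println(msgDrop.valueRate(1.0)); // dropped messages per second
    }
}
```

Under that assumption, the event rate undercounts whenever a single drop event covers more than one message, which would be consistent with the commit title.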
 



(pulsar) branch branch-3.0 updated: [fix][broker] Fix broken topic policy implementation compatibility with old pulsar version (#22535)

2024-04-21 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new 8439082f79c [fix][broker] Fix broken topic policy implementation 
compatibility with old pulsar version (#22535)
8439082f79c is described below

commit 8439082f79c5480b58be93fb360ed07b68016631
Author: Rajan Dhabalia 
AuthorDate: Fri Apr 19 10:30:55 2024 -0700

[fix][broker] Fix broken topic policy implementation compatibility with old 
pulsar version (#22535)

(cherry picked from commit 59daac64c210f539e733f883edad09d08333aa62)
---
 .../pulsar/broker/service/AbstractTopic.java   | 52 +-
 ...kerInternalClientConfigurationOverrideTest.java | 42 -
 2 files changed, 72 insertions(+), 22 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 5bbc30f7ed0..16aeabbcc75 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -216,13 +216,16 @@ public abstract class AbstractTopic implements Topic, 
TopicPolicyListener
 
this.topicPolicies.getBackLogQuotaMap().get(type).updateTopicValue(
 data.getBackLogQuotaMap() == null ? null : 
data.getBackLogQuotaMap().get(type.toString())));
-
topicPolicies.getTopicMaxMessageSize().updateTopicValue(data.getMaxMessageSize());
-
topicPolicies.getMessageTTLInSeconds().updateTopicValue(data.getMessageTTLInSeconds());
+
topicPolicies.getTopicMaxMessageSize().updateTopicValue(normalizeValue(data.getMaxMessageSize()));
+
topicPolicies.getMessageTTLInSeconds().updateTopicValue(normalizeValue(data.getMessageTTLInSeconds()));
 
topicPolicies.getPublishRate().updateTopicValue(PublishRate.normalize(data.getPublishRate()));
 
topicPolicies.getDelayedDeliveryEnabled().updateTopicValue(data.getDelayedDeliveryEnabled());
 topicPolicies.getReplicatorDispatchRate().updateTopicValue(
@@ -261,15 +264,19 @@ public abstract class AbstractTopic implements Topic, 
TopicPolicyListener(CollectionUtils.emptyIfNull(namespacePolicies.replication_clusters)));
 topicPolicies.getMaxUnackedMessagesOnConsumer()
-
.updateNamespaceValue(namespacePolicies.max_unacked_messages_per_consumer);
+
.updateNamespaceValue(normalizeValue(namespacePolicies.max_unacked_messages_per_consumer));
 topicPolicies.getMaxUnackedMessagesOnSubscription()
-
.updateNamespaceValue(namespacePolicies.max_unacked_messages_per_subscription);
-
topicPolicies.getMessageTTLInSeconds().updateNamespaceValue(namespacePolicies.message_ttl_in_seconds);
-
topicPolicies.getMaxSubscriptionsPerTopic().updateNamespaceValue(namespacePolicies.max_subscriptions_per_topic);
-
topicPolicies.getMaxProducersPerTopic().updateNamespaceValue(namespacePolicies.max_producers_per_topic);
-
topicPolicies.getMaxConsumerPerTopic().updateNamespaceValue(namespacePolicies.max_consumers_per_topic);
+
.updateNamespaceValue(normalizeValue(namespacePolicies.max_unacked_messages_per_subscription));
+topicPolicies.getMessageTTLInSeconds()
+
.updateNamespaceValue(normalizeValue(namespacePolicies.message_ttl_in_seconds));
+topicPolicies.getMaxSubscriptionsPerTopic()
+
.updateNamespaceValue(normalizeValue(namespacePolicies.max_subscriptions_per_topic));
+topicPolicies.getMaxProducersPerTopic()
+
.updateNamespaceValue(normalizeValue(namespacePolicies.max_producers_per_topic));
+topicPolicies.getMaxConsumerPerTopic()
+
.updateNamespaceValue(normalizeValue(namespacePolicies.max_consumers_per_topic));
 topicPolicies.getMaxConsumersPerSubscription()
-
.updateNamespaceValue(namespacePolicies.max_consumers_per_subscription);
+
.updateNamespaceValue(normalizeValue(namespacePolicies.max_consumers_per_subscription));
 
topicPolicies.getInactiveTopicPolicies().updateNamespaceValue(namespacePolicies.inactive_topic_policies);
 
topicPolicies.getDeduplicationEnabled().updateNamespaceValue(namespacePolicies.deduplicationEnabled);
 
topicPolicies.getDeduplicationSnapshotIntervalSeconds().updateNamespaceValue(
@@ -299,6 +306,10 @@ public abstract class AbstractTopic implements Topic, 
TopicPolicyListener producer = pulsarClient.newProducer()
+.topic(topic).create();
+PersistentTopic topicRef = (PersistentTopic) 
pulsar.getBrokerService().getTopicReference(topic).get();
+
assertEquals(topicRef.topicPolicies.getMaxUnackedMessagesOnSubs
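
Note on the AbstractTopic hunks above: the policy values are now passed through normalizeValue before being stored, but the helper's body is not visible in this excerpt. The sketch below shows one plausible shape, under the assumption that older Pulsar versions wrote negative numbers to mean "not set" and that the helper maps those to null. NormalizeSketch is a hypothetical class, and the body is an assumption rather than the committed code.

```java
// Hypothetical version of the helper referenced in the diff (body is assumed).
final class NormalizeSketch {
    // Treats a negative policy value as "not set" and returns null for it;
    // null and non-negative values pass through unchanged.
    static Integer normalizeValue(Integer policyValue) {
        return policyValue != null && policyValue < 0 ? null : policyValue;
    }

    public static void main(String[] args) {
        System.out.println(normalizeValue(-1));   // negative treated as unset
        System.out.println(normalizeValue(0));    // zero kept as-is
        System.out.println(normalizeValue(100));  // positive kept as-is
        System.out.println(normalizeValue(null)); // already unset
    }
}
```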

(pulsar) 03/04: [improve][test] Move ShadowManagedLedgerImplTest to flaky tests (#22526)

2024-04-21 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 155f3cbf9f63993a4b6e84b107d17b556927b7c7
Author: Lari Hotari 
AuthorDate: Wed Apr 17 12:46:43 2024 -0700

[improve][test] Move ShadowManagedLedgerImplTest to flaky tests (#22526)

(cherry picked from commit 56970b714f5adb606b02d12a99db1ceec3fa7832)
---
 .../org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImplTest.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImplTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImplTest.java
index cc4b3f24811..2aa04197ab9 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImplTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImplTest.java
@@ -51,7 +51,7 @@ public class ShadowManagedLedgerImplTest extends 
MockedBookKeeperTestCase {
 return (ShadowManagedLedgerImpl) shadowML;
 }
 
-@Test
+@Test(groups = "flaky")
 public void testShadowWrites() throws Exception {
 ManagedLedgerImpl sourceML = (ManagedLedgerImpl) 
factory.open("source_ML", new ManagedLedgerConfig()
 .setMaxEntriesPerLedger(2)



(pulsar) 04/04: [fix][broker] Fix typos in Consumer class (#22532)

2024-04-21 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit beb147c701728f9a2c0fcc9b56d4efb53f94a9bc
Author: hanmz 
AuthorDate: Fri Apr 19 06:49:18 2024 +0800

[fix][broker] Fix typos in Consumer class (#22532)

(cherry picked from commit 7aedb6b20c120ec0a7cc096e33e6305caca26786)
---
 .../src/main/java/org/apache/pulsar/broker/service/Consumer.java| 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index d8ed99bb874..a98fc86c03e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -146,7 +146,7 @@ public class Consumer {
 @Setter
 private volatile long consumerEpoch;
 
-private long negtiveUnackedMsgsTimestamp;
+private long negativeUnackedMsgsTimestamp;
 
 @Getter
 private final SchemaType schemaType;
@@ -1086,8 +1086,8 @@ public class Consumer {
 subscription.addUnAckedMessages(ackedMessages);
 unackedMsgs = UNACKED_MESSAGES_UPDATER.addAndGet(consumer, 
ackedMessages);
 }
-if (unackedMsgs < 0 && System.currentTimeMillis() - 
negtiveUnackedMsgsTimestamp >= 10_000) {
-negtiveUnackedMsgsTimestamp = System.currentTimeMillis();
+if (unackedMsgs < 0 && System.currentTimeMillis() - 
negativeUnackedMsgsTimestamp >= 10_000) {
+negativeUnackedMsgsTimestamp = System.currentTimeMillis();
 log.warn("unackedMsgs is : {}, ackedMessages : {}, consumer : {}", 
unackedMsgs, ackedMessages, consumer);
 }
 return unackedMsgs;



(pulsar) branch branch-3.0 updated (94f12543a9f -> beb147c7017)

2024-04-21 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a change to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


from 94f12543a9f [improve] Make the config `metricsBufferResponse` 
description more effective (#22490)
 new 1d202f93c81 [improve][broker] Add topic name to emitted error 
messages. (#22506)
 new 8f5b825b7a9 [improve][broker] Repeat the handleMetadataChanges 
callback when configurationMetadataStore equals localMetadataStore (#22519)
 new 155f3cbf9f6 [improve][test] Move ShadowManagedLedgerImplTest to flaky 
tests (#22526)
 new beb147c7017 [fix][broker] Fix typos in Consumer class (#22532)

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../mledger/impl/ShadowManagedLedgerImplTest.java |  2 +-
 .../org/apache/pulsar/broker/service/AbstractTopic.java   | 15 ---
 .../org/apache/pulsar/broker/service/BrokerService.java   |  4 +++-
 .../java/org/apache/pulsar/broker/service/Consumer.java   |  6 +++---
 .../org/apache/pulsar/broker/admin/AdminApi2Test.java |  6 --
 5 files changed, 19 insertions(+), 14 deletions(-)



(pulsar) 01/04: [improve][broker] Add topic name to emitted error messages. (#22506)

2024-04-21 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 1d202f93c8108eb8e293c70df9a932292329e822
Author: 道君 
AuthorDate: Wed Apr 17 03:12:34 2024 +0800

[improve][broker] Add topic name to emitted error messages. (#22506)

(cherry picked from commit d5b36da9a2e0d4f17bea8e033180e494e93dc442)
---
 .../org/apache/pulsar/broker/service/AbstractTopic.java   | 15 ---
 .../org/apache/pulsar/broker/admin/AdminApi2Test.java |  6 --
 2 files changed, 12 insertions(+), 9 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 507a5af5270..5bbc30f7ed0 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -863,7 +863,7 @@ public abstract class AbstractTopic implements Topic, 
TopicPolicyListener internalAddProducer(Producer producer) {
 if (isProducersExceeded(producer)) {
 log.warn("[{}] Attempting to add producer to topic which reached 
max producers limit", topic);
-return CompletableFuture.failedFuture(
-new BrokerServiceException.ProducerBusyException("Topic 
reached max producers limit"));
+return CompletableFuture.failedFuture(new 
BrokerServiceException.ProducerBusyException(
+"Topic '" + topic + "' reached max producers limit"));
 }
 
 if (isSameAddressProducersExceeded(producer)) {
 log.warn("[{}] Attempting to add producer to topic which reached 
max same address producers limit", topic);
-return CompletableFuture.failedFuture(
-new BrokerServiceException.ProducerBusyException("Topic 
reached max same address producers limit"));
+return CompletableFuture.failedFuture(new 
BrokerServiceException.ProducerBusyException(
+"Topic '" + topic + "' reached max same address producers 
limit"));
 }
 
 if (log.isDebugEnabled()) {
@@ -1001,7 +1001,7 @@ public abstract class AbstractTopic implements Topic, 
TopicPolicyListener producer = 
pulsarClient.newProducer().topic(topic).create();
 fail("should fail");
 } catch (PulsarClientException e) {
-assertTrue(e.getMessage().contains("Topic reached max producers 
limit"));
+String expectMsg = "Topic '" + topic + "' reached max producers 
limit";
+assertTrue(e.getMessage().contains(expectMsg));
 }
 //set the limit to 3
 admin.namespaces().setMaxProducersPerTopic(myNamespace, 3);
@@ -2900,7 +2901,8 @@ public class AdminApi2Test extends 
MockedPulsarServiceBaseTest {
 Producer producer1 = 
pulsarClient.newProducer().topic(topic).create();
 fail("should fail");
 } catch (PulsarClientException e) {
-assertTrue(e.getMessage().contains("Topic reached max producers 
limit"));
+String expectMsg = "Topic '" + topic + "' reached max producers 
limit";
+assertTrue(e.getMessage().contains(expectMsg));
 }
 
 //clean up



(pulsar) 02/04: [improve][broker] Repeat the handleMetadataChanges callback when configurationMetadataStore equals localMetadataStore (#22519)

2024-04-21 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 8f5b825b7a979acf80ad88ad2a939357d3f1a970
Author: hanmz 
AuthorDate: Wed Apr 17 18:14:38 2024 +0800

[improve][broker] Repeat the handleMetadataChanges callback when 
configurationMetadataStore equals localMetadataStore (#22519)

(cherry picked from commit 1dd82a0affd6ec3686fa85d444c354e9ce12)
---
 .../src/main/java/org/apache/pulsar/broker/service/BrokerService.java | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 20de00370e1..fdc9e5b33fa 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -336,7 +336,9 @@ public class BrokerService implements Closeable {
 this.entryFilterProvider = new 
EntryFilterProvider(pulsar.getConfiguration());
 
 
pulsar.getLocalMetadataStore().registerListener(this::handleMetadataChanges);
-
pulsar.getConfigurationMetadataStore().registerListener(this::handleMetadataChanges);
+if (pulsar.getConfigurationMetadataStore() != 
pulsar.getLocalMetadataStore()) {
+
pulsar.getConfigurationMetadataStore().registerListener(this::handleMetadataChanges);
+}
 
 
 this.inactivityMonitor = OrderedScheduler.newSchedulerBuilder()
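
The guard in this change can be illustrated in isolation. The following is a minimal sketch, assuming a hypothetical `SketchStore` type that only stores listeners; it is not Pulsar's `MetadataStore` API. It shows why registering the same callback on two references to one store instance would deliver each notification twice, and how a reference comparison avoids that.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a metadata store; not Pulsar's MetadataStore API.
final class SketchStore {
    private final List<Consumer<String>> listeners = new ArrayList<>();

    void registerListener(Consumer<String> listener) {
        listeners.add(listener);
    }

    // Delivers one change notification to every registered listener.
    void fire(String path) {
        for (Consumer<String> listener : listeners) {
            listener.accept(path);
        }
    }
}

final class ListenerRegistration {
    // Registers the listener on the local store, and on the configuration
    // store only when it is a different instance (reference comparison).
    static void register(SketchStore local, SketchStore configuration, Consumer<String> listener) {
        local.registerListener(listener);
        if (configuration != local) {
            configuration.registerListener(listener);
        }
    }
}
```

With a shared store instance, one `fire` call reaches the listener once; with two distinct instances, each store notifies the listener independently.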



(pulsar) 06/06: [improve] Make the config `metricsBufferResponse` description more effective (#22490)

2024-04-21 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 94f12543a9ffe7e96f9af7ef9108d1b849ca3280
Author: 道君 
AuthorDate: Wed Apr 17 03:07:30 2024 +0800

[improve] Make the config `metricsBufferResponse` description more 
effective (#22490)

(cherry picked from commit 4ca4e2855267e3b36ee1a27f7144b89ba9194821)
---
 .../main/java/org/apache/pulsar/broker/ServiceConfiguration.java| 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index ae6928d2b32..0dc5d38c191 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -2852,8 +2852,10 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
 private boolean exposeTopicLevelMetricsInPrometheus = true;
 @FieldContext(
 category = CATEGORY_METRICS,
-doc = "If true, export buffered metrics"
-)
+doc = "Set to true to enable the broker to cache the metrics 
response; the default is false. "
++ "The caching period is defined by 
`managedLedgerStatsPeriodSeconds`. "
++ "The broker returns the same response for subsequent 
requests within the same period. "
++ "Ensure that the scrape interval of your monitoring 
system matches the caching period.")
 private boolean metricsBufferResponse = false;
 @FieldContext(
 category = CATEGORY_METRICS,
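
The behavior described in the new `doc` text (one cached response per period, reused for later requests in the same period) can be sketched as follows. This is an illustration only, not the broker's implementation: the class name `PeriodCachedResponse`, the injected clock, and the assumption that periods are aligned windows of the clock are all hypothetical.

```java
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Hypothetical illustration of period-based response caching; not the broker's code.
final class PeriodCachedResponse {
    private final Supplier<String> generator;
    private final LongSupplier clockMillis;
    private final long periodMillis;
    private String cached;
    private long cachedPeriod = -1;

    PeriodCachedResponse(Supplier<String> generator, LongSupplier clockMillis, long periodSeconds) {
        if (periodSeconds <= 0) {
            throw new IllegalArgumentException("periodSeconds must be positive");
        }
        this.generator = generator;
        this.clockMillis = clockMillis;
        this.periodMillis = periodSeconds * 1000L;
    }

    // Returns the cached body while the clock stays inside the same period window,
    // and regenerates it once the clock moves into a new window.
    synchronized String get() {
        long period = clockMillis.getAsLong() / periodMillis;
        if (cached == null || period != cachedPeriod) {
            cached = generator.get();
            cachedPeriod = period;
        }
        return cached;
    }
}
```

Under this model, a scraper that polls more often than the period sees repeated identical responses, which is why the description advises matching the scrape interval to the caching period.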



(pulsar) 01/06: [improve][build] Upgrade Lombok to 1.18.32 for Java 22 support (#22425)

2024-04-21 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit a9e815d464ca8d4620ba6d676d74831a730d533c
Author: Lari Hotari 
AuthorDate: Thu Apr 4 06:39:53 2024 -0700

[improve][build] Upgrade Lombok to 1.18.32 for Java 22 support (#22425)

(cherry picked from commit 5b6f91bc0f839c467bdc1af35c8eac7b14aa8822)
---
 pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pom.xml b/pom.xml
index 50e96b4d35f..0f561597f19 100644
--- a/pom.xml
+++ b/pom.xml
@@ -228,7 +228,7 @@ flexible messaging model and an intuitive client 
API.
 0.9.1
 2.1.0
 3.24.2
-1.18.30
+1.18.32
 1.3.2
 2.3.1
 1.2.0



(pulsar) 04/06: [improve][broker] backlog quota exceed limit log replaced with `debug` (#22488)

2024-04-21 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit eb1b55ec6815375d751ff764ebe29455101a3b61
Author: Mukesh Kumar <65598381+mukesh...@users.noreply.github.com>
AuthorDate: Fri Apr 12 22:07:28 2024 +0530

[improve][broker] backlog quota exceed limit log replaced with `debug` 
(#22488)

(cherry picked from commit b85730069ee4c5f96406a075e354d0592fdab434)
---
 .../org/apache/pulsar/broker/service/persistent/PersistentTopic.java  | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 1363ca0945d..85fdc182ccc 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -2925,14 +2925,14 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
 if ((retentionPolicy == 
BacklogQuota.RetentionPolicy.producer_request_hold
 || retentionPolicy == 
BacklogQuota.RetentionPolicy.producer_exception)) {
 if (backlogQuotaType == BacklogQuotaType.destination_storage 
&& isSizeBacklogExceeded()) {
-log.info("[{}] Size backlog quota exceeded. Cannot create 
producer [{}]", this.getName(),
+log.debug("[{}] Size backlog quota exceeded. Cannot create 
producer [{}]", this.getName(),
 producerName);
 return FutureUtil.failedFuture(new 
TopicBacklogQuotaExceededException(retentionPolicy));
 }
 if (backlogQuotaType == BacklogQuotaType.message_age) {
 return checkTimeBacklogExceeded().thenCompose(isExceeded 
-> {
 if (isExceeded) {
-log.info("[{}] Time backlog quota exceeded. Cannot 
create producer [{}]", this.getName(),
+log.debug("[{}] Time backlog quota exceeded. 
Cannot create producer [{}]", this.getName(),
 producerName);
 return FutureUtil.failedFuture(new 
TopicBacklogQuotaExceededException(retentionPolicy));
 } else {



(pulsar) 05/06: [fix][test] SchemaMap in AutoConsumeSchema has been reused (#22500)

2024-04-21 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 42ae91ae31bc1ca27a13deb4b48a90ac06c89388
Author: sinan liu 
AuthorDate: Tue Apr 16 21:19:44 2024 +0800

[fix][test] SchemaMap in AutoConsumeSchema has been reused (#22500)

(cherry picked from commit ffdfc0c4e0881c682132e79c3cbf9768b1ab4f89)
---
 .../client/api/SimpleProducerConsumerTest.java | 66 +-
 1 file changed, 38 insertions(+), 28 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index e76d6d6962c..68158dd69a4 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -4329,6 +4329,10 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
 public void testAccessAvroSchemaMetadata(Schema schema) throws 
Exception {
 log.info("-- Starting {} test --", methodName);
 
+if (pulsarClient == null) {
+pulsarClient = newPulsarClient(lookupUrl.toString(), 0);
+}
+
 final String topic = "persistent://my-property/my-ns/accessSchema";
 Consumer consumer = 
pulsarClient.newConsumer(Schema.AUTO_CONSUME())
 .topic(topic)
@@ -4344,37 +4348,43 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
 producer.send(payload);
 producer.close();
 
-GenericRecord res = consumer.receive(RECEIVE_TIMEOUT_SECONDS, 
TimeUnit.SECONDS).getValue();
-consumer.close();
-assertEquals(schema.getSchemaInfo().getType(), res.getSchemaType());
-org.apache.avro.generic.GenericRecord nativeAvroRecord = null;
-JsonNode nativeJsonRecord = null;
-if (schema.getSchemaInfo().getType() == SchemaType.AVRO) {
-nativeAvroRecord = (org.apache.avro.generic.GenericRecord) 
res.getNativeObject();
-assertNotNull(nativeAvroRecord);
-} else {
-nativeJsonRecord = (JsonNode) res.getNativeObject();
-assertNotNull(nativeJsonRecord);
-}
-for (org.apache.pulsar.client.api.schema.Field f : res.getFields()) {
-log.info("field {} {}", f.getName(), res.getField(f));
-assertEquals("field", f.getName());
-assertEquals("a", res.getField(f));
-
-if (nativeAvroRecord != null) {
-// test that the native schema is accessible
-org.apache.avro.Schema.Field fieldDetails = 
nativeAvroRecord.getSchema().getField(f.getName());
-// a nullable string is an UNION
-assertEquals(org.apache.avro.Schema.Type.UNION, 
fieldDetails.schema().getType());
-
assertTrue(fieldDetails.schema().getTypes().stream().anyMatch(s -> s.getType() 
== org.apache.avro.Schema.Type.STRING));
-
assertTrue(fieldDetails.schema().getTypes().stream().anyMatch(s -> s.getType() 
== org.apache.avro.Schema.Type.NULL));
+try {
+GenericRecord res = consumer.receive(RECEIVE_TIMEOUT_SECONDS, 
TimeUnit.SECONDS).getValue();
+consumer.close();
+assertEquals(schema.getSchemaInfo().getType(), 
res.getSchemaType());
+org.apache.avro.generic.GenericRecord nativeAvroRecord = null;
+JsonNode nativeJsonRecord = null;
+if (schema.getSchemaInfo().getType() == SchemaType.AVRO) {
+nativeAvroRecord = (org.apache.avro.generic.GenericRecord) 
res.getNativeObject();
+assertNotNull(nativeAvroRecord);
 } else {
-assertEquals(JsonNodeType.STRING, 
nativeJsonRecord.get("field").getNodeType());
+nativeJsonRecord = (JsonNode) res.getNativeObject();
+assertNotNull(nativeJsonRecord);
+}
+for (org.apache.pulsar.client.api.schema.Field f : 
res.getFields()) {
+log.info("field {} {}", f.getName(), res.getField(f));
+assertEquals("field", f.getName());
+assertEquals("a", res.getField(f));
+
+if (nativeAvroRecord != null) {
+// test that the native schema is accessible
+org.apache.avro.Schema.Field fieldDetails = 
nativeAvroRecord.getSchema().getField(f.getName());
+// a nullable string is an UNION
+assertEquals(org.apache.avro.Schema.Type.UNION, 
fieldDetails.schema().getType());
+
assertTrue(fieldDetails.schema().getTypes().stream().anyMatch(s -> s.getType() 
== org.apache.avro.Schema.Type.STRING));
+
assertTrue(fieldDetails.schema(

(pulsar) 03/06: [fix][broker] Fix message drop record in producer stat (#22458)

2024-04-21 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit b41e7527158da8e12dcd491dca4698c4a74d07ba
Author: zhangqian <503837...@qq.com>
AuthorDate: Wed Apr 10 16:51:26 2024 +0800

[fix][broker] Fix message drop record in producer stat (#22458)

Co-authored-by: ceceezhang 
(cherry picked from commit cea1a9ba9b576bf43f0a45ff8d65369b0f2bbb36)
---
 .../src/main/java/org/apache/pulsar/broker/service/Producer.java| 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
index 53b79f06e8e..b077ae17a45 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
@@ -741,7 +741,7 @@ public class Producer {
 }
 if (this.isNonPersistentTopic) {
 msgDrop.calculateRate();
-((NonPersistentPublisherStatsImpl) stats).msgDropRate = 
msgDrop.getRate();
+((NonPersistentPublisherStatsImpl) stats).msgDropRate = 
msgDrop.getValueRate();
 }
 }
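
The difference between the two getters in this one-line change can be sketched with a simplified tracker. The class below is a hypothetical stand-in, not Pulsar's rate class, and it assumes that each drop is recorded as one event whose value is the number of messages in the dropped batch. Under that assumption an event rate counts batches per second, while a value rate counts messages per second.

```java
// Hypothetical, simplified stand-in for a rate tracker; it only mirrors the
// distinction between an event rate and a value rate.
final class SketchRate {
    private long events;
    private long valueSum;

    // One call is one event; 'value' is what the event carries
    // (for example, the number of messages in a dropped batch).
    void recordEvent(long value) {
        events++;
        valueSum += value;
    }

    // Events per second over the given interval.
    double eventRate(double intervalSeconds) {
        return events / intervalSeconds;
    }

    // Summed values per second over the given interval.
    double valueRate(double intervalSeconds) {
        return valueSum / intervalSeconds;
    }
}
```

For two dropped batches of 10 and 30 messages within 2 seconds, the event rate is 1 per second and the value rate is 20 per second, so reporting the former as a message drop rate would understate the drops.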
 



(pulsar) 02/06: [fix][broker] Update topic partition failed when config maxNumPartitionsPerPartitionedTopic<0 (#22397)

2024-04-21 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 386f6f0bf73d01f7b23160e9e4642f4f7c01cac5
Author: hanmz 
AuthorDate: Wed Apr 10 04:27:22 2024 +0800

[fix][broker] Update topic partition failed when config 
maxNumPartitionsPerPartitionedTopic<0 (#22397)

(cherry picked from commit fb5caeb2cd3353db0499e32e9ec79390741b809c)
---
 .../apache/pulsar/broker/ServiceConfiguration.java |  3 +-
 .../broker/admin/impl/PersistentTopicsBase.java|  2 +-
 .../pulsar/broker/admin/PersistentTopicsTest.java  | 45 ++
 3 files changed, 48 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index c674a035653..ae6928d2b32 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1348,7 +1348,8 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
 category = CATEGORY_SERVER,
 dynamic = true,
 doc = "The number of partitions per partitioned topic.\n"
-+ "If try to create or update partitioned topics by exceeded 
number of partitions, then fail."
++ "If try to create or update partitioned topics by exceeded 
number of partitions, then fail.\n"
++ "Use 0 or negative number to disable the check."
 )
 private int maxNumPartitionsPerPartitionedTopic = 0;
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 602cd47e595..dac3e7cd55d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -433,7 +433,7 @@ public class PersistentTopicsBase extends AdminResource {
 }
 int brokerMaximumPartitionsPerTopic = 
pulsarService.getConfiguration()
 .getMaxNumPartitionsPerPartitionedTopic();
-if (brokerMaximumPartitionsPerTopic != 0 && expectPartitions > 
brokerMaximumPartitionsPerTopic) {
+if (brokerMaximumPartitionsPerTopic > 0 && expectPartitions > 
brokerMaximumPartitionsPerTopic) {
 throw new RestException(422 /* Unprocessable entity*/,
 String.format("Expect partitions %s grater than 
maximum partitions per topic %s",
 expectPartitions, 
brokerMaximumPartitionsPerTopic));
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index d7ffa656bdb..0eddbf1fea1 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -1658,6 +1658,51 @@ public class PersistentTopicsTest extends 
MockedPulsarServiceBaseTest {
 verify(response, timeout(5000).times(1)).resume(metaCaptor.capture());
 partitionedTopicMetadata = metaCaptor.getValue();
 Assert.assertEquals(partitionedTopicMetadata.partitions, 4);
+
+// test for configuration maxNumPartitionsPerPartitionedTopic
+conf.setMaxNumPartitionsPerPartitionedTopic(4);
+response = mock(AsyncResponse.class);
+throwableCaptor = ArgumentCaptor.forClass(Throwable.class);
+persistentTopics.updatePartitionedTopic(response, testTenant, 
testNamespaceLocal, topicName, false, true,
+true, 5);
+verify(response, 
timeout(5000).times(1)).resume(throwableCaptor.capture());
+Assert.assertEquals(throwableCaptor.getValue().getMessage(),
+"Desired partitions 5 can't be greater than the maximum 
partitions per topic 4.");
+
+response = mock(AsyncResponse.class);
+metaCaptor = ArgumentCaptor.forClass(PartitionedTopicMetadata.class);
+persistentTopics.getPartitionedMetadata(response, testTenant, 
testNamespaceLocal, topicName, true, false);
+verify(response, timeout(5000).times(1)).resume(metaCaptor.capture());
+partitionedTopicMetadata = metaCaptor.getValue();
+Assert.assertEquals(partitionedTopicMetadata.partitions, 4);
+
+conf.setMaxNumPartitionsPerPartitionedTopic(-1);
+response = mock(AsyncResponse.class);
+responseCaptor = ArgumentCaptor.forClass(Response.class);
+persistentTopics.updatePartitionedTopic(response, testTenant, 
testNamespa
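
The effect of replacing `!= 0` with `> 0` in the partition check can be seen by putting both predicates side by side. This is a minimal sketch; the class and method names are hypothetical, and only the two boolean expressions are taken from the diff above.

```java
final class PartitionLimitCheck {
    // Check as written before the change: any non-zero limit is enforced,
    // so a negative limit rejects every positive partition count.
    static boolean exceedsLimitOld(int limit, int expectPartitions) {
        return limit != 0 && expectPartitions > limit;
    }

    // Check after the change: only a positive limit is enforced,
    // so 0 and negative values disable the check.
    static boolean exceedsLimitNew(int limit, int expectPartitions) {
        return limit > 0 && expectPartitions > limit;
    }
}
```

With a limit of -1 and 5 requested partitions, the old predicate reports a violation while the new one does not, which matches the updated configuration description ("Use 0 or negative number to disable the check").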

(pulsar) branch branch-3.0 updated (ff8d3b73437 -> 94f12543a9f)

2024-04-21 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a change to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


from ff8d3b73437 [improve][offload] Apply autoSkipNonRecoverableData 
configuration to tiered storage (#22531)
 new a9e815d464c [improve][build] Upgrade Lombok to 1.18.32 for Java 22 
support (#22425)
 new 386f6f0bf73 [fix][broker] Update topic partition failed when config 
maxNumPartitionsPerPartitionedTopic<0 (#22397)
 new b41e7527158 [fix][broker] Fix message drop record in producer stat 
(#22458)
 new eb1b55ec681 [improve][broker] backlog quota exceed limit log replaced 
with `debug` (#22488)
 new 42ae91ae31b [fix][test] SchemaMap in AutoConsumeSchema has been reused 
(#22500)
 new 94f12543a9f [improve] Make the config `metricsBufferResponse` 
description more effective (#22490)

The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 pom.xml|  2 +-
 .../apache/pulsar/broker/ServiceConfiguration.java |  9 ++-
 .../broker/admin/impl/PersistentTopicsBase.java|  2 +-
 .../org/apache/pulsar/broker/service/Producer.java |  2 +-
 .../broker/service/persistent/PersistentTopic.java |  4 +-
 .../pulsar/broker/admin/PersistentTopicsTest.java  | 45 +++
 .../client/api/SimpleProducerConsumerTest.java | 66 +-
 7 files changed, 94 insertions(+), 36 deletions(-)



(pulsar) branch master updated: [improve][broker] Support X-Forwarded-For and HA Proxy Protocol for resolving original client IP of http/https requests (#22524)

2024-04-21 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 4a887217d83 [improve][broker] Support X-Forwarded-For and HA Proxy 
Protocol for resolving original client IP of http/https requests (#22524)
4a887217d83 is described below

commit 4a887217d835629cafb393ddf331441b484d4e2c
Author: Lari Hotari 
AuthorDate: Mon Apr 22 07:49:34 2024 +0300

[improve][broker] Support X-Forwarded-For and HA Proxy Protocol for 
resolving original client IP of http/https requests (#22524)
---
 conf/broker.conf   |  10 ++
 conf/functions_worker.yml  |  10 ++
 conf/proxy.conf|  10 ++
 conf/standalone.conf   |  10 ++
 conf/websocket.conf|  10 ++
 pom.xml|   1 +
 .../apache/pulsar/broker/ServiceConfiguration.java |  16 ++
 .../pulsar/broker/web/JettyRequestLogFactory.java  | 195 -
 pulsar-broker/pom.xml  |   7 +
 .../org/apache/pulsar/broker/web/WebService.java   |  37 +++-
 .../broker/web/WebServiceOriginalClientIPTest.java | 155 
 pulsar-broker/src/test/resources/log4j2.xml|   3 +-
 .../pulsar/functions/worker/WorkerConfig.java  |  16 ++
 .../pulsar/functions/worker/rest/WorkerServer.java |  38 +++-
 pulsar-proxy/pom.xml   |   6 +
 .../pulsar/proxy/server/ProxyConfiguration.java|  16 ++
 .../pulsar/proxy/server/ProxyServiceStarter.java   |  31 +++-
 .../org/apache/pulsar/proxy/server/WebServer.java  |  34 +++-
 .../proxy/server/ProxyOriginalClientIPTest.java| 157 +
 .../ProxyServiceStarterDisableZeroCopyTest.java|   2 +-
 .../proxy/server/ProxyServiceStarterTest.java  |   2 +-
 .../proxy/server/ProxyServiceTlsStarterTest.java   |   2 +-
 .../src/test/resources/log4j2.xml  |   7 +-
 .../pulsar/websocket/service/ProxyServer.java  |  39 -
 .../service/WebSocketProxyConfiguration.java   |  14 ++
 25 files changed, 801 insertions(+), 27 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index fd6bba0f45d..d482f77da7c 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -88,6 +88,16 @@ advertisedAddress=
 # If true, the real IP addresses of consumers and producers can be obtained 
when getting topic statistics data.
 haProxyProtocolEnabled=false
 
+# Enable or disable the use of HA proxy protocol for resolving the client IP 
for http/https requests.
+webServiceHaProxyProtocolEnabled=false
+
+# Trust X-Forwarded-For header for resolving the client IP for http/https 
requests. Default is false.
+webServiceTrustXForwardedFor=false
+
+# Add detailed client/remote and server/local addresses and ports to 
http/https request logging.
+# Defaults to true when either webServiceHaProxyProtocolEnabled or 
webServiceTrustXForwardedFor is enabled.
+webServiceLogDetailedAddresses=
+
 # Number of threads to config Netty Acceptor. Default is 1
 numAcceptorThreads=
 
diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml
index 3871c74a887..6f995576ebd 100644
--- a/conf/functions_worker.yml
+++ b/conf/functions_worker.yml
@@ -27,6 +27,16 @@ workerHostname: localhost
 workerPort: 6750
 workerPortTls: 6751
 
+# Enable or disable the use of HA proxy protocol for resolving the client IP 
for http/https requests.
+webServiceHaProxyProtocolEnabled: false
+
+# Trust X-Forwarded-For header for resolving the client IP for http/https 
requests. Default is false.
+webServiceTrustXForwardedFor: false
+
+# Add detailed client/remote and server/local addresses and ports to 
http/https request logging.
+# Defaults to true when either webServiceHaProxyProtocolEnabled or 
webServiceTrustXForwardedFor is enabled.
+webServiceLogDetailedAddresses: null
+
 # The Configuration metadata store url
 # Examples:
 # * zk:my-zk-1:2181,my-zk-2:2181,my-zk-3:2181
diff --git a/conf/proxy.conf b/conf/proxy.conf
index 5a9d433f39c..6e6c960e800 100644
--- a/conf/proxy.conf
+++ b/conf/proxy.conf
@@ -63,6 +63,16 @@ advertisedAddress=
 # If true, the real IP addresses of consumers and producers can be obtained 
when getting topic statistics data.
 haProxyProtocolEnabled=false
 
+# Enable or disable the use of HA proxy protocol for resolving the client IP 
for http/https requests.
+webServiceHaProxyProtocolEnabled=false
+
+# Trust X-Forwarded-For header for resolving the client IP for http/https 
requests. Default is false.
+webServiceTrustXForwardedFor=false
+
+# Add detailed client/remote and server/local addresses and ports to 
http/https request logging.
+# Defaults to true when either webServiceHaProxyProtocolEnabled or 
webServiceTrustXForwardedFor is enabled.
+webServiceLogDetailedAddresses=
+
 # Enables zero
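
How a trusted `X-Forwarded-For` header could change the resolved client address is sketched below. This is not the code from this commit: the class `ClientIpResolver` is hypothetical, and taking the leftmost header entry as the original client is an assumption about a common convention, not a statement about what the web service does internally.

```java
final class ClientIpResolver {
    // Returns the leftmost X-Forwarded-For entry when the header is trusted and
    // present; otherwise falls back to the socket's remote address.
    static String resolve(boolean trustXForwardedFor, String xForwardedFor, String remoteAddr) {
        if (trustXForwardedFor && xForwardedFor != null) {
            String first = xForwardedFor.split(",", 2)[0].trim();
            if (!first.isEmpty()) {
                return first;
            }
        }
        return remoteAddr;
    }
}
```

Because any client can send this header, trusting it is only reasonable when every request passes through a proxy that sets or sanitizes it, which is consistent with the setting defaulting to false.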

Re: [PR] [improve][broker] Support X-Forwarded-For and HA Proxy Protocol for resolving original client IP of http/https requests [pulsar]

2024-04-21 Thread via GitHub


lhotari merged PR #22524:
URL: https://github.com/apache/pulsar/pull/22524





Re: [PR] [improve] Update Oxia client to 0.1.6 [pulsar]

2024-04-21 Thread via GitHub


nodece commented on PR #22525:
URL: https://github.com/apache/pulsar/pull/22525#issuecomment-2068395912

   ```
   LockManagerTest.acquireLocks:66 » NoClassDefFound Could not initialize class 
io.streamnative.oxia.proto.ListResponse
   ```





Re: [PR] [improve] [broker] Create partitioned topics automatically when enable topic level replication [pulsar]

2024-04-21 Thread via GitHub


gaoran10 commented on code in PR #22537:
URL: https://github.com/apache/pulsar/pull/22537#discussion_r1574055874


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java:
##
@@ -621,35 +625,82 @@ protected void 
internalCreatePartitionedTopic(AsyncResponse asyncResponse, int n
 
 private void 
internalCreatePartitionedTopicToReplicatedClustersInBackground(int 
numPartitions) {
 getNamespaceReplicatedClustersAsync(namespaceName)
-.thenAccept(clusters -> {
-for (String cluster : clusters) {
-if 
(!cluster.equals(pulsar().getConfiguration().getClusterName())) {
-// this call happens in the background without 
async composition. completion is logged.
-pulsar().getPulsarResources().getClusterResources()
-.getClusterAsync(cluster)
-.thenCompose(clusterDataOp ->
-((TopicsImpl) 
pulsar().getBrokerService()
-
.getClusterPulsarAdmin(cluster,
-
clusterDataOp).topics())
-
.createPartitionedTopicAsync(
-
topicName.getPartitionedTopicName(),
-numPartitions,
-true, null))
-.whenComplete((__, ex) -> {
-if (ex != null) {
-log.error(
-"[{}] Failed to create 
partitioned topic {} in cluster {}.",
-clientAppId(), topicName, 
cluster, ex);
-} else {
-log.info(
-"[{}] Successfully created 
partitioned topic {} in "
-+ "cluster {}",
-clientAppId(), topicName, 
cluster);
-}
-});
-}
+.thenAccept(clusters -> {
+// this call happens in the background without async 
composition. completion is logged.
+
internalCreatePartitionedTopicToReplicatedClustersInBackground(clusters, 
numPartitions);
+});
+}
+
+protected Map> 
internalCreatePartitionedTopicToReplicatedClustersInBackground(
+Set clusters, int numPartitions) {
+final String shortTopicName = topicName.getPartitionedTopicName();
+Map> tasksForAllClusters = new 
HashMap<>();
+for (String cluster : clusters) {
+if (cluster.equals(pulsar().getConfiguration().getClusterName())) {
+continue;
+}
+ClusterResources clusterResources = 
pulsar().getPulsarResources().getClusterResources();
+CompletableFuture createRemoteTopicFuture = new 
CompletableFuture<>();
+tasksForAllClusters.put(cluster, createRemoteTopicFuture);
+
clusterResources.getClusterAsync(cluster).whenComplete((clusterData, ex1) -> {
+if (ex1 != null) {
+// Unexpected error, such as NPE. Catch all error to avoid 
the "createRemoteTopicFuture" stuck.
+log.error("[{}] An un-expected error occurs when trying to 
create partitioned topic {} in cluster"
++ " {}.", clientAppId(), topicName, 
cluster, ex1);
+createRemoteTopicFuture.completeExceptionally(new 
RestException(ex1));
+return;
+}
+// Get cluster data success.
+TopicsImpl topics =
+(TopicsImpl) 
pulsar().getBrokerService().getClusterPulsarAdmin(cluster, 
clusterData).topics();
+topics.createPartitionedTopicAsync(shortTopicName, 
numPartitions, true, null)
+.whenComplete((ignore, ex2) -> {
+if (ex2 == null) {
+// Create success.
+log.info("[{}] Successfully created partitioned topic 
{} in cluster {}",
+clientAppId(), topicName, cluster);
+createRemoteTopicFuture.complete(null);
+return;
+}
+// Create topic on the remote cluster error.
+Throwable unwrapEx2 = 
FutureUtil.unwrapCompletionException(ex2);
+// The topic has been created before, check 

Re: [D] broker healthcheck endpoint reports No such ledger exists on Metadata Server - ledger endlessly [pulsar]

2024-04-21 Thread via GitHub


GitHub user wallacepeng edited a discussion: broker healthcheck endpoint 
reports No such ledger exists on Metadata Server -  ledger endlessly


2024-04-21T03:14:20,474+ [pulsar-io-4-7] ERROR 
org.apache.pulsar.client.impl.ProducerImpl - 
[persistent://pulsar/pulsar/pulsar-broker-0.pulsar-broker.pulsar.svc.cluster.local:8080/healthcheck]
 [null] Failed to create producer: 
{"errorMsg":"org.apache.pulsar.broker.service.schema.exceptions.SchemaException:
 No such ledger exists on Metadata Server -  ledger=3596046 - operation=Failed 
to open ledger caused by 
org.apache.pulsar.broker.service.schema.exceptions.SchemaException: No such 
ledger exists on Metadata Server -  ledger=3596046 - operation=Failed to open 
ledger","reqId":3493733038286036253, 
"remote":"pulsar-broker-0.pulsar-broker.pulsar.svc.cluster.local/172.20.203.179:6650",
 "local":"/172.20.203.179:58124"}
2024-04-21 03:14:20.474 
2024-04-21T03:14:20,474+ [pulsar-io-4-7] WARN  
org.apache.pulsar.client.impl.ClientCnx - [id: 0xdce5010a, 
L:/172.20.203.179:58124 - 
R:pulsar-broker-0.pulsar-broker.pulsar.svc.cluster.local/172.20.203.179:6650] 
Received error from server: 
org.apache.pulsar.broker.service.schema.exceptions.SchemaException: No such 
ledger exists on Metadata Server -  ledger=3596046 - operation=Failed to open 
ledger caused by 
org.apache.pulsar.broker.service.schema.exceptions.SchemaException: No such 
ledger exists on Metadata Server -  ledger=3596046 - operation=Failed to open 
ledger


This happens because the internal healthcheck cannot create a producer (it 
seems to have a null producerName) for the topic 
persistent://pulsar/pulsar/pulsar-broker-0.pulsar-broker.pulsar.svc.cluster.local:8080/healthcheck.
It reports "No such ledger exists on Metadata Server". Everything else works 
except the healthcheck.

I have BookKeeper clusters. After I decommissioned the default one, this error 
started to appear. Does the broker still try to create ledgers on the 
decommissioned bookie?

```
bin/pulsar-admin topics stats-internal 
persistent://pulsar/pulsar/pulsar-broker-0.pulsar-broker.pulsar.svc.cluster.local:8080/healthcheck
No such ledger exists on Metadata Server

Reason: No such ledger exists on Metadata Server
```


https://github.com/apache/pulsar/assets/894641/9eb33a0f-48f4-4ff4-8f80-88ea5c7cc1d7


GitHub link: https://github.com/apache/pulsar/discussions/22545


This is an automatically sent email for commits@pulsar.apache.org.
To unsubscribe, please send an email to: commits-unsubscr...@pulsar.apache.org



Re: [PR] [improve]Improve batching message doc [pulsar-site]

2024-04-21 Thread via GitHub


dao-jun commented on code in PR #878:
URL: https://github.com/apache/pulsar-site/pull/878#discussion_r1573879880


##
docs/concepts-messaging.md:
##
@@ -427,6 +427,13 @@ Consumer consumer = pulsarClient.newConsumer()
 .subscribe();
 ```
 
+:::note
+
+Send messages by synchronous API `send` will disable batching, and the message 
will be sent individually.

Review Comment:
   Yes, I know; my statement was just not accurate.
   
   How about changing it to: 
   ```txt
   Sending messages with the synchronous `send` API triggers the batch to be 
sent immediately, even if the batch is not full.
   This reduces the latency of sending messages and prevents blocking the 
caller's thread.
   ```
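
The proposed wording can be illustrated with a toy model. The sketch below is not Pulsar's producer code; the class `BatchingSketch`, its methods, and the batch limit of 5 are all hypothetical names chosen for illustration. It only shows the behavior described above: asynchronous sends wait in the batch, while a synchronous send flushes whatever is queued, even when the batch is not full.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of a batching producer. Hypothetical; NOT Pulsar's implementation. */
class BatchingSketch {
    private final int maxMessagesInBatch;                   // hypothetical batch-size limit
    private final List<String> pending = new ArrayList<>(); // messages waiting in the current batch
    private final List<List<String>> flushed = new ArrayList<>(); // batches already "sent"

    BatchingSketch(int maxMessagesInBatch) {
        this.maxMessagesInBatch = maxMessagesInBatch;
    }

    /** Asynchronous path: queue the message and flush only when the batch is full. */
    void sendAsync(String msg) {
        pending.add(msg);
        if (pending.size() >= maxMessagesInBatch) {
            flush();
        }
    }

    /** Synchronous path: queue the message, then flush immediately. */
    void send(String msg) {
        pending.add(msg);
        flush();
    }

    private void flush() {
        if (!pending.isEmpty()) {
            flushed.add(new ArrayList<>(pending));
            pending.clear();
        }
    }

    List<List<String>> flushedBatches() {
        return flushed;
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        BatchingSketch producer = new BatchingSketch(5);
        producer.sendAsync("a");
        producer.sendAsync("b");
        System.out.println(producer.flushedBatches()); // prints []
        producer.send("c");
        System.out.println(producer.flushedBatches()); // prints [[a, b, c]]
    }
}
```

In this model the synchronous call does not disable batching: the messages queued earlier travel in the same batch as the synchronous one, which is the distinction the revised note is trying to make.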






Re: [PR] [improve] [broker] Create partitioned topics automatically when enable topic level replication [pulsar]

2024-04-21 Thread via GitHub


poorbarcode commented on code in PR #22537:
URL: https://github.com/apache/pulsar/pull/22537#discussion_r1573843227


##
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java:
##
@@ -153,4 +172,90 @@ public void 
testTopicCloseWhenInternalProducerCloseErrorOnce() throws Exception
 admin2.topics().delete(topicName);
 });
 }
+
+@Test
+public void testPartitionedTopicLevelReplication() throws Exception {
+final String topicName = BrokerTestUtil.newUniqueName("persistent://" 
+ nonReplicatedNamespace + "/tp_");
+final String partition0 = 
TopicName.get(topicName).getPartition(0).toString();
+final String partition1 = 
TopicName.get(topicName).getPartition(1).toString();
+admin1.topics().createPartitionedTopic(topicName, 2);
+admin1.topics().setReplicationClusters(topicName, 
Arrays.asList(cluster1, cluster2));
+// Check the partitioned topic has been created at the remote cluster.
+PartitionedTopicMetadata topicMetadata2 = 
admin2.topics().getPartitionedTopicMetadata(topicName);
+assertEquals(topicMetadata2.partitions, 2);
+// cleanup.
+admin1.topics().setReplicationClusters(topicName, 
Arrays.asList(cluster1));
+waitReplicatorStopped(partition0);
+waitReplicatorStopped(partition1);
+admin1.topics().deletePartitionedTopic(topicName);
+admin2.topics().deletePartitionedTopic(topicName);
+}
+
+@Test
+public void testPartitionedTopicLevelReplicationRemoteTopicExist() throws 
Exception {
+final String topicName = BrokerTestUtil.newUniqueName("persistent://" 
+ nonReplicatedNamespace + "/tp_");
+final String partition0 = 
TopicName.get(topicName).getPartition(0).toString();
+final String partition1 = 
TopicName.get(topicName).getPartition(1).toString();
+admin1.topics().createPartitionedTopic(topicName, 2);
+admin2.topics().createPartitionedTopic(topicName, 2);
+admin1.topics().setReplicationClusters(topicName, 
Arrays.asList(cluster1, cluster2));
+// Check the partitioned topic has been created at the remote cluster.
+PartitionedTopicMetadata topicMetadata2 = 
admin2.topics().getPartitionedTopicMetadata(topicName);
+assertEquals(topicMetadata2.partitions, 2);
+// cleanup.
+admin1.topics().setReplicationClusters(topicName, 
Arrays.asList(cluster1));
+waitReplicatorStopped(partition0);
+waitReplicatorStopped(partition1);
+admin1.topics().deletePartitionedTopic(topicName);
+admin2.topics().deletePartitionedTopic(topicName);
+}
+
+@Test
+public void testPartitionedTopicLevelReplicationRemoteConflictTopicExist() 
throws Exception {

Review Comment:
   No, it breaks the order of consumption on the remote cluster if they are 
using Key_Shared mode, so we leave it to users to decide how to fix it.






Re: [PR] [improve] [broker] Create partitioned topics automatically when enable topic level replication [pulsar]

2024-04-21 Thread via GitHub


poorbarcode commented on code in PR #22537:
URL: https://github.com/apache/pulsar/pull/22537#discussion_r1573842742


##
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java:
##
@@ -153,4 +172,90 @@ public void 
testTopicCloseWhenInternalProducerCloseErrorOnce() throws Exception
 admin2.topics().delete(topicName);
 });
 }
+
+@Test
+public void testPartitionedTopicLevelReplication() throws Exception {
+final String topicName = BrokerTestUtil.newUniqueName("persistent://" 
+ nonReplicatedNamespace + "/tp_");
+final String partition0 = 
TopicName.get(topicName).getPartition(0).toString();
+final String partition1 = 
TopicName.get(topicName).getPartition(1).toString();
+admin1.topics().createPartitionedTopic(topicName, 2);
+admin1.topics().setReplicationClusters(topicName, 
Arrays.asList(cluster1, cluster2));
+// Check the partitioned topic has been created at the remote cluster.
+PartitionedTopicMetadata topicMetadata2 = 
admin2.topics().getPartitionedTopicMetadata(topicName);
+assertEquals(topicMetadata2.partitions, 2);
+// cleanup.
+admin1.topics().setReplicationClusters(topicName, 
Arrays.asList(cluster1));
+waitReplicatorStopped(partition0);
+waitReplicatorStopped(partition1);
+admin1.topics().deletePartitionedTopic(topicName);
+admin2.topics().deletePartitionedTopic(topicName);
+}
+
+@Test
+public void testPartitionedTopicLevelReplicationRemoteTopicExist() throws 
Exception {
+final String topicName = BrokerTestUtil.newUniqueName("persistent://" 
+ nonReplicatedNamespace + "/tp_");
+final String partition0 = 
TopicName.get(topicName).getPartition(0).toString();
+final String partition1 = 
TopicName.get(topicName).getPartition(1).toString();
+admin1.topics().createPartitionedTopic(topicName, 2);
+admin2.topics().createPartitionedTopic(topicName, 2);
+admin1.topics().setReplicationClusters(topicName, 
Arrays.asList(cluster1, cluster2));
+// Check the partitioned topic has been created at the remote cluster.
+PartitionedTopicMetadata topicMetadata2 = 
admin2.topics().getPartitionedTopicMetadata(topicName);
+assertEquals(topicMetadata2.partitions, 2);
+// cleanup.
+admin1.topics().setReplicationClusters(topicName, 
Arrays.asList(cluster1));
+waitReplicatorStopped(partition0);
+waitReplicatorStopped(partition1);
+admin1.topics().deletePartitionedTopic(topicName);
+admin2.topics().deletePartitionedTopic(topicName);
+}
+
+@Test
+public void testPartitionedTopicLevelReplicationRemoteConflictTopicExist() 
throws Exception {
+final String topicName = BrokerTestUtil.newUniqueName("persistent://" 
+ nonReplicatedNamespace + "/tp_");
+admin2.topics().createPartitionedTopic(topicName, 3);
+admin1.topics().createPartitionedTopic(topicName, 2);
+try {
+admin1.topics().setReplicationClusters(topicName, 
Arrays.asList(cluster1, cluster2));
+fail("Expected error due to a conflict partitioned topic already 
exists.");
+} catch (Exception ex) {
+Throwable unWrapEx = FutureUtil.unwrapCompletionException(ex);
+assertTrue(unWrapEx.getMessage().contains("with different 
partitions"));
+}
+// Check nothing changed.
+PartitionedTopicMetadata topicMetadata2 = 
admin2.topics().getPartitionedTopicMetadata(topicName);
+assertEquals(topicMetadata2.partitions, 3);
+assertEquals(admin1.topics().getReplicationClusters(topicName, 
true).size(), 1);
+// cleanup.
+admin1.topics().deletePartitionedTopic(topicName);
+admin2.topics().deletePartitionedTopic(topicName);
+}
+
+/**
+ * TODO next PR will correct the behavior below, just left this test here.
+ */
+// @Test
+private void testNamespaceLevelReplicationRemoteConflictTopicExist() 
throws Exception {

Review Comment:
   Removed






[PR] [improve][test] Add topic policy test for topic API [pulsar]

2024-04-21 Thread via GitHub


Technoboy- opened a new pull request, #22546:
URL: https://github.com/apache/pulsar/pull/22546

   
   ### Motivation
   
   Add test for topic policies.
   
   ### Modification
   - Add test for topic policy.
   - Separate transaction and schema test to a new class 
   
   
   ### Documentation
   
   
   
   - [ ] `doc` 
   - [ ] `doc-required` 
   - [x] `doc-not-needed` 
   - [ ] `doc-complete` 
   
   ### Matching PR in forked repository
   
   PR in forked repository: 
   
   
   





Re: [I] Consumer able to receive message which is not matching the regex pattern [pulsar]

2024-04-21 Thread via GitHub


visortelle commented on issue #22529:
URL: https://github.com/apache/pulsar/issues/22529#issuecomment-2067943011

   Interesting.
   
   My observation is that after you create a pattern consumer, for 
non-persistent topics it doesn't "immediately" create the underlying 
subscription and consumers if there are no connected producers at this moment. 
But it will eventually be created after a short time.
   
   TIP: you can display the list of the underlying consumers by casting your 
consumer to `PatternMultiTopicsConsumerImpl` and calling the `.getConsumers()` 
method.
   
   ```java
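   // Assumes the consumer was created with a String schema; adjust the type argument otherwise.
   List<ConsumerImpl<String>> consumers =
           ((PatternMultiTopicsConsumerImpl<String>) allTopicsConsumer).getConsumers();
   for (ConsumerImpl<String> consumer : consumers) {
       System.out.println("consumer: " + consumer.getTopic());
   }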
   ```
   
   If you modify your code to send a lot of messages asynchronously, you'll 
start to receive them after a short time. 
   
   ```java
   for (int i = 0; i < 10; i++) {
       producer.sendAsync("=from topic non-persistent://tenant-1/name/topic-1 " + i);
   }
   ```
   
   ```
   ...
   Received message from topic non-persistent://tenant-1/name/topic-1: 
=from topic non-persistent://tenant-1/name/topic-1 10732
   Received message from topic non-persistent://tenant-1/name/topic-1: 
=from topic non-persistent://tenant-1/name/topic-1 10733
   Received message from topic non-persistent://tenant-1/name/topic-1: 
=from topic non-persistent://tenant-1/name/topic-1 10734
   Received message from topic non-persistent://tenant-1/name/topic-1: 
=from topic non-persistent://tenant-1/name/topic-1 10735
   Received message from topic non-persistent://tenant-1/name/topic-1: 
=from topic non-persistent://tenant-1/name/topic-1 10736
   Received message from topic non-persistent://tenant-1/name/topic-1: 
=from topic non-persistent://tenant-1/name/topic-1 10737
   Received message from topic non-persistent://tenant-1/name/topic-1: 
=from topic non-persistent://tenant-1/name/topic-1 10738
   Received message from topic non-persistent://tenant-1/name/topic-1: 
=from topic non-persistent://tenant-1/name/topic-1 10739
   Received message from topic non-persistent://tenant-1/name/topic-1: 
=from topic non-persistent://tenant-1/name/topic-1 10740
   Received message from topic non-persistent://tenant-1/name/topic-1: 
=from topic non-persistent://tenant-1/name/topic-1 10741
   Received message from topic non-persistent://tenant-1/name/topic-1: 
=from topic non-persistent://tenant-1/name/topic-1 10742
   Received message from topic non-persistent://tenant-1/name/topic-1: 
=from topic non-persistent://tenant-1/name/topic-1 10743
   Received message from topic non-persistent://tenant-1/name/topic-1: 
=from topic non-persistent://tenant-1/name/topic-1 10744
   Received message from topic non-persistent://tenant-1/name/topic-1: 
=from topic non-persistent://tenant-1/name/topic-1 10745
   Received message from topic non-persistent://tenant-1/name/topic-1: 
=from topic non-persistent://tenant-1/name/topic-1 10746
   ```
   
   I don't know if it can qualify as a bug. cc @lhotari
   
   I wouldn't rely on non-persistent topics if losing even a small number of 
messages could affect my application.

