svn commit: r67772 [1/2] - in /dev/pulsar/pulsar-3.1.3-candidate-2: ./ connectors/

2024-03-06 Thread rgao
Author: rgao
Date: Thu Mar  7 07:53:59 2024
New Revision: 67772

Log:
Staging artifacts and signature for Pulsar release 3.1.3

Added:
dev/pulsar/pulsar-3.1.3-candidate-2/
dev/pulsar/pulsar-3.1.3-candidate-2/apache-pulsar-3.1.3-bin.tar.gz   (with 
props)
dev/pulsar/pulsar-3.1.3-candidate-2/apache-pulsar-3.1.3-bin.tar.gz.asc
dev/pulsar/pulsar-3.1.3-candidate-2/apache-pulsar-3.1.3-bin.tar.gz.sha512
dev/pulsar/pulsar-3.1.3-candidate-2/apache-pulsar-3.1.3-src.tar.gz   (with 
props)
dev/pulsar/pulsar-3.1.3-candidate-2/apache-pulsar-3.1.3-src.tar.gz.asc
dev/pulsar/pulsar-3.1.3-candidate-2/apache-pulsar-3.1.3-src.tar.gz.sha512

dev/pulsar/pulsar-3.1.3-candidate-2/apache-pulsar-offloaders-3.1.3-bin.tar.gz   
(with props)

dev/pulsar/pulsar-3.1.3-candidate-2/apache-pulsar-offloaders-3.1.3-bin.tar.gz.asc

dev/pulsar/pulsar-3.1.3-candidate-2/apache-pulsar-offloaders-3.1.3-bin.tar.gz.sha512
dev/pulsar/pulsar-3.1.3-candidate-2/apache-pulsar-shell-3.1.3-bin.tar.gz   
(with props)
dev/pulsar/pulsar-3.1.3-candidate-2/apache-pulsar-shell-3.1.3-bin.tar.gz.asc

dev/pulsar/pulsar-3.1.3-candidate-2/apache-pulsar-shell-3.1.3-bin.tar.gz.sha512
dev/pulsar/pulsar-3.1.3-candidate-2/apache-pulsar-shell-3.1.3-bin.zip   
(with props)
dev/pulsar/pulsar-3.1.3-candidate-2/apache-pulsar-shell-3.1.3-bin.zip.asc
dev/pulsar/pulsar-3.1.3-candidate-2/apache-pulsar-shell-3.1.3-bin.zip.sha512
dev/pulsar/pulsar-3.1.3-candidate-2/connectors/
dev/pulsar/pulsar-3.1.3-candidate-2/connectors/LICENSE
dev/pulsar/pulsar-3.1.3-candidate-2/connectors/README

dev/pulsar/pulsar-3.1.3-candidate-2/connectors/pulsar-io-aerospike-3.1.3.nar   
(with props)

dev/pulsar/pulsar-3.1.3-candidate-2/connectors/pulsar-io-aerospike-3.1.3.nar.asc

dev/pulsar/pulsar-3.1.3-candidate-2/connectors/pulsar-io-aerospike-3.1.3.nar.sha512
dev/pulsar/pulsar-3.1.3-candidate-2/connectors/pulsar-io-alluxio-3.1.3.nar  
 (with props)

dev/pulsar/pulsar-3.1.3-candidate-2/connectors/pulsar-io-alluxio-3.1.3.nar.asc

dev/pulsar/pulsar-3.1.3-candidate-2/connectors/pulsar-io-alluxio-3.1.3.nar.sha512

dev/pulsar/pulsar-3.1.3-candidate-2/connectors/pulsar-io-batch-data-generator-3.1.3.nar
   (with props)

dev/pulsar/pulsar-3.1.3-candidate-2/connectors/pulsar-io-batch-data-generator-3.1.3.nar.asc

dev/pulsar/pulsar-3.1.3-candidate-2/connectors/pulsar-io-batch-data-generator-3.1.3.nar.sha512
dev/pulsar/pulsar-3.1.3-candidate-2/connectors/pulsar-io-canal-3.1.3.nar   
(with props)
dev/pulsar/pulsar-3.1.3-candidate-2/connectors/pulsar-io-canal-3.1.3.nar.asc

dev/pulsar/pulsar-3.1.3-candidate-2/connectors/pulsar-io-canal-3.1.3.nar.sha512

dev/pulsar/pulsar-3.1.3-candidate-2/connectors/pulsar-io-cassandra-3.1.3.nar   
(with props)

dev/pulsar/pulsar-3.1.3-candidate-2/connectors/pulsar-io-cassandra-3.1.3.nar.asc

dev/pulsar/pulsar-3.1.3-candidate-2/connectors/pulsar-io-cassandra-3.1.3.nar.sha512

dev/pulsar/pulsar-3.1.3-candidate-2/connectors/pulsar-io-data-generator-3.1.3.nar
   (with props)

dev/pulsar/pulsar-3.1.3-candidate-2/connectors/pulsar-io-data-generator-3.1.3.nar.asc

dev/pulsar/pulsar-3.1.3-candidate-2/connectors/pulsar-io-data-generator-3.1.3.nar.sha512

dev/pulsar/pulsar-3.1.3-candidate-2/connectors/pulsar-io-debezium-mongodb-3.1.3.nar
   (with props)

dev/pulsar/pulsar-3.1.3-candidate-2/connectors/pulsar-io-debezium-mongodb-3.1.3.nar.asc

dev/pulsar/pulsar-3.1.3-candidate-2/connectors/pulsar-io-debezium-mongodb-3.1.3.nar.sha512

dev/pulsar/pulsar-3.1.3-candidate-2/connectors/pulsar-io-debezium-mssql-3.1.3.nar
   (with props)

dev/pulsar/pulsar-3.1.3-candidate-2/connectors/pulsar-io-debezium-mssql-3.1.3.nar.asc

dev/pulsar/pulsar-3.1.3-candidate-2/connectors/pulsar-io-debezium-mssql-3.1.3.nar.sha512

dev/pulsar/pulsar-3.1.3-candidate-2/connectors/pulsar-io-debezium-mysql-3.1.3.nar
   (with props)

dev/pulsar/pulsar-3.1.3-candidate-2/connectors/pulsar-io-debezium-mysql-3.1.3.nar.asc

dev/pulsar/pulsar-3.1.3-candidate-2/connectors/pulsar-io-debezium-mysql-3.1.3.nar.sha512

dev/pulsar/pulsar-3.1.3-candidate-2/connectors/pulsar-io-debezium-oracle-3.1.3.nar
   (with props)

dev/pulsar/pulsar-3.1.3-candidate-2/connectors/pulsar-io-debezium-oracle-3.1.3.nar.asc

dev/pulsar/pulsar-3.1.3-candidate-2/connectors/pulsar-io-debezium-oracle-3.1.3.nar.sha512

dev/pulsar/pulsar-3.1.3-candidate-2/connectors/pulsar-io-debezium-postgres-3.1.3.nar
   (with props)

dev/pulsar/pulsar-3.1.3-candidate-2/connectors/pulsar-io-debezium-postgres-3.1.3.nar.asc

dev/pulsar/pulsar-3.1.3-candidate-2/connectors/pulsar-io-debezium-postgres-3.1.3.nar.sha512
dev/pulsar/pulsar-3.1.3-candidate-2/connectors/pulsar-io-dynamodb-3.1.3.nar 
  (with props)

dev/pulsar/pulsar-3.1.3-candidate-2/connectors/pulsar-io-dynamodb-3.1.3.nar.asc


svn commit: r67772 [2/2] - in /dev/pulsar/pulsar-3.1.3-candidate-2: ./ connectors/

2024-03-06 Thread rgao
Added: 
dev/pulsar/pulsar-3.1.3-candidate-2/connectors/pulsar-io-jdbc-sqlite-3.1.3.nar
==
Binary file - no diff available.

Propchange: 
dev/pulsar/pulsar-3.1.3-candidate-2/connectors/pulsar-io-jdbc-sqlite-3.1.3.nar
--
svn:mime-type = application/octet-stream

Added: 
dev/pulsar/pulsar-3.1.3-candidate-2/connectors/pulsar-io-jdbc-sqlite-3.1.3.nar.asc
==
--- 
dev/pulsar/pulsar-3.1.3-candidate-2/connectors/pulsar-io-jdbc-sqlite-3.1.3.nar.asc
 (added)
+++ 
dev/pulsar/pulsar-3.1.3-candidate-2/connectors/pulsar-io-jdbc-sqlite-3.1.3.nar.asc
 Thu Mar  7 07:53:59 2024
@@ -0,0 +1,16 @@
+-BEGIN PGP SIGNATURE-
+
+iQIzBAABCgAdFiEEtWezB1n0EhojFQBl2aZsamxtvOoFAmXpbtoACgkQ2aZsamxt
+vOqYQxAAnsLGE5ZFIx64YcBZB7H3dKPQ9Yg8gH1K8mYykONXCcIzZB85IN4QNFgr
+f1vaoQn/BmO+JgPRaQ6sfUUnuS4vT7IWjgGlkF/HOREaKl8uTN/FwXC9Nu79UB5c
+VykDeIFL3fJ8s79sa/QNACIz+lLg5KhpWs5CQweGWM5HeLtmYZnqcrXtxe05JRQ5
+deEmtHo/hDhlhDf4/ymRdiHAoOby1M9UxeemN1q91ZICcqU9+UbsCXmBkWxnqQLT
+bJhfw5DKszg3KDpawspYLIDkdyMTqn2e2qJoKekbMOb0lMcMSs6K1d0txHH85wFh
+lu4VsGy7+XCNyA0ADyzhnuVjfKAelGOARE3FQzddRwnCVMrT+DD/6b30HQQDW6tc
+Ku6yDFQI62iFVcUYS25No7dF0FjMKnSSEgYWbxKQQPBF3WwMxslCHPkdcJYUN2Kn
+hceIpejbKI03tIRuC4pqohcXQANhv+To/934/o9t0Lwuwsxovab98dDfHv4aP+S7
+5GrnxxBLHcivG9cw9VVj0h8IrhU2U78Yy5tQ0Q5O48f95CeNSIeLC/LBgB/feYA9
+BSuO7MLV3+DQCJ1fCV3ndSeQHrC1mg9preQw9+ex5IpqXD0t9QuAiOynUVNUthY5
+j0GHIhFRy0XCCL3YCQWNuaumRXQjBezlwfG3ZYdhW+aIM0rayKE=
+=Wjck
+-END PGP SIGNATURE-

Added: 
dev/pulsar/pulsar-3.1.3-candidate-2/connectors/pulsar-io-jdbc-sqlite-3.1.3.nar.sha512
==
--- 
dev/pulsar/pulsar-3.1.3-candidate-2/connectors/pulsar-io-jdbc-sqlite-3.1.3.nar.sha512
 (added)
+++ 
dev/pulsar/pulsar-3.1.3-candidate-2/connectors/pulsar-io-jdbc-sqlite-3.1.3.nar.sha512
 Thu Mar  7 07:53:59 2024
@@ -0,0 +1 @@
+ac9c5a4bee5afb48897e793d3aa01b3a687f0a380da4002b8b5ec097bb1dfa47b85bf1f6e19f22afc9515eb82c8831eedd7f615376a3291d355ee71c078c9b14
  ./connectors/pulsar-io-jdbc-sqlite-3.1.3.nar

Added: dev/pulsar/pulsar-3.1.3-candidate-2/connectors/pulsar-io-kafka-3.1.3.nar
==
Binary file - no diff available.

Propchange: 
dev/pulsar/pulsar-3.1.3-candidate-2/connectors/pulsar-io-kafka-3.1.3.nar
--
svn:mime-type = application/octet-stream

Added: 
dev/pulsar/pulsar-3.1.3-candidate-2/connectors/pulsar-io-kafka-3.1.3.nar.asc
==
--- 
dev/pulsar/pulsar-3.1.3-candidate-2/connectors/pulsar-io-kafka-3.1.3.nar.asc 
(added)
+++ 
dev/pulsar/pulsar-3.1.3-candidate-2/connectors/pulsar-io-kafka-3.1.3.nar.asc 
Thu Mar  7 07:53:59 2024
@@ -0,0 +1,16 @@
+-BEGIN PGP SIGNATURE-
+
+iQIzBAABCgAdFiEEtWezB1n0EhojFQBl2aZsamxtvOoFAmXpbs8ACgkQ2aZsamxt
+vOqCthAAkWTn3gV2GdG+/VFHI32UFskdJ5IffpoF3pNthx3JCPTmQ8hB1GEsQdNT
+j/yaH4WXZ3z+VrlTHWo2kIUGVoNfOzkBQG55gw+aMtD0gjWw5b6D9dJ9/RTPISeo
+7DKL9KEhONCgUaKQtO43205TfGT3g5akN7Yb8gpdaXy7ZKqdOPNvM+zd2gX8xONz
+ATAKSUumBXsvX9B7OSsNhZfr7sycTJcNrzw21bPhCtt53glauqsdbW3V5qVV072J
+u6mDYXa46kUqLRVSq/N15YQby/YOkAM1E509o3KHfzhKzmGa3WbdVp8CLhb/ozHM
+2QzXlgSis5u/MDGA1NdMQIVr2rooUWwZ4p/FULnYy/cmhQWh47QVGRO2a/p3UoD0
+bgpJNike9PA/m5g7ijZVtN+xQ+lmqvmTQbKOgTE7V07pKeqDLykRJHfM0Ua5D8lc
+/d08afL7jpWtnliTRRN6VyJwWmHiqH1jOK0es/mdu6Y5U0JIPDdRGS8RSLnThAYl
+n5scHjzyMf9Nf/WW9YbQeCG3BfDxPkRhjqLQFrPFO6urOa93mNsUywmNII1489s8
+Q1moCnx5VfxigDZS3CXfPgFxjJSqx1LpcCae+YL6+kwHXjHh8GRbOP2jeq/ghUW/
+Lhuo+V5YnYNbYl4rhnJMsWRPf6dX2npU0qnWLD5AuFaZYl1NWkQ=
+=SpES
+-END PGP SIGNATURE-

Added: 
dev/pulsar/pulsar-3.1.3-candidate-2/connectors/pulsar-io-kafka-3.1.3.nar.sha512
==
--- 
dev/pulsar/pulsar-3.1.3-candidate-2/connectors/pulsar-io-kafka-3.1.3.nar.sha512 
(added)
+++ 
dev/pulsar/pulsar-3.1.3-candidate-2/connectors/pulsar-io-kafka-3.1.3.nar.sha512 
Thu Mar  7 07:53:59 2024
@@ -0,0 +1 @@
+cb468d737c64f07e5b54c200a662050a38e7d9f7569ffc2f8be6fa71bc00325c805b7e1dd81b7ffd2d44cf152455f1762b1c3af578f0bcda41266ce8308fb2b5
  ./connectors/pulsar-io-kafka-3.1.3.nar

Added: 
dev/pulsar/pulsar-3.1.3-candidate-2/connectors/pulsar-io-kafka-connect-adaptor-3.1.3.nar
==
Binary file - no diff available.

Propchange: 
dev/pulsar/pulsar-3.1.3-candidate-2/connectors/pulsar-io-kafka-connect-adaptor-3.1.3.nar
--
svn:mime-type = application/octet-stream

Added: 
dev/pulsar/pulsar-3.1.3-candidate-2/connectors/pulsar-io-kafka-connect-adaptor-3.1.3.nar.asc

Re: [PR] [fix][broker] Introduce the last sent position to fix message ordering issues in Key_Shared (PIP-282) [pulsar]

2024-03-06 Thread via GitHub


poorbarcode commented on code in PR #21953:
URL: https://github.com/apache/pulsar/pull/21953#discussion_r1515671343


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java:
##
@@ -219,6 +242,24 @@ protected synchronized boolean 
trySendMessagesToConsumers(ReadType readType, Lis
 }
 }
 }
+
+// Update if the markDeletePosition move forward
+updateIfNeededAndGetLastSentPosition();
+
+// Should not access to individualDeletedMessages from outside 
managed cursor
+// because it doesn't guarantee thread safety.
+if (lastSentPosition == null) {

Review Comment:
   Just mark this for easier reading: initialize `lastSentPosition` and 
`individuallySentPositions` after the dispatcher is created or the first 
consumer joins.
   



##
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java:
##
@@ -308,6 +361,62 @@ protected synchronized boolean 
trySendMessagesToConsumers(ReadType readType, Lis
 }
 }
 
+// Update the last sent position and remove ranges from 
individuallySentPositions if necessary
+if (!allowOutOfOrderDelivery && lastSentPosition != null) {

Review Comment:
   Just mark this for easier reading: update `lastSentPosition` to the position 
of the first consecutive sent message



##
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java:
##
@@ -122,15 +136,18 @@ public synchronized CompletableFuture 
addConsumer(Consumer consumer) {
 })
 ).thenRun(() -> {
 synchronized (PersistentStickyKeyDispatcherMultipleConsumers.this) 
{
-PositionImpl readPositionWhenJoining = (PositionImpl) 
cursor.getReadPosition();
-consumer.setReadPositionWhenJoining(readPositionWhenJoining);
-// If this was the 1st consumer, or if all the messages are 
already acked, then we
-// don't need to do anything special
-if (!allowOutOfOrderDelivery
-&& recentlyJoinedConsumers != null
-&& consumerList.size() > 1
-&& 
cursor.getNumberOfEntriesSinceFirstNotAckedMessage() > 1) {
-recentlyJoinedConsumers.put(consumer, 
readPositionWhenJoining);
+if (!allowOutOfOrderDelivery) {
+final PositionImpl lastSentPositionWhenJoining = 
updateIfNeededAndGetLastSentPosition();
+if (lastSentPositionWhenJoining != null) {
+
consumer.setLastSentPositionWhenJoining(lastSentPositionWhenJoining);
+// If this was the 1st consumer, or if all the 
messages are already acked, then we
+// don't need to do anything special
+if (recentlyJoinedConsumers != null
+&& consumerList.size() > 1
+&& 
cursor.getNumberOfEntriesSinceFirstNotAckedMessage() > 1) {
+recentlyJoinedConsumers.put(consumer, 
lastSentPositionWhenJoining);

Review Comment:
   After consumers redeliver messages, you do not remove them from 
`individuallySentPositions`, right? So the mechanism of "Calculate the latest 
position sent" could not work as expected, right?
   
   Suggestion: when a new consumer joins, we can calculate "the latest 
position sent" in real time, relying on these variables:
   - `cursor.individualDeletedMessages`
   - `dispatcher.redeliveryMessages`
   - `consumer.pendingAcks`
   
   Then the new variable `individuallySentPositions` can be removed, and we do 
not need to keep it up to date, which makes the logic simpler.



##
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java:
##
@@ -276,12 +317,24 @@ protected synchronized boolean 
trySendMessagesToConsumers(ReadType readType, Lis
 }
 
 if (messagesForC > 0) {
-// remove positions first from replay list first : 
sendMessages recycles entries
-if (readType == ReadType.Replay) {
-for (int i = 0; i < messagesForC; i++) {
-Entry entry = entriesWithSameKey.get(i);
+final ManagedLedgerImpl managedLedger = ((ManagedLedgerImpl) 
cursor.getManagedLedger());
+for (int i = 0; i < messagesForC; i++) {
+final Entry entry = entriesWithSameKey.get(i);
+// remove positions first from replay list first : 
sendMessages recycles entries
+if (readType == ReadType.Replay) {
 

[I] [Bug] Producer creation gets blocked “Closed producer before its creation was completed” [pulsar]

2024-03-06 Thread via GitHub


Xxf9126 opened a new issue, #22217:
URL: https://github.com/apache/pulsar/issues/22217

   ### Search before asking
   
   - [X] I searched in the [issues](https://github.com/apache/pulsar/issues) 
and found nothing similar.
   
   
   ### Version
   
   Pulsar 2.10.2
   
   
   org.apache.pulsar
   pulsar-client
   2.10.2

   
   ### Minimal reproduce step
   
   Call the Pulsar Client API to create a topic producer.
   
   ### What did you expect to see?
   
   The client returns normally and the producer is created successfully.
   
   ### What did you see instead?
   
   broker log:
   1.Topic producer creating
   `2024-03-05T02:35:13,559+ [pulsar-io-4-1] INFO  
org.apache.pulsar.broker.service.ServerCnx - 
[/42.193.247.132:55368][persistent://rinoiot_1656861934759301120/SpwnFnVeq7QPbU2CNcbS/device_dp_report]
 Creating producer. producerId=28`
   
   2. The broker logs that the topic was created, and then the request starts 
blocking. The client thread is blocked at the same time. The blocking lasts 23 
seconds.
   `2024-03-05T02:35:13,591+ [bookkeeper-ml-scheduler-OrderedScheduler-0-0] 
INFO  org.apache.pulsar.broker.service.BrokerService - Created topic 
persistent://rinoiot_1656861934759301120/SpwnFnVeq7QPbU2CNcbS/device_dp_report 
- dedup is disabled`
   `2024-03-05T02:35:36,644+ [pulsar-web-36-8] INFO  
org.eclipse.jetty.server.RequestLog - 192.168.64.2 - - [05/Mar/2024:02:35:36 
+] "GET /admin/v2/clusters HTTP/1.1" 200 13 "-" "Pulsar-Java-v2.5.2" 1`
   
   3.Client reports timeout error broker log printing
   `2024-03-05T02:35:37,932+ [pulsar-io-4-1] INFO  
org.apache.pulsar.broker.service.ServerCnx - [/42.193.247.132:55368] Closed 
producer before its creation was completed. producerId=28`
   
   4.The client initiates a request again to create the topic producer
   `2024-03-05T02:35:37,953+ [pulsar-io-4-1] INFO  
org.apache.pulsar.broker.service.ServerCnx - 
[/42.193.247.132:55368][persistent://rinoiot_1656861934759301120/SpwnFnVeq7QPbU2CNcbS/device_dp_report]
 Creating producer. producerId=29`
   
   5.The request creation is not blocked but returns an error
   `2024-03-05T02:36:07,814+ [pulsar-io-4-1] INFO  
org.apache.pulsar.broker.service.ServerCnx - [/42.193.247.132:55368] Closed 
producer before its creation was completed. producerId=29`
   
   6.After continuing to request several times to create a topic, the broker 
prints an exception log:
   
`[persistent://rinoiot_1656861934759301120/SpwnFnVeq7QPbU2CNcbS/device_dp_report]
 Global topic inactive for 60 seconds, closed repl producers`
   `2024-03-05T02:37:44,217+ [pulsar-inactivity-monitor-24-1] WARN  
org.apache.pulsar.broker.service.persistent.PersistentTopic - 
[persistent://rinoiot_1656861934759301120/SpwnFnVeq7QPbU2CNcbS/device_dp_report]
 Topic is already being closed or deleted`
   `2024-03-05T02:37:44,217+ [pulsar-inactivity-monitor-24-1] WARN  
org.apache.pulsar.broker.service.persistent.PersistentTopic - 
[persistent://rinoiot_1656861934759301120/SpwnFnVeq7QPbU2CNcbS/device_dp_report]
 Inactive topic deletion failed
   java.util.concurrent.CompletionException: 
org.apache.pulsar.broker.service.BrokerServiceException$TopicFencedException: 
Topic is already fenced`
   
   7.Subsequent requests to the topic kept reporting the error "Topic is 
already fenced", and finally restarted Pulsar to recover.
   
   The following is the complete log of the broker:
   
![20240307151942](https://github.com/apache/pulsar/assets/47099845/bd90e4a3-720e-466e-9dec-f94ede54d639)
   
![20240307152153](https://github.com/apache/pulsar/assets/47099845/90b6d52f-206d-430b-854c-0b78d80aeec0)
   
![20240307152337](https://github.com/apache/pulsar/assets/47099845/25165a46-76ec-4c61-a8c0-1b6a2497ddf9)
   
![20240307152419](https://github.com/apache/pulsar/assets/47099845/18c69f52-8505-43e7-9f25-c86e86c4d3a2)
   
   The following is my client-side producer code
   
![20240307152713](https://github.com/apache/pulsar/assets/47099845/77549ee8-8a51-4d20-91c1-24d0b6c95970)
   
   
   
   ### Anything else?
   
   How can I improve my code or Pulsar configuration to avoid the blocking 
scenario, or allow operations to continue on a topic after it gets blocked? 
Once a topic enters the "fenced" state, the client is unable to perform any 
further operations on that topic besides restarting the broker.
   
   Would the following information be helpful for my situation?
   https://github.com/apache/pulsar/issues/20526#issuecomment-1581958101
   
   ### Are you willing to submit a PR?
   
   - [ ] I'm willing to submit a PR!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



(pulsar) annotated tag v3.1.3-candidate-2 updated (dcaf508f8e3 -> f194dfde718)

2024-03-06 Thread rgao
This is an automated email from the ASF dual-hosted git repository.

rgao pushed a change to annotated tag v3.1.3-candidate-2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


*** WARNING: tag v3.1.3-candidate-2 was modified! ***

from dcaf508f8e3 (commit)
  to f194dfde718 (tag)
 tagging dcaf508f8e381107125e98722f4ddab76f9303ad (commit)
 replaces v3.1.3-candidate-1
  by rangao
  on Thu Mar 7 15:26:11 2024 +0800

- Log -
Release v3.1.3-candidate-2
-BEGIN PGP SIGNATURE-

iQJEBAABCgAuFiEEtWezB1n0EhojFQBl2aZsamxtvOoFAmXpbBMQHHJnYW9AYXBh
Y2hlLm9yZwAKCRDZpmxqbG286t01D/sE5FKw8Zew1sBUVh9TzlvDM3Iq0O7dEY4W
djQCnxkwNRKdbSiq3pLbFsmje46Wfwi04MSODao1hEL/13FIAWRv+/NwG0QoWui7
OcBoJl5+ShL7tYML3w8NywixuW4G+j9brJ/+tIQppY9y65Dzqq+regQFNa59o9bk
gCaHHIG1vlKEUwveSeSAITG7xGN4oX6QWIbAcocHUqtF2iHzTr5zvKV+Y6z8qpQK
hV5JHMJKFlypl7/uSDAcdunHIrJeOL6+unwcU/InEYORmmBlvl7/TvxjvC+vU+FQ
viHVxRVn1sKXpXTLJF4tCgApA83rZk9aMcgAqHlWaYFgUQo0ntWl7JUX52fRbx3o
gPp6A1xf6YrJHuvNOkRuzBDp1lQ6FOloSrgOOHVw0aUq/34yKDUYvnsBrUIfdqDv
TRJv0CVn1AHdGCJDH/npPX9HZ4A1NuLc4m/HHVV3LQw7V4MFniee3st9jvvWG4Iw
NtBSETkzS4JzxIzz/54ifF4MeE/roincX4lFIzPw6MtjBrJ/kPl4A2i10qD7B3Af
QwczNgR92Jcgykf2HjvqKFzkdLJpncCeqPo+3L0LqjMWtgI930OEyhQUh3MRVf/M
4Q0Kcfjc6T2GKns0WqOGrfgHt9O+dK0qznGXAHuh0zhIRoBjafHtUCIrqVIkXSHa
Rh5TeuKEHA==
=/t8G
-END PGP SIGNATURE-
---


No new revisions were added by this update.

Summary of changes:



Re: [D] When messages are consumed normally, backlogsize does not decrease, but increases. The backlog quota exceeded [pulsar]

2024-03-06 Thread via GitHub


GitHub user imitateinfant added a comment to the discussion: When messages are 
consumed normally, backlogsize does not decrease, but increases. The backlog 
quota exceeded

Yes, both my client and broker are using version 2.5.0. I will try to upgrade 
to the recommended version and see if that works. Thank you.

GitHub link: 
https://github.com/apache/pulsar/discussions/22214#discussioncomment-8703093


This is an automatically sent email for commits@pulsar.apache.org.
To unsubscribe, please send an email to: commits-unsubscr...@pulsar.apache.org



Re: [I] Pulsar Authentication should support rotation of Validation keys e.g. Public keys used in JWT validation [pulsar]

2024-03-06 Thread via GitHub


nodece commented on issue #8152:
URL: https://github.com/apache/pulsar/issues/8152#issuecomment-1982591291

   @damienburke Please see #22215. After it is merged, you can continue to 
improve this feature by adding key rotation.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [feat][authentication] Add JWKS support for AuthenticationProviderToken [pulsar]

2024-03-06 Thread via GitHub


nodece commented on PR #18336:
URL: https://github.com/apache/pulsar/pull/18336#issuecomment-1982584410

   Use https://github.com/apache/pulsar/pull/22215 instead of this PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [improve][broker] PIP-226: Add JWKS support for AuthenticationProviderToken [pulsar]

2024-03-06 Thread via GitHub


nodece commented on PR #22215:
URL: https://github.com/apache/pulsar/pull/22215#issuecomment-1982581824

   Old PR: https://github.com/apache/pulsar/pull/18336


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [improve][broker] PIP-226: Add JWKS support for AuthenticationProviderToken [pulsar]

2024-03-06 Thread via GitHub


nodece opened a new pull request, #22215:
URL: https://github.com/apache/pulsar/pull/22215

   PIP: #18798 
   
   ### Motivation
   
   This PIP was agreed upon a long time ago, but because the OIDC provider 
supports this feature, it was turned off.
   
   Community users need this feature, but do not want to use the OIDC provider, 
please see #8152
   
   This PIP doesn't support rotating public keys: we only load the public key 
once at startup. We can improve this later with a cache.
   
   ### Modifications
   
   - Add tokenKeySet config to provide the JWKS support
   
   ### Verifying this change
   
   - Added RSA test
   - Added EC test
   
   ### Documentation
   
   
   
   - [ ] `doc` 
   - [x] `doc-required` 
   - [ ] `doc-not-needed` 
   - [ ] `doc-complete` 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [improve][client] add externalListenerThreadName parameter [pulsar]

2024-03-06 Thread via GitHub


lhotari commented on PR #22174:
URL: https://github.com/apache/pulsar/pull/22174#issuecomment-1982554307

   Since the problem for you was logging configuration, isn't there a way to 
limit the length of the thread name in the logging configuration?
   In Log4J, this is possible with something like `%.10t` to truncate the 
thread name to 10 characters. Please check [the docs for `%` in Log4J pattern 
layouts](https://logging.apache.org/log4j/2.x/manual/layouts.html#PatternPercentLiteral).
   With Log4J, you could also replace a string to shorten it. Check ["replace" 
in the patterns part of the 
doc](https://logging.apache.org/log4j/2.x/manual/layouts.html#patterns). 
   For example, `%replace{%t}{pulsar-external-listener-}{pel-}` would replace 
`pulsar-external-listener-` with `pel-`.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [improve][client] add externalListenerThreadName parameter [pulsar]

2024-03-06 Thread via GitHub


BewareMyPower commented on PR #22174:
URL: https://github.com/apache/pulsar/pull/22174#issuecomment-1982550657

   > The only thought is that I have is that if I could customize the external 
listener thread name prefix, why couldn't I customize the names of the other 
thread pools? I'd expect to have consistency across the API.
   
   +1 to me. If this new method is merged, should we add some other methods 
like `setXxxThreadName` as well? Would that be "over-configured"? Note: Pulsar 
already has too many configurations that might never be used.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [I] Aim to rename prefix 'pulsar-external-listener' in threads java client [pulsar]

2024-03-06 Thread via GitHub


lhotari commented on issue #22170:
URL: https://github.com/apache/pulsar/issues/22170#issuecomment-1982537874

   Since the problem for you was logging configuration, isn't there a way to 
limit the length of the thread name in the logging configuration?
   In Log4J, this is possible with something like `%.10t` to truncate the 
thread name to 10 characters. Please check [the docs for `%` in Log4J pattern 
layouts](https://logging.apache.org/log4j/2.x/manual/layouts.html#PatternPercentLiteral).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [I] [Bug] Producer experienced TimeoutError with direct memory increased [pulsar]

2024-03-06 Thread via GitHub


YanshuoH commented on issue #22212:
URL: https://github.com/apache/pulsar/issues/22212#issuecomment-1982522234

   @Technoboy- I'm sorry, as explained, there is no dump for now.
   
   I've planned to automatically do thread dump when abnormal behavior occurs, 
and I fully understand the difficulty of solving (identifying) this issue 
without the thread dump.
   
   I've posted this to see if anyone has encountered the same problem and 
could share some insight.
   
   > Good guess, I do have same doubts.
   > 
   > As you can see in the image of metadata store executor queue, it is quite 
high.
   > 
   > Unfortunately, it occurs on the production and the incident is quite 
impacting, I have to do a quick reboot without having time to do the dump.
   > 
   > Yet I will look into the code and maybe related metrics to see the 
relevance.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [D] Error from server (BadRequest): container "pulsar-broker" in pod "pulsar-broker-0" is waiting to start: PodInitializing [pulsar]

2024-03-06 Thread via GitHub


GitHub user lhotari added a comment to the discussion: Error from server 
(BadRequest): container "pulsar-broker" in pod "pulsar-broker-0" is waiting to 
start: PodInitializing

There was a reply on Pulsar Slack to check the logs of the init jobs that 
initialize the cluster.
It's not very useful to do any analysis without the logs.

It's useful to check Kubernetes events and logs

Tips related to following k8s events in the console:
```
# events for ALL namespaces (fine for a local minikube or Docker Desktop k8s)
kubectl get events --sort-by=.lastTimestamp -A
# tailing all events
kubectl get events --sort-by=.lastTimestamp -A --watch
```

Getting logs, `pulsar-broker-0` in `pulsar` namespace as example:
```
kubectl -n pulsar logs pulsar-broker-0
# getting previous logs, for the previously crashed pod
kubectl -n pulsar logs pulsar-broker-0 -p
```

In this case, I'd check the init job logs (append `-n pulsar` to the command if 
you use a specific namespace; check the jobs with `kubectl get jobs` to see if 
they have different names).
```
kubectl logs job/pulsar-pulsar-init
kubectl logs job/pulsar-pulsar-init -p
kubectl logs job/pulsar-bookie-init
kubectl logs job/pulsar-bookie-init -p
```

You could use a k8s UI such as `k9s` on command line or 
[OpenLens](https://github.com/MuhammedKalkan/OpenLens)/[Lens](https://k8slens.dev/)
 for the desktop. That makes things a lot easier to begin with.

GitHub link: 
https://github.com/apache/pulsar/discussions/17802#discussioncomment-8702647


This is an automatically sent email for commits@pulsar.apache.org.
To unsubscribe, please send an email to: commits-unsubscr...@pulsar.apache.org



Re: [PR] The sample configuration of io-debezium has an error of parameter. [pulsar-site]

2024-03-06 Thread via GitHub


zhaoyajun2009 commented on code in PR #825:
URL: https://github.com/apache/pulsar-site/pull/825#discussion_r1515583152


##
docs/io-cdc-debezium.md:
##
@@ -80,7 +80,7 @@ You can use one of the following methods to create a 
configuration file.
   tenant: "public"
   namespace: "default"
   name: "debezium-mysql-source"
-  inputs: [ "debezium-mysql-topic" ]
+  topicName: "debezium-mysql-topic"

Review Comment:
   If `topicName` is configured, the topic is created as follows, so this 
setting works correctly.
   
`persistent://public/default/debezium-mysql-topic33-partition-0`
   The logs are printed as follows:
   
![微信图片_20240307135342](https://github.com/apache/pulsar-site/assets/92070379/d784b675-6ccd-470c-bb10-cf04b6e1966d)
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] The sample configuration of io-debezium has an error of parameter. [pulsar-site]

2024-03-06 Thread via GitHub


zhaoyajun2009 commented on PR #825:
URL: https://github.com/apache/pulsar-site/pull/825#issuecomment-1982475252

   > I suppose the PR author is right here: 
https://github.com/apache/pulsar/blob/e2f94dc98dbecb4dc401ba837c54f497ca9d896f/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/io/SourceConfig.java#L43
   > 
   > Strangely, this mistake went unnoticed for a long time. It would be good 
if someone else took a look.
   
   
   
   > I suppose the PR author is right here: 
https://github.com/apache/pulsar/blob/e2f94dc98dbecb4dc401ba837c54f497ca9d896f/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/io/SourceConfig.java#L43
   > 
   > Strangely, this mistake went unnoticed for a long time. It would be good 
if someone else took a look.
   
   This is because the "inputs" parameter belongs to SinkConfig; the mistake 
was caused by copying.  
   
https://github.com/apache/pulsar/blob/e2f94dc98dbecb4dc401ba837c54f497ca9d896f/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/io/SinkConfig.java#L49C39-L50C1


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] The sample configuration of io-debezium has an error of parameter. [pulsar-site]

2024-03-06 Thread via GitHub


zhaoyajun2009 commented on code in PR #825:
URL: https://github.com/apache/pulsar-site/pull/825#discussion_r1515583152


##
docs/io-cdc-debezium.md:
##
@@ -80,7 +80,7 @@ You can use one of the following methods to create a 
configuration file.
   tenant: "public"
   namespace: "default"
   name: "debezium-mysql-source"
-  inputs: [ "debezium-mysql-topic" ]
+  topicName: "debezium-mysql-topic"

Review Comment:
   If this topicName is configured, it creates the topic as follows; this 
behavior is correct.
   
`persistent://public/default/debezium-mysql-topic33-partition-0`
   The logs are printed as follows:
   
![微信图片_20240307135342](https://github.com/apache/pulsar-site/assets/92070379/d784b675-6ccd-470c-bb10-cf04b6e1966d)
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] The sample configuration of io-debezium has an error of parameter. [pulsar-site]

2024-03-06 Thread via GitHub


zhaoyajun2009 commented on code in PR #825:
URL: https://github.com/apache/pulsar-site/pull/825#discussion_r1515539866


##
docs/io-cdc-debezium.md:
##
@@ -80,7 +80,7 @@ You can use one of the following methods to create a 
configuration file.
   tenant: "public"
   namespace: "default"
   name: "debezium-mysql-source"
-  inputs: [ "debezium-mysql-topic" ]
+  topicName: "debezium-mysql-topic"

Review Comment:
   If this line is removed, the test process is unable to function 
properly: the binlog is not written to the Pulsar topic.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] The sample configuration of io-debezium has an error of parameter. [pulsar-site]

2024-03-06 Thread via GitHub


zhaoyajun2009 commented on code in PR #825:
URL: https://github.com/apache/pulsar-site/pull/825#discussion_r1515583152


##
docs/io-cdc-debezium.md:
##
@@ -80,7 +80,7 @@ You can use one of the following methods to create a 
configuration file.
   tenant: "public"
   namespace: "default"
   name: "debezium-mysql-source"
-  inputs: [ "debezium-mysql-topic" ]
+  topicName: "debezium-mysql-topic"

Review Comment:
   If this topicName is configured, it creates the topic as follows:
   
`persistent://public/default/debezium-mysql-topic33-partition-0`
   The logs are printed as follows:
   
![微信图片_20240307135342](https://github.com/apache/pulsar-site/assets/92070379/d784b675-6ccd-470c-bb10-cf04b6e1966d)
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [D] When messages are consumed normally, backlogsize does not decrease, but increases. The backlog quota exceeded [pulsar]

2024-03-06 Thread via GitHub


GitHub user lhotari added a comment to the discussion: When messages are 
consumed normally, backlogsize does not decrease, but increases. The backlog 
quota exceeded

You seem to be using Pulsar 2.5.0 and 2.6.1 clients. A lot of bugs have been 
fixed since then. I'd recommend upgrading to Pulsar 3.0.2 client which is the 
LTS version client. 
How about your broker? Which version is it? I'd also recommend 3.0.3 for that.
Support has ended for all other than Pulsar 3.0.x and 3.2.x:
https://pulsar.apache.org/contribute/release-policy/#supported-versions


GitHub link: 
https://github.com/apache/pulsar/discussions/22214#discussioncomment-8702572


This is an automatically sent email for commits@pulsar.apache.org.
To unsubscribe, please send an email to: commits-unsubscr...@pulsar.apache.org



Re: [PR] The sample configuration of io-debezium has an error of parameter. [pulsar-site]

2024-03-06 Thread via GitHub


zhaoyajun2009 commented on code in PR #825:
URL: https://github.com/apache/pulsar-site/pull/825#discussion_r1515539866


##
docs/io-cdc-debezium.md:
##
@@ -80,7 +80,7 @@ You can use one of the following methods to create a 
configuration file.
   tenant: "public"
   namespace: "default"
   name: "debezium-mysql-source"
-  inputs: [ "debezium-mysql-topic" ]
+  topicName: "debezium-mysql-topic"

Review Comment:
   If this line is removed, the test process is unable to function properly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [fix][broker] Introduce the last sent position to fix message ordering issues in Key_Shared (PIP-282) [pulsar]

2024-03-06 Thread via GitHub


poorbarcode commented on PR #21953:
URL: https://github.com/apache/pulsar/pull/21953#issuecomment-1982303425

   > Hi @poorbarcode.
   > Thank you for your review. What is the current status?
   
   Sorry, I was busy with other things before; I will finish this review soon.
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [fix][broker] Introduce the last sent position to fix message ordering issues in Key_Shared (PIP-282) [pulsar]

2024-03-06 Thread via GitHub


equanz commented on PR #21953:
URL: https://github.com/apache/pulsar/pull/21953#issuecomment-1982290281

   Hi @poorbarcode.
   Thank you for your review. What is the current status?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



(pulsar) annotated tag v2.10.6-candidate-2 updated (cb65e17ea8a -> b25cb0686a4)

2024-03-06 Thread xiangying
This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a change to annotated tag v2.10.6-candidate-2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


*** WARNING: tag v2.10.6-candidate-2 was modified! ***

from cb65e17ea8a (commit)
  to b25cb0686a4 (tag)
 tagging cb65e17ea8a789d2b689b7b5af8500215ebfb08b (commit)
 replaces v2.10.6-candidate-1
  by Xiangying Meng
  on Thu Mar 7 11:32:28 2024 +0800

- Log -
Release v2.10.6-candidate-2
-BEGIN PGP SIGNATURE-

iQJJBAABCgAzFiEE+QOgdtmpMzLBpphuPcIiPC7/iBMFAmXpNUwVHHhpYW5neWlu
Z0BhcGFjaGUub3JnAAoJED3CIjwu/4gTMxoP/1XxKIIRBTIw58ntnnp91z4yIwne
5SenUXd0JLPS4CgOOsVTDqYStmQFB3CRDIdFKpOpv+CjL1DB3giP2seWsJ8gvRxZ
fB5TjPbdxecaKj9kY9mCc2SPME5zZfLigpZlxYM387wVHM0koGrqKcvLSFWiFRZt
+O3tGdNV1oPzKcC0HwyYGY4jipqlO+Or6eg5FDbHEOm6eCkYIOhATi/RC+uaFm4i
6CBFEj5ppGIPcneOGGdG+D2N/eOINNOs9/G6qCQuKHqTklFZDPE32+MwBGaDlfV6
MhiQOwcZiWl3nybyAUwS+3/DHSUZdQCuQ49yjTM+kr98UihD3lstWB/HFqWtbJCt
HmmQ5GH1ADSGkdPIne46Lx2jEs8hd9u8n2YEhQPFJ6pcwuWY50+5jFCYlkSJ1Q7l
h5nwjQDTRqGIOx8wg8H4wGLG4xPJFOZD/ljoscZt9KE7TMWdfAzr1aAN9y41hVxK
jrglofKyFhHHJY1mbu722R+kmkjNFaA+B7TTD5shS+k8epAcl9VKulLWoZNrIcw8
Usm9bNg4lNccCrv5b96rAgLYeyrNTSfe6BabWrs5hfaVvMESTAIBTFPkuu99k/cm
kdHyLHM4wwCpVB6Fmr6pgFs9v+Fz1zvPdl1kpezkPVbBaplC1MxPd6Tm37CKqLMm
quB9D9mGCSg3qNNE
=xk04
-END PGP SIGNATURE-
---


No new revisions were added by this update.

Summary of changes:



(pulsar) branch master updated (e84516f4e7a -> 95a53f3a033)

2024-03-06 Thread penghui
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


from e84516f4e7a [fix][client] GenericProtobufNativeSchema not implement 
getNativeSchema method. (#22204)
 add 95a53f3a033 [fix][client] fix Reader.hasMessageAvailable might return 
true after seeking to latest (#22201)

No new revisions were added by this update.

Summary of changes:
 .../org/apache/pulsar/client/impl/ReaderTest.java  |  27 ++
 .../apache/pulsar/client/impl/ConsumerImpl.java| 104 ++---
 .../pulsar/client/impl/ConsumerImplTest.java   |   2 +-
 3 files changed, 96 insertions(+), 37 deletions(-)



Re: [PR] [fix][client] fix Reader.hasMessageAvailable might return true after seeking to latest [pulsar]

2024-03-06 Thread via GitHub


codelipenghui merged PR #22201:
URL: https://github.com/apache/pulsar/pull/22201


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



svn commit: r67766 [1/2] - in /dev/pulsar/pulsar-3.2.1-candidate-2: ./ connectors/

2024-03-06 Thread technoboy
Author: technoboy
Date: Thu Mar  7 03:23:59 2024
New Revision: 67766

Log:
Staging artifacts and signature for Pulsar release 3.2.1

Added:
dev/pulsar/pulsar-3.2.1-candidate-2/
dev/pulsar/pulsar-3.2.1-candidate-2/apache-pulsar-3.2.1-bin.tar.gz   (with 
props)
dev/pulsar/pulsar-3.2.1-candidate-2/apache-pulsar-3.2.1-bin.tar.gz.asc
dev/pulsar/pulsar-3.2.1-candidate-2/apache-pulsar-3.2.1-bin.tar.gz.sha512
dev/pulsar/pulsar-3.2.1-candidate-2/apache-pulsar-3.2.1-src.tar.gz   (with 
props)
dev/pulsar/pulsar-3.2.1-candidate-2/apache-pulsar-3.2.1-src.tar.gz.asc
dev/pulsar/pulsar-3.2.1-candidate-2/apache-pulsar-3.2.1-src.tar.gz.sha512

dev/pulsar/pulsar-3.2.1-candidate-2/apache-pulsar-offloaders-3.2.1-bin.tar.gz   
(with props)

dev/pulsar/pulsar-3.2.1-candidate-2/apache-pulsar-offloaders-3.2.1-bin.tar.gz.asc

dev/pulsar/pulsar-3.2.1-candidate-2/apache-pulsar-offloaders-3.2.1-bin.tar.gz.sha512
dev/pulsar/pulsar-3.2.1-candidate-2/apache-pulsar-shell-3.2.1-bin.tar.gz   
(with props)
dev/pulsar/pulsar-3.2.1-candidate-2/apache-pulsar-shell-3.2.1-bin.tar.gz.asc

dev/pulsar/pulsar-3.2.1-candidate-2/apache-pulsar-shell-3.2.1-bin.tar.gz.sha512
dev/pulsar/pulsar-3.2.1-candidate-2/apache-pulsar-shell-3.2.1-bin.zip   
(with props)
dev/pulsar/pulsar-3.2.1-candidate-2/apache-pulsar-shell-3.2.1-bin.zip.asc
dev/pulsar/pulsar-3.2.1-candidate-2/apache-pulsar-shell-3.2.1-bin.zip.sha512
dev/pulsar/pulsar-3.2.1-candidate-2/connectors/
dev/pulsar/pulsar-3.2.1-candidate-2/connectors/LICENSE
dev/pulsar/pulsar-3.2.1-candidate-2/connectors/README

dev/pulsar/pulsar-3.2.1-candidate-2/connectors/pulsar-io-aerospike-3.2.1.nar   
(with props)

dev/pulsar/pulsar-3.2.1-candidate-2/connectors/pulsar-io-aerospike-3.2.1.nar.asc

dev/pulsar/pulsar-3.2.1-candidate-2/connectors/pulsar-io-aerospike-3.2.1.nar.sha512
dev/pulsar/pulsar-3.2.1-candidate-2/connectors/pulsar-io-alluxio-3.2.1.nar  
 (with props)

dev/pulsar/pulsar-3.2.1-candidate-2/connectors/pulsar-io-alluxio-3.2.1.nar.asc

dev/pulsar/pulsar-3.2.1-candidate-2/connectors/pulsar-io-alluxio-3.2.1.nar.sha512

dev/pulsar/pulsar-3.2.1-candidate-2/connectors/pulsar-io-batch-data-generator-3.2.1.nar
   (with props)

dev/pulsar/pulsar-3.2.1-candidate-2/connectors/pulsar-io-batch-data-generator-3.2.1.nar.asc

dev/pulsar/pulsar-3.2.1-candidate-2/connectors/pulsar-io-batch-data-generator-3.2.1.nar.sha512
dev/pulsar/pulsar-3.2.1-candidate-2/connectors/pulsar-io-canal-3.2.1.nar   
(with props)
dev/pulsar/pulsar-3.2.1-candidate-2/connectors/pulsar-io-canal-3.2.1.nar.asc

dev/pulsar/pulsar-3.2.1-candidate-2/connectors/pulsar-io-canal-3.2.1.nar.sha512

dev/pulsar/pulsar-3.2.1-candidate-2/connectors/pulsar-io-cassandra-3.2.1.nar   
(with props)

dev/pulsar/pulsar-3.2.1-candidate-2/connectors/pulsar-io-cassandra-3.2.1.nar.asc

dev/pulsar/pulsar-3.2.1-candidate-2/connectors/pulsar-io-cassandra-3.2.1.nar.sha512

dev/pulsar/pulsar-3.2.1-candidate-2/connectors/pulsar-io-data-generator-3.2.1.nar
   (with props)

dev/pulsar/pulsar-3.2.1-candidate-2/connectors/pulsar-io-data-generator-3.2.1.nar.asc

dev/pulsar/pulsar-3.2.1-candidate-2/connectors/pulsar-io-data-generator-3.2.1.nar.sha512

dev/pulsar/pulsar-3.2.1-candidate-2/connectors/pulsar-io-debezium-mongodb-3.2.1.nar
   (with props)

dev/pulsar/pulsar-3.2.1-candidate-2/connectors/pulsar-io-debezium-mongodb-3.2.1.nar.asc

dev/pulsar/pulsar-3.2.1-candidate-2/connectors/pulsar-io-debezium-mongodb-3.2.1.nar.sha512

dev/pulsar/pulsar-3.2.1-candidate-2/connectors/pulsar-io-debezium-mssql-3.2.1.nar
   (with props)

dev/pulsar/pulsar-3.2.1-candidate-2/connectors/pulsar-io-debezium-mssql-3.2.1.nar.asc

dev/pulsar/pulsar-3.2.1-candidate-2/connectors/pulsar-io-debezium-mssql-3.2.1.nar.sha512

dev/pulsar/pulsar-3.2.1-candidate-2/connectors/pulsar-io-debezium-mysql-3.2.1.nar
   (with props)

dev/pulsar/pulsar-3.2.1-candidate-2/connectors/pulsar-io-debezium-mysql-3.2.1.nar.asc

dev/pulsar/pulsar-3.2.1-candidate-2/connectors/pulsar-io-debezium-mysql-3.2.1.nar.sha512

dev/pulsar/pulsar-3.2.1-candidate-2/connectors/pulsar-io-debezium-oracle-3.2.1.nar
   (with props)

dev/pulsar/pulsar-3.2.1-candidate-2/connectors/pulsar-io-debezium-oracle-3.2.1.nar.asc

dev/pulsar/pulsar-3.2.1-candidate-2/connectors/pulsar-io-debezium-oracle-3.2.1.nar.sha512

dev/pulsar/pulsar-3.2.1-candidate-2/connectors/pulsar-io-debezium-postgres-3.2.1.nar
   (with props)

dev/pulsar/pulsar-3.2.1-candidate-2/connectors/pulsar-io-debezium-postgres-3.2.1.nar.asc

dev/pulsar/pulsar-3.2.1-candidate-2/connectors/pulsar-io-debezium-postgres-3.2.1.nar.sha512
dev/pulsar/pulsar-3.2.1-candidate-2/connectors/pulsar-io-dynamodb-3.2.1.nar 
  (with props)

dev/pulsar/pulsar-3.2.1-candidate-2/connectors/pulsar-io-dynamodb-3.2.1.nar.asc


svn commit: r67766 [2/2] - in /dev/pulsar/pulsar-3.2.1-candidate-2: ./ connectors/

2024-03-06 Thread technoboy
Added: 
dev/pulsar/pulsar-3.2.1-candidate-2/connectors/pulsar-io-jdbc-sqlite-3.2.1.nar
==
Binary file - no diff available.

Propchange: 
dev/pulsar/pulsar-3.2.1-candidate-2/connectors/pulsar-io-jdbc-sqlite-3.2.1.nar
--
svn:mime-type = application/octet-stream

Added: 
dev/pulsar/pulsar-3.2.1-candidate-2/connectors/pulsar-io-jdbc-sqlite-3.2.1.nar.asc
==
--- 
dev/pulsar/pulsar-3.2.1-candidate-2/connectors/pulsar-io-jdbc-sqlite-3.2.1.nar.asc
 (added)
+++ 
dev/pulsar/pulsar-3.2.1-candidate-2/connectors/pulsar-io-jdbc-sqlite-3.2.1.nar.asc
 Thu Mar  7 03:23:59 2024
@@ -0,0 +1,16 @@
+-BEGIN PGP SIGNATURE-
+
+iQIzBAABCAAdFiEEStnMRiSmOggeXFwiZb2iVNVk0t8FAmXpLOgACgkQZb2iVNVk
+0t8o9w/+Ijp2h2qqNH7T2jbGrWkjbowKgxcqxURrRLeL1n1ZyiUoP8djHJAQHGZ/
+UAYU20PzEX1SqcOicdh/Gp5M9lx11KfncPKPXW1co97TWyVpO24mjIF8ejX9TmGx
+cWIhRUhqbHkmQJ76CXIORGHStZFrwEvEXK4WfiuoWi1qzEjjX96j/8CozImyobB5
+cBpS7t/yeq50IWO2Dz9kWpjvTtJTjo8nNlzBYJX4jqmgtjEMldCaJFIGhEG89k8i
+ukCqePqUSVi+nf6dbGUc8MMbKgSFdHdTjpExzVjJavH6MPDHBphkejpH2dvHnuaT
+WGHTVt8Fan4mpGIV4WPCjU4K/N1bFU83PyGF8hXRK9+dJzlXldiE0XKIZgu0ZV8c
+s6mv6Hb6VhVZ3WevSHtqoz3wLGo27NIVxQHcNId8khjJu/jfYoOaPo2RBXOgiP2H
+vH7tJEpiOxSJ0N22uggRsVCT2y+rFAzAG0oiWtKqaKzUq9LbBfeu2ZdnC4lvQ3cl
+j2beaRgFDAiZCs4KLhi/KTrVNJ8L+jZrL3NXm9C8xBhiLILhMeKwmn7I1BEno7di
+bdnm+kjBSXECtrLGUlSesP4w0Lae7s92ydaTgzl5MgO9oZehOXQT/hWXlBZrUjCX
+sieALfTsABq3fVy5g0fQkYKqbYHvBAs2mj+kQ3u+XJN236OoSQo=
+=78EB
+-END PGP SIGNATURE-

Added: 
dev/pulsar/pulsar-3.2.1-candidate-2/connectors/pulsar-io-jdbc-sqlite-3.2.1.nar.sha512
==
--- 
dev/pulsar/pulsar-3.2.1-candidate-2/connectors/pulsar-io-jdbc-sqlite-3.2.1.nar.sha512
 (added)
+++ 
dev/pulsar/pulsar-3.2.1-candidate-2/connectors/pulsar-io-jdbc-sqlite-3.2.1.nar.sha512
 Thu Mar  7 03:23:59 2024
@@ -0,0 +1 @@
+ebb80b8c492e8c81ec57bcb75aeb16539832a57aa5f9520d70b7f8b28d9c668596d955d638e620306bbf523a3dd1430ab2cca8f9a086208d9f4ddcf95193c2c1
  ./connectors/pulsar-io-jdbc-sqlite-3.2.1.nar

Added: dev/pulsar/pulsar-3.2.1-candidate-2/connectors/pulsar-io-kafka-3.2.1.nar
==
Binary file - no diff available.

Propchange: 
dev/pulsar/pulsar-3.2.1-candidate-2/connectors/pulsar-io-kafka-3.2.1.nar
--
svn:mime-type = application/octet-stream

Added: 
dev/pulsar/pulsar-3.2.1-candidate-2/connectors/pulsar-io-kafka-3.2.1.nar.asc
==
--- 
dev/pulsar/pulsar-3.2.1-candidate-2/connectors/pulsar-io-kafka-3.2.1.nar.asc 
(added)
+++ 
dev/pulsar/pulsar-3.2.1-candidate-2/connectors/pulsar-io-kafka-3.2.1.nar.asc 
Thu Mar  7 03:23:59 2024
@@ -0,0 +1,16 @@
+-BEGIN PGP SIGNATURE-
+
+iQIzBAABCAAdFiEEStnMRiSmOggeXFwiZb2iVNVk0t8FAmXpLOIACgkQZb2iVNVk
+0t8q9RAAkmXTfzAZl1a8vuO3mGvwdUcjq++oKI7rrc+DKREqGllmzSfSrO+IMbJB
+BjAL55QYb4mMGMAwptyhq3fkHl3f6T4iXJ/BmbmByxfrcoUe2NFVQakCQmTqJdtC
+oXBhaFjrCYrYATL0WM2TAZF4NDWtET8ZRM9zO92S0JYi2/hnWS4C6EWWDAV/Qo+a
+yLAMVlPTxUjDTanbz5uMnaNtg81GFPiMWMFSlGQEqvRKvJGN5fue84CcGj3z4H14
+VkVnVAfOy1yLyuzoFtQGIz72dJhkv9QutBh+tGwR+VM9NGbffxv2mqE0FR4H5qRq
+LJDALcXOupfDmTaomRqePmr6PM46+maDDNDwR5mAgsPtsKRrJ1CU5HPQhlAXyOpp
+q3uRvVjQXpXR+zLoJ1/Z5jhSstJ7Tr3CQtm3TZ6yGImYKrUrAO/qKQe+bO0Rf2A7
+bpvwOfVq+mK9hzgvMtLYz+q0KKz0oLpSaBBlLZTuOVK44TJRMgqU7eXVchD49Few
+cl6ygrlbnnugp7YFDeEMGI1UeluaVrbS3XZK1kevyurJzB6zaTXYTj3duKYXPC6z
+m8YNMJQoi0qX2BiySKMQf4HGZ1LdCNsgFXkJVKC6k2/H4nEi7P8OkP2V7+Q4FJMH
+ehrfmjVZQlwwx4N9U1nbDNR63AQ7zKac5rGw+ibeiSas5RHrCMc=
+=0FNs
+-END PGP SIGNATURE-

Added: 
dev/pulsar/pulsar-3.2.1-candidate-2/connectors/pulsar-io-kafka-3.2.1.nar.sha512
==
--- 
dev/pulsar/pulsar-3.2.1-candidate-2/connectors/pulsar-io-kafka-3.2.1.nar.sha512 
(added)
+++ 
dev/pulsar/pulsar-3.2.1-candidate-2/connectors/pulsar-io-kafka-3.2.1.nar.sha512 
Thu Mar  7 03:23:59 2024
@@ -0,0 +1 @@
+54895437b5845cdf37b91a723fa28e03bc61dcd19e033f3d275232e9527bf5028da50338e84c372e5455bc323031a3292ce9423f20c32e94f9840ed731010728
  ./connectors/pulsar-io-kafka-3.2.1.nar

Added: 
dev/pulsar/pulsar-3.2.1-candidate-2/connectors/pulsar-io-kafka-connect-adaptor-3.2.1.nar
==
Binary file - no diff available.

Propchange: 
dev/pulsar/pulsar-3.2.1-candidate-2/connectors/pulsar-io-kafka-connect-adaptor-3.2.1.nar
--
svn:mime-type = application/octet-stream

Added: 
dev/pulsar/pulsar-3.2.1-candidate-2/connectors/pulsar-io-kafka-connect-adaptor-3.2.1.nar.asc

Re: [I] [Bug] Producer experienced TimeoutError with direct memory increased [pulsar]

2024-03-06 Thread via GitHub


Technoboy- commented on issue #22212:
URL: https://github.com/apache/pulsar/issues/22212#issuecomment-1982258096

   Could you share the dump?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



(pulsar) 04/04: [improve][broker] Consistently add fine-grain authorization to REST API (#22202)

2024-03-06 Thread penghui
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit cb65e17ea8a789d2b689b7b5af8500215ebfb08b
Author: Qiang Zhao 
AuthorDate: Wed Mar 6 22:00:45 2024 +0800

[improve][broker] Consistently add fine-grain authorization to REST API 
(#22202)

(cherry picked from commit 68c10925df43769eee7265b4af0ac8ee4913e715)
---
 .../pulsar/broker/admin/v2/PersistentTopics.java   | 187 +--
 .../broker/admin/TopicPoliciesAuthZTest.java   | 206 -
 2 files changed, 331 insertions(+), 62 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index e2cfc86e8fd..4dd4c1310cc 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -68,6 +68,7 @@ import org.apache.pulsar.common.policies.data.PublishRate;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
 import org.apache.pulsar.common.policies.data.SubscribeRate;
+import org.apache.pulsar.common.policies.data.TopicOperation;
 import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.common.policies.data.impl.BacklogQuotaImpl;
@@ -347,7 +348,8 @@ public class PersistentTopics extends PersistentTopicsBase {
 @ApiParam(value = "Whether leader broker redirected this call to 
this broker. For internal use.")
 @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
 validateTopicName(tenant, namespace, encodedTopic);
-preValidation(authoritative)
+validateTopicPolicyOperationAsync(topicName, PolicyName.OFFLOAD, 
PolicyOperation.READ)
+.thenCompose(__ -> preValidation(authoritative))
 .thenCompose(__ -> internalGetOffloadPolicies(applied, isGlobal))
 .thenApply(asyncResponse::resume)
 .exceptionally(ex -> {
@@ -370,7 +372,9 @@ public class PersistentTopics extends PersistentTopicsBase {
 @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
 @ApiParam(value = "Offload policies for the specified topic") 
OffloadPoliciesImpl offloadPolicies) {
 validateTopicName(tenant, namespace, encodedTopic);
-preValidation(authoritative)
+validateTopicPolicyOperation(topicName, PolicyName.OFFLOAD, 
PolicyOperation.WRITE);
+validateTopicPolicyOperationAsync(topicName, PolicyName.OFFLOAD, 
PolicyOperation.WRITE)
+.thenCompose(__ -> preValidation(authoritative))
 .thenCompose(__ -> internalSetOffloadPolicies(offloadPolicies, 
isGlobal))
 .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
 .exceptionally(ex -> {
@@ -392,7 +396,8 @@ public class PersistentTopics extends PersistentTopicsBase {
 @ApiParam(value = "Whether leader broker redirected this call to 
this broker. For internal use.")
 @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
 validateTopicName(tenant, namespace, encodedTopic);
-preValidation(authoritative)
+validateTopicPolicyOperationAsync(topicName, PolicyName.OFFLOAD, 
PolicyOperation.WRITE)
+.thenCompose(__ -> preValidation(authoritative))
 .thenCompose(__ -> internalSetOffloadPolicies(null, isGlobal))
 .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
 .exceptionally(ex -> {
@@ -416,7 +421,8 @@ public class PersistentTopics extends PersistentTopicsBase {
 @ApiParam(value = "Whether leader broker redirected this call to 
this broker. For internal use.")
 @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
 validateTopicName(tenant, namespace, encodedTopic);
-preValidation(authoritative)
+validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_UNACKED, 
PolicyOperation.READ)
+.thenCompose(__ -> preValidation(authoritative))
 .thenCompose(__ -> 
internalGetMaxUnackedMessagesOnConsumer(applied, isGlobal))
 .thenApply(asyncResponse::resume).exceptionally(ex -> {
 handleTopicPolicyException("getMaxUnackedMessagesOnConsumer", 
ex, asyncResponse);
@@ -440,7 +446,8 @@ public class PersistentTopics extends PersistentTopicsBase {
 @ApiParam(value = "Max unacked messages on consumer policies for 
the specified topic")
 Integer maxUnackedNum) {
 validateTopicName(tenant, namespace, encodedTopic);
-

(pulsar) 03/04: [improve] [broker] Do not print an Error log when responding to `HTTP-404` when calling `Admin API` and the topic does not exist. (#21995)

2024-03-06 Thread penghui
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit a080aebe015f18a1cbf94be50a57105bd096cade
Author: fengyubiao 
AuthorDate: Sun Feb 18 15:46:52 2024 +0800

[improve] [broker] Do not print an Error log when responding to `HTTP-404` 
when calling `Admin API` and the topic does not exist. (#21995)

(cherry picked from commit 48b4481969cb6028186a7a84b8be8af90770674b)
---
 .../apache/pulsar/broker/admin/AdminResource.java  | 11 +++
 .../broker/admin/impl/PersistentTopicsBase.java| 87 +++---
 .../broker/admin/v2/NonPersistentTopics.java   |  4 +-
 .../pulsar/broker/admin/v2/PersistentTopics.java   | 14 ++--
 .../pulsar/broker/admin/v3/Transactions.java   | 16 
 5 files changed, 80 insertions(+), 52 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 30526a0787d..f64e1d94507 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -798,6 +798,17 @@ public abstract class AdminResource extends 
PulsarWebResource {
 == Status.TEMPORARY_REDIRECT.getStatusCode();
 }
 
+protected static boolean isNotFoundException(Throwable ex) {
+Throwable realCause = FutureUtil.unwrapCompletionException(ex);
+return realCause instanceof WebApplicationException
+&& ((WebApplicationException) 
realCause).getResponse().getStatus()
+== Status.NOT_FOUND.getStatusCode();
+}
+
+protected static boolean isNot307And404Exception(Throwable ex) {
+return !isRedirectException(ex) && !isNotFoundException(ex);
+}
+
 protected static String getTopicNotFoundErrorMessage(String topic) {
 return String.format("Topic %s not found", topic);
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index ed3896b1456..82711096701 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -838,7 +838,7 @@ public class PersistentTopicsBase extends AdminResource {
}
}).exceptionally(ex -> {
// If the exception is not redirect exception we 
need to log it.
-   if (!isRedirectException(ex)) {
+   if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to get partitioned 
metadata while unloading topic {}",
clientAppId(), topicName, ex);
}
@@ -848,7 +848,7 @@ public class PersistentTopicsBase extends AdminResource {
}
}).exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
-   if (!isRedirectException(ex)) {
+   if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to validate the global namespace 
ownership while unloading topic {}",
clientAppId(), topicName, ex);
}
@@ -1058,7 +1058,7 @@ public class PersistentTopicsBase extends AdminResource {
 }))
 .exceptionally(ex -> {
 // If the exception is not redirect exception we need to 
log it.
-if (!isRedirectException(ex)) {
+if (!isNot307And404Exception(ex)) {
 log.error("[{}] Failed to unload topic {}, {}", 
clientAppId(), topicName, ex);
 }
 resumeAsyncResponseExceptionally(asyncResponse, ex);
@@ -1080,7 +1080,7 @@ public class PersistentTopicsBase extends AdminResource {
 }))
 .exceptionally(ex -> {
 // If the exception is not redirect exception we need to 
log it.
-if (!isRedirectException(ex)) {
+if (!isNot307And404Exception(ex)) {
 log.error("[{}] Failed to unload tc {},{}", 
clientAppId(),
 topicName.getPartitionIndex(), ex);
 }
@@ -1206,7 +1206,7 @@ public class PersistentTopicsBase extends AdminResource {
 }
 }).exceptionally(ex -> {
 // If the exception is not redirect exception we 
need to log it.
-if (!isRedirectException(ex)) {
+if 

(pulsar) branch branch-2.10 updated (9c29b76ff2b -> cb65e17ea8a)

2024-03-06 Thread penghui
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a change to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


from 9c29b76ff2b Release 2.10.6
 new 858f04edce2 [improve][broker] Avoid print redirect exception log when 
get list from bundle (#20846)
 new 10cbcc01547 [improve][admin]internalGetMessageById shouldn't be 
allowed on partitioned topic (#19013)
 new a080aebe015 [improve] [broker] Do not print an Error log when 
responding to `HTTP-404` when calling `Admin API` and the topic does not exist. 
(#21995)
 new cb65e17ea8a [improve][broker] Consistently add fine-grain 
authorization to REST API (#22202)

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/pulsar/broker/admin/AdminResource.java  |  11 ++
 .../broker/admin/impl/PersistentTopicsBase.java| 194 +-
 .../pulsar/broker/admin/v1/PersistentTopics.java   |  20 +-
 .../broker/admin/v2/NonPersistentTopics.java   |  16 +-
 .../pulsar/broker/admin/v2/PersistentTopics.java   | 219 ++---
 .../pulsar/broker/admin/v3/Transactions.java   |  16 ++
 .../broker/admin/TopicPoliciesAuthZTest.java   | 206 ++-
 7 files changed, 496 insertions(+), 186 deletions(-)



(pulsar) 02/04: [improve][admin]internalGetMessageById shouldn't be allowed on partitioned topic (#19013)

2024-03-06 Thread penghui
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 10cbcc01547ebac74c38150dc9e07b9e7c226104
Author: gaozhangmin 
AuthorDate: Mon Jan 9 19:39:09 2023 +0800

[improve][admin]internalGetMessageById shouldn't be allowed on partitioned 
topic (#19013)

Co-authored-by: gavingaozhangmin 
(cherry picked from commit b05fddb1af03456438f27217dc6979be00fac19e)
---
 .../broker/admin/impl/PersistentTopicsBase.java| 107 +++--
 .../pulsar/broker/admin/v1/PersistentTopics.java   |  20 ++--
 .../pulsar/broker/admin/v2/PersistentTopics.java   |  20 ++--
 3 files changed, 80 insertions(+), 67 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 867f45a5e3c..ed3896b1456 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -2619,60 +2619,65 @@ public class PersistentTopicsBase extends AdminResource 
{
 return seekPosition;
 }
 
-protected void internalGetMessageById(AsyncResponse asyncResponse, long 
ledgerId, long entryId,
-  boolean authoritative) {
-// will redirect if the topic not owned by current broker
-validateTopicOwnershipAsync(topicName, authoritative)
-.thenCompose(__ -> validateTopicOperationAsync(topicName, 
TopicOperation.PEEK_MESSAGES))
-.thenCompose(__ -> {
-CompletableFuture ret;
-if (topicName.isGlobal()) {
-ret = 
validateGlobalNamespaceOwnershipAsync(namespaceName);
-} else {
-ret = CompletableFuture.completedFuture(null);
-}
-return ret;
-})
-.thenCompose(__ -> getTopicReferenceAsync(topicName))
-.thenAccept(topic -> {
-ManagedLedgerImpl ledger =
-(ManagedLedgerImpl) ((PersistentTopic) 
topic).getManagedLedger();
-ledger.asyncReadEntry(new PositionImpl(ledgerId, entryId),
-new AsyncCallbacks.ReadEntryCallback() {
-@Override
-public void 
readEntryFailed(ManagedLedgerException exception,
-Object ctx) {
-asyncResponse.resume(new 
RestException(exception));
-}
+protected CompletableFuture internalGetMessageById(long 
ledgerId, long entryId, boolean authoritative) {
+CompletableFuture future;
+if (topicName.isGlobal()) {
+future = validateGlobalNamespaceOwnershipAsync(namespaceName);
+} else {
+future = CompletableFuture.completedFuture(null);
+}
+return future.thenCompose(__ -> {
+if (topicName.isPartitioned()) {
+return CompletableFuture.completedFuture(null);
+} else {
+return getPartitionedTopicMetadataAsync(topicName, 
authoritative, false)
+.thenAccept(topicMetadata -> {
+if (topicMetadata.partitions > 0) {
+log.warn("[{}] Not supported getMessageById 
operation on partitioned-topic {}",
+clientAppId(), topicName);
+throw new 
RestException(Status.METHOD_NOT_ALLOWED,
+"GetMessageById is not allowed on 
partitioned-topic");
+}
+});
 
-@Override
-public void readEntryComplete(Entry entry, 
Object ctx) {
-try {
-
asyncResponse.resume(generateResponseWithEntry(entry));
-} catch (IOException exception) {
-asyncResponse.resume(new 
RestException(exception));
-} finally {
-if (entry != null) {
-entry.release();
-}
-}
-}
+}
+})
+.thenCompose(ignore -> validateTopicOwnershipAsync(topicName, 
authoritative))
+.thenCompose(__ -> validateTopicOperationAsync(topicName, 
TopicOperation.PEEK_MESSAGES))
+.thenCompose(__ 

(pulsar) 01/04: [improve][broker] Avoid print redirect exception log when get list from bundle (#20846)

2024-03-06 Thread penghui
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 858f04edce29569dae9d5a10f2c0b6ce2fdca79d
Author: Kai Wang 
AuthorDate: Sun Jul 23 12:56:04 2023 +0800

[improve][broker] Avoid print redirect exception log when get list from 
bundle (#20846)

(cherry picked from commit 9256407cdca1ab6d9b3a59ce404dccb09953ab24)
---
 .../pulsar/broker/admin/v2/NonPersistentTopics.java  | 16 
 1 file changed, 8 insertions(+), 8 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
index 87cc6650bb9..854f76c3abf 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
@@ -498,19 +498,19 @@ public class NonPersistentTopics extends PersistentTopics 
{
 }
 asyncResponse.resume(topicList);
 } catch (Exception e) {
-log.error("[{}] Failed to list topics on namespace bundle 
{}/{}", clientAppId(),
-namespaceName, bundleRange, e);
+if (!isRedirectException(e)) {
+log.error("[{}] Failed to list topics on namespace 
bundle {}/{}", clientAppId(),
+namespaceName, bundleRange, e);
+}
 asyncResponse.resume(new RestException(e));
 }
 }
 }).exceptionally(ex -> {
-log.error("[{}] Failed to list topics on namespace bundle {}/{}", 
clientAppId(),
-namespaceName, bundleRange, ex);
-if (ex.getCause() instanceof WebApplicationException) {
-asyncResponse.resume(ex.getCause());
-} else {
-asyncResponse.resume(new RestException(ex.getCause()));
+if (!isRedirectException(ex)) {
+log.error("[{}] Failed to list topics on namespace bundle 
{}/{}", clientAppId(),
+namespaceName, bundleRange, ex);
 }
+resumeAsyncResponseExceptionally(asyncResponse, ex);
 return null;
 });
 }



svn commit: r67764 - /dev/pulsar/pulsar-3.1.3-candidate-1/

2024-03-06 Thread technoboy
Author: technoboy
Date: Thu Mar  7 02:42:18 2024
New Revision: 67764

Log:
delete

Removed:
dev/pulsar/pulsar-3.1.3-candidate-1/



svn commit: r67763 - /dev/pulsar/pulsar-2.10.6-candidate-1/

2024-03-06 Thread technoboy
Author: technoboy
Date: Thu Mar  7 02:41:55 2024
New Revision: 67763

Log:
delete

Removed:
dev/pulsar/pulsar-2.10.6-candidate-1/



Re: [I] [Bug] Brokers yield "Error Starting up in Worker" when functions are enabled [pulsar]

2024-03-06 Thread via GitHub


primmayora commented on issue #21748:
URL: https://github.com/apache/pulsar/issues/21748#issuecomment-1982223679

   I faced the same issue in a production environment on GKE (without Autopilot): 
the autoscaler did a scale-down, but it did not come back online and had the same 
log errors you shared. I tried to reproduce it in a controlled environment, but 
had no luck.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



svn commit: r67762 - /dev/pulsar/pulsar-3.2.1-candidate-1/

2024-03-06 Thread technoboy
Author: technoboy
Date: Thu Mar  7 02:14:39 2024
New Revision: 67762

Log:
delete

Removed:
dev/pulsar/pulsar-3.2.1-candidate-1/



(pulsar) annotated tag v3.2.1-candidate-2 updated (da98b8d8d0b -> 1f4bac0c773)

2024-03-06 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a change to annotated tag v3.2.1-candidate-2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


*** WARNING: tag v3.2.1-candidate-2 was modified! ***

from da98b8d8d0b (commit)
  to 1f4bac0c773 (tag)
 tagging da98b8d8d0bc0c2bdd2d14d3e2e8058fb4c5dfd2 (commit)
 replaces v3.2.1-candidate-1
  by Jiwe Guo
  on Thu Mar 7 10:03:26 2024 +0800

- Log -
Release v3.2.1-candidate-2
-BEGIN PGP SIGNATURE-

iQJJBAABCAAzFiEEStnMRiSmOggeXFwiZb2iVNVk0t8FAmXpIG4VHHRlY2hub2Jv
eUBhcGFjaGUub3JnAAoJEGW9olTVZNLfIPMP/0jYIVzUUctfqEDSDhlZtrRmVCFO
sm1KsY+HwTAdpYk3RmxZGBK/P7NPQF2SHyJd3eow5rAHN2kH6D5ZgasrSumNIgsc
oZ2QnEgOKzp80tzgH5X07gVN1dsssqmzE23TrWGIKf9rhCFX9cTcI8NOODjjjIex
6V7RpJDRlMXb9rEpHlLNqDyaKhKe0dS10IGqslBbCozdkfc7xXiwT8mutIyCmEAP
sj3c1zqjWGcxrNZ+mWubtdnjTlIDUnfY6t76tX1QI9f4G95nrNDHXyTJqoicP7RR
LvLRkN8f2QLeXHQZr24xM3evHmqaua08n3ATCaPcx6I4mU15XLO0Z436r9WVSu5d
HBSlCHqa1TEUGaXXF4zQa3BdmVSepeD9IBNSPb0aApF/9UqLXjmJnBpY4+3Ty8Vd
uq+TV+pfc6KpaWNRbYtbxXfu+3iqvySRMxFsmEWxFgzMOcsxxKVIo4rl6JIbDffB
3sBtm7GoAy6uVZBVYgz580IwkQQPveJTWDGt5hTwcRyiMoWTq/69E1MPHRJtQkmZ
Y/BsXJh9SM/vx4tyttDJTD8qM0FM7Ez/2lsl7U2ndawmwvTkrZAC3DGYyWcMwaxU
pSUoQ6EKfF17VQbJUAvCxUvIcCJrtrhdvDGUDmR5PHRpzBRTBjdW2AvIB+FuaHqQ
Y9sswX+EalGBBOAH
=J1DS
-END PGP SIGNATURE-
---


No new revisions were added by this update.

Summary of changes:



(pulsar) branch branch-3.0 updated: [fix] [broker][branch-3.0] Expire messages according to ledger close time to avoid client clock skew (#21940) (#22211)

2024-03-06 Thread heesung
This is an automated email from the ASF dual-hosted git repository.

heesung pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new 16ea0d88ee5 [fix] [broker][branch-3.0] Expire messages according to 
ledger close time to avoid client clock skew (#21940) (#22211)
16ea0d88ee5 is described below

commit 16ea0d88ee50dc9267c2da5a31e0e8b4f0d958d0
Author: Heesung Sohn <103456639+heesung...@users.noreply.github.com>
AuthorDate: Wed Mar 6 17:53:05 2024 -0800

[fix] [broker][branch-3.0] Expire messages according to ledger close time 
to avoid client clock skew (#21940) (#22211)

Co-authored-by: feynmanlin <315157...@qq.com>
---
 .../persistent/PersistentMessageExpiryMonitor.java | 35 +-
 .../service/PersistentMessageFinderTest.java   | 25 +++-
 2 files changed, 58 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
index 41bc6098e1a..d82f3d93f8f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service.persistent;
 import com.google.common.annotations.VisibleForTesting;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.SortedMap;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.LongAdder;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.FindEntryCallback;
@@ -30,8 +31,10 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import 
org.apache.bookkeeper.mledger.ManagedLedgerException.LedgerNotExistException;
 import 
org.apache.bookkeeper.mledger.ManagedLedgerException.NonRecoverableLedgerException;
 import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats;
 import org.apache.pulsar.client.impl.MessageImpl;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
 import org.apache.pulsar.common.protocol.Commands;
@@ -78,7 +81,9 @@ public class PersistentMessageExpiryMonitor implements 
FindEntryCallback {
 if (expirationCheckInProgressUpdater.compareAndSet(this, FALSE, TRUE)) 
{
 log.info("[{}][{}] Starting message expiry check, ttl= {} 
seconds", topicName, subName,
 messageTTLInSeconds);
-
+// First filter the entire Ledger reached TTL based on the Ledger 
closing time to avoid client clock skew
+checkExpiryByLedgerClosureTime(cursor, messageTTLInSeconds);
+// Some part of entries in active Ledger may have reached TTL, so 
we need to continue searching.
 
cursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries,
 entry -> {
 try {
 long entryTimestamp = 
Commands.getEntryTimestamp(entry.getDataBuffer());
@@ -99,6 +104,34 @@ public class PersistentMessageExpiryMonitor implements 
FindEntryCallback {
 return false;
 }
 }
+private void checkExpiryByLedgerClosureTime(ManagedCursor cursor, int 
messageTTLInSeconds) {
+if (messageTTLInSeconds <= 0) {
+return;
+}
+if (cursor instanceof ManagedCursorImpl managedCursor) {
+ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) 
managedCursor.getManagedLedger();
+Position deletedPosition = managedCursor.getMarkDeletedPosition();
+SortedMap 
ledgerInfoSortedMap =
+
managedLedger.getLedgersInfo().subMap(deletedPosition.getLedgerId(), true,
+managedLedger.getLedgersInfo().lastKey(), true);
+MLDataFormats.ManagedLedgerInfo.LedgerInfo info = null;
+for (MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo : 
ledgerInfoSortedMap.values()) {
+if (!ledgerInfo.hasTimestamp() || 
!MessageImpl.isEntryExpired(messageTTLInSeconds,
+ledgerInfo.getTimestamp())) {
+break;
+}
+info = ledgerInfo;
+}
+if (info != null && info.getLedgerId() > -1) {
+PositionImpl position = PositionImpl.get(info.getLedgerId(), 
info.getEntries() - 1);
+if (((PositionImpl) 
managedLedger.getLastConfirmedEntry()).compareTo(position) < 0) {
+

Re: [PR] [fix] [broker][branch-3.0] Expire messages according to ledger close time to avoid client clock skew (#21940) [pulsar]

2024-03-06 Thread via GitHub


heesung-sn merged PR #22211:
URL: https://github.com/apache/pulsar/pull/22211


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [I] [Bug] Producer experienced TimeoutError with direct memory increased [pulsar]

2024-03-06 Thread via GitHub


YanshuoH commented on issue #22212:
URL: https://github.com/apache/pulsar/issues/22212#issuecomment-1982191269

   Good guess, I do have the same doubts. 
   
   As you can see in the image of the metadata store executor queue, it is quite 
high.
   
   Unfortunately, it occurred in production and the incident had quite an 
impact, so I had to do a quick reboot without having time to take a dump.
   
   I will still look into the code and any related metrics to see whether they are relevant.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [improve][fn][branch-3.0] Add missing "exception" argument to some `log.error` (#22140) [pulsar]

2024-03-06 Thread via GitHub


heesung-sn merged PR #22213:
URL: https://github.com/apache/pulsar/pull/22213


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



(pulsar) branch branch-3.0 updated: [improve][fn][branch-3.0] Add missing "exception" argument to some `log.error` (#22140) (#22213)

2024-03-06 Thread heesung
This is an automated email from the ASF dual-hosted git repository.

heesung pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new dc035f51ef0 [improve][fn][branch-3.0] Add missing "exception" argument 
to some `log.error` (#22140) (#22213)
dc035f51ef0 is described below

commit dc035f51ef0cc03f04e0c6d97ef2e41512b12717
Author: jiangpengcheng 
AuthorDate: Thu Mar 7 09:33:22 2024 +0800

[improve][fn][branch-3.0] Add missing "exception" argument to some 
`log.error` (#22140) (#22213)
---
 .../java/org/apache/pulsar/functions/worker/PulsarWorkerService.java  | 2 +-
 .../org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java| 2 +-
 .../org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java| 4 ++--
 3 files changed, 4 insertions(+), 4 deletions(-)

diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java
index 84b943e5671..255ed5f3218 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/PulsarWorkerService.java
@@ -224,7 +224,7 @@ public class PulsarWorkerService implements WorkerService {
 log.warn("Retry to connect to Pulsar service at {}", 
workerConfig.getPulsarWebServiceUrl());
 if (retries >= maxRetries) {
 log.error("Failed to connect to Pulsar service at {} after 
{} attempts",
-workerConfig.getPulsarFunctionsNamespace(), 
maxRetries);
+workerConfig.getPulsarFunctionsNamespace(), 
maxRetries, e);
 throw e;
 }
 retries++;
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
index 6d07e587091..ea5517e0fd4 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
@@ -1080,7 +1080,7 @@ public abstract class ComponentImpl implements 
Component {
 try {
 
worker().getBrokerAdmin().topics().getSubscriptions(inputTopicToWrite);
 } catch (PulsarAdminException e) {
-log.error("Function in trigger function is not ready @ /{}/{}/{}", 
tenant, namespace, functionName);
+log.error("Function in trigger function is not ready @ /{}/{}/{}", 
tenant, namespace, functionName, e);
 throw new RestException(Status.BAD_REQUEST, "Function in trigger 
function is not ready");
 }
 String outputTopic = 
functionMetaData.getFunctionDetails().getSink().getTopic();
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index a075d3e18a0..4cbd7c8cbcb 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -169,7 +169,7 @@ public class FunctionsImpl extends ComponentImpl implements 
Functions

(pulsar) 02/02: [improve][broker] Consistently add fine-grain authorization to REST API (#22202)

2024-03-06 Thread penghui
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit e2316bc0a48bf6fe5a1b0268559303a25da50c39
Author: Qiang Zhao 
AuthorDate: Wed Mar 6 22:00:45 2024 +0800

[improve][broker] Consistently add fine-grain authorization to REST API 
(#22202)

(cherry picked from commit 68c10925df43769eee7265b4af0ac8ee4913e715)
---
 .../pulsar/broker/admin/v2/PersistentTopics.java   | 232 ++--
 .../broker/admin/TopicPoliciesAuthZTest.java   | 309 +
 2 files changed, 461 insertions(+), 80 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 69d479976a2..09e94857ce6 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -75,6 +75,7 @@ import org.apache.pulsar.common.policies.data.PublishRate;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
 import org.apache.pulsar.common.policies.data.SubscribeRate;
+import org.apache.pulsar.common.policies.data.TopicOperation;
 import org.apache.pulsar.common.policies.data.TopicPolicies;
 import 
org.apache.pulsar.common.policies.data.impl.AutoSubscriptionCreationOverrideImpl;
 import org.apache.pulsar.common.policies.data.impl.BacklogQuotaImpl;
@@ -356,7 +357,8 @@ public class PersistentTopics extends PersistentTopicsBase {
 @ApiParam(value = "Whether leader broker redirected this call to 
this broker. For internal use.")
 @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
 validateTopicName(tenant, namespace, encodedTopic);
-preValidation(authoritative)
+validateTopicPolicyOperationAsync(topicName, PolicyName.OFFLOAD, 
PolicyOperation.READ)
+.thenCompose(__ -> preValidation(authoritative))
 .thenCompose(__ -> internalGetOffloadPolicies(applied, isGlobal))
 .thenApply(asyncResponse::resume)
 .exceptionally(ex -> {
@@ -379,7 +381,8 @@ public class PersistentTopics extends PersistentTopicsBase {
 @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
 @ApiParam(value = "Offload policies for the specified topic") 
OffloadPoliciesImpl offloadPolicies) {
 validateTopicName(tenant, namespace, encodedTopic);
-preValidation(authoritative)
+validateTopicPolicyOperationAsync(topicName, PolicyName.OFFLOAD, 
PolicyOperation.WRITE)
+.thenCompose(__ -> preValidation(authoritative))
 .thenCompose(__ -> internalSetOffloadPolicies(offloadPolicies, 
isGlobal))
 .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
 .exceptionally(ex -> {
@@ -401,7 +404,8 @@ public class PersistentTopics extends PersistentTopicsBase {
 @ApiParam(value = "Whether leader broker redirected this call to 
this broker. For internal use.")
 @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
 validateTopicName(tenant, namespace, encodedTopic);
-preValidation(authoritative)
+validateTopicPolicyOperationAsync(topicName, PolicyName.OFFLOAD, 
PolicyOperation.WRITE)
+.thenCompose(__ -> preValidation(authoritative))
 .thenCompose(__ -> internalSetOffloadPolicies(null, isGlobal))
 .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
 .exceptionally(ex -> {
@@ -425,7 +429,8 @@ public class PersistentTopics extends PersistentTopicsBase {
 @ApiParam(value = "Whether leader broker redirected this call to 
this broker. For internal use.")
 @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
 validateTopicName(tenant, namespace, encodedTopic);
-preValidation(authoritative)
+validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_UNACKED, 
PolicyOperation.READ)
+.thenCompose(__ -> preValidation(authoritative))
 .thenCompose(__ -> 
internalGetMaxUnackedMessagesOnConsumer(applied, isGlobal))
 .thenApply(asyncResponse::resume).exceptionally(ex -> {
 handleTopicPolicyException("getMaxUnackedMessagesOnConsumer", 
ex, asyncResponse);
@@ -449,7 +454,8 @@ public class PersistentTopics extends PersistentTopicsBase {
 @ApiParam(value = "Max unacked messages on consumer policies for 
the specified topic")
 Integer maxUnackedNum) {
 validateTopicName(tenant, namespace, encodedTopic);
-preValidation(authoritative)
+

Re: [PR] [improve] [broker] Do not print an Error log when responding to `HTTP-404` when calling `Admin API` and the topic does not exist. [pulsar]

2024-03-06 Thread via GitHub


codelipenghui commented on PR #21995:
URL: https://github.com/apache/pulsar/pull/21995#issuecomment-1982163083

   @heesung-sn I have cherry-picked to branch-3.0


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



(pulsar) 01/02: [improve] [broker] Do not print an Error log when responding to `HTTP-404` when calling `Admin API` and the topic does not exist. (#21995)

2024-03-06 Thread penghui
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit d9be0267a2130d4430d8f5b9d1d2d8d6d4609d6a
Author: fengyubiao 
AuthorDate: Sun Feb 18 15:46:52 2024 +0800

[improve] [broker] Do not print an Error log when responding to `HTTP-404` 
when calling `Admin API` and the topic does not exist. (#21995)

(cherry picked from commit 48b4481969cb6028186a7a84b8be8af90770674b)
---
 .../apache/pulsar/broker/admin/AdminResource.java  |  4 +
 .../broker/admin/impl/PersistentTopicsBase.java| 88 +++---
 .../broker/admin/impl/SchemasResourceBase.java |  2 +-
 .../broker/admin/v2/NonPersistentTopics.java   |  6 +-
 .../pulsar/broker/admin/v2/PersistentTopics.java   | 36 -
 .../pulsar/broker/admin/v3/Transactions.java   | 10 +--
 6 files changed, 74 insertions(+), 72 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 4190b4c486a..e5806b7bec2 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -830,6 +830,10 @@ public abstract class AdminResource extends 
PulsarWebResource {
 == Status.NOT_FOUND.getStatusCode();
 }
 
+protected static boolean isNot307And404Exception(Throwable ex) {
+return !isRedirectException(ex) && !isNotFoundException(ex);
+}
+
 protected static String getTopicNotFoundErrorMessage(String topic) {
 return String.format("Topic %s not found", topic);
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 62493fdafa9..0c523726335 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -965,7 +965,7 @@ public class PersistentTopicsBase extends AdminResource {
}
}).exceptionally(ex -> {
// If the exception is not redirect exception we 
need to log it.
-   if (!isRedirectException(ex)) {
+   if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to get partitioned 
metadata while unloading topic {}",
clientAppId(), topicName, ex);
}
@@ -975,7 +975,7 @@ public class PersistentTopicsBase extends AdminResource {
}
}).exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
-   if (!isRedirectException(ex)) {
+   if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to validate the global namespace 
ownership while unloading topic {}",
clientAppId(), topicName, ex);
}
@@ -1143,7 +1143,7 @@ public class PersistentTopicsBase extends AdminResource {
 }))
 .exceptionally(ex -> {
 // If the exception is not redirect exception we need to 
log it.
-if (!isRedirectException(ex)) {
+if (!isNot307And404Exception(ex)) {
 log.error("[{}] Failed to unload topic {}, {}", 
clientAppId(), topicName, ex);
 }
 resumeAsyncResponseExceptionally(asyncResponse, ex);
@@ -1165,7 +1165,7 @@ public class PersistentTopicsBase extends AdminResource {
 }))
 .exceptionally(ex -> {
 // If the exception is not redirect exception we need to 
log it.
-if (!isRedirectException(ex)) {
+if (!isNot307And404Exception(ex)) {
 log.error("[{}] Failed to unload tc {},{}", 
clientAppId(),
 topicName.getPartitionIndex(), ex);
 }
@@ -1267,7 +1267,7 @@ public class PersistentTopicsBase extends AdminResource {
 }
 }).exceptionally(ex -> {
 // If the exception is not redirect exception we 
need to log it.
-if (!isRedirectException(ex)) {
+if (!isNot307And404Exception(ex)) {
 log.error("[{}] Failed to get partitioned 
topic metadata while get"
 + " subscriptions for topic {}", 
clientAppId(), topicName, ex);
 }
@@ -1277,7 +1277,7 @@ public class 

(pulsar) branch branch-3.0 updated (140ca5e34ae -> e2316bc0a48)

2024-03-06 Thread penghui
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a change to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


from 140ca5e34ae [fix] [branch-3.0] Fix reader stuck when read from 
compacted topic with read compact mode disable (#22199)
 new d9be0267a21 [improve] [broker] Do not print an Error log when 
responding to `HTTP-404` when calling `Admin API` and the topic does not exist. 
(#21995)
 new e2316bc0a48 [improve][broker] Consistently add fine-grain 
authorization to REST API (#22202)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/pulsar/broker/admin/AdminResource.java  |   4 +
 .../broker/admin/impl/PersistentTopicsBase.java|  88 +++---
 .../broker/admin/impl/SchemasResourceBase.java |   2 +-
 .../broker/admin/v2/NonPersistentTopics.java   |   6 +-
 .../pulsar/broker/admin/v2/PersistentTopics.java   | 266 +++---
 .../pulsar/broker/admin/v3/Transactions.java   |  10 +-
 .../broker/admin/TopicPoliciesAuthZTest.java   | 309 +
 7 files changed, 534 insertions(+), 151 deletions(-)



Re: [PR] [improve][fn] Add missing "exception" argument to some `log.error` (#22140) [pulsar]

2024-03-06 Thread via GitHub


heesung-sn closed pull request #22213: [improve][fn] Add missing "exception" 
argument to some `log.error` (#22140)
URL: https://github.com/apache/pulsar/pull/22213


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



(pulsar) branch branch-3.1 updated: [improve][broker] Consistently add fine-grain authorization to REST API (#22202)

2024-03-06 Thread penghui
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
 new dcaf508f8e3 [improve][broker] Consistently add fine-grain 
authorization to REST API (#22202)
dcaf508f8e3 is described below

commit dcaf508f8e381107125e98722f4ddab76f9303ad
Author: Qiang Zhao 
AuthorDate: Wed Mar 6 22:00:45 2024 +0800

[improve][broker] Consistently add fine-grain authorization to REST API 
(#22202)

(cherry picked from commit 68c10925df43769eee7265b4af0ac8ee4913e715)
---
 .../pulsar/broker/admin/v2/PersistentTopics.java   | 233 +--
 .../broker/admin/TopicPoliciesAuthZTest.java   | 312 -
 2 files changed, 462 insertions(+), 83 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 1fc46f9c872..94fb1f53ac7 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -75,6 +75,7 @@ import org.apache.pulsar.common.policies.data.PublishRate;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
 import org.apache.pulsar.common.policies.data.SubscribeRate;
+import org.apache.pulsar.common.policies.data.TopicOperation;
 import org.apache.pulsar.common.policies.data.TopicPolicies;
 import 
org.apache.pulsar.common.policies.data.impl.AutoSubscriptionCreationOverrideImpl;
 import org.apache.pulsar.common.policies.data.impl.BacklogQuotaImpl;
@@ -356,7 +357,8 @@ public class PersistentTopics extends PersistentTopicsBase {
 @ApiParam(value = "Whether leader broker redirected this call to 
this broker. For internal use.")
 @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
 validateTopicName(tenant, namespace, encodedTopic);
-preValidation(authoritative)
+validateTopicPolicyOperationAsync(topicName, PolicyName.OFFLOAD, 
PolicyOperation.READ)
+.thenCompose(__ -> preValidation(authoritative))
 .thenCompose(__ -> internalGetOffloadPolicies(applied, isGlobal))
 .thenApply(asyncResponse::resume)
 .exceptionally(ex -> {
@@ -379,7 +381,8 @@ public class PersistentTopics extends PersistentTopicsBase {
 @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
 @ApiParam(value = "Offload policies for the specified topic") 
OffloadPoliciesImpl offloadPolicies) {
 validateTopicName(tenant, namespace, encodedTopic);
-preValidation(authoritative)
+validateTopicPolicyOperationAsync(topicName, PolicyName.OFFLOAD, 
PolicyOperation.WRITE)
+.thenCompose(__ -> preValidation(authoritative))
 .thenCompose(__ -> internalSetOffloadPolicies(offloadPolicies, 
isGlobal))
 .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
 .exceptionally(ex -> {
@@ -401,7 +404,8 @@ public class PersistentTopics extends PersistentTopicsBase {
 @ApiParam(value = "Whether leader broker redirected this call to 
this broker. For internal use.")
 @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
 validateTopicName(tenant, namespace, encodedTopic);
-preValidation(authoritative)
+validateTopicPolicyOperationAsync(topicName, PolicyName.OFFLOAD, 
PolicyOperation.WRITE)
+.thenCompose(__ -> preValidation(authoritative))
 .thenCompose(__ -> internalSetOffloadPolicies(null, isGlobal))
 .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
 .exceptionally(ex -> {
@@ -425,7 +429,8 @@ public class PersistentTopics extends PersistentTopicsBase {
 @ApiParam(value = "Whether leader broker redirected this call to 
this broker. For internal use.")
 @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
 validateTopicName(tenant, namespace, encodedTopic);
-preValidation(authoritative)
+validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_UNACKED, 
PolicyOperation.READ)
+.thenCompose(__ -> preValidation(authoritative))
 .thenCompose(__ -> 
internalGetMaxUnackedMessagesOnConsumer(applied, isGlobal))
 .thenApply(asyncResponse::resume).exceptionally(ex -> {
 handleTopicPolicyException("getMaxUnackedMessagesOnConsumer", 
ex, asyncResponse);
@@ -449,7 +454,8 @@ public class PersistentTopics extends PersistentTopicsBase {
 @ApiParam(value = "Max unacked messages on consumer policies for 
the 

Re: [PR] [improve][fn] Add missing "exception" argument to some `log.error` (#22140) [pulsar]

2024-03-06 Thread via GitHub


github-actions[bot] commented on PR #22213:
URL: https://github.com/apache/pulsar/pull/22213#issuecomment-1982136874

   @jiangpengcheng Please add the following content to your PR description and 
select a checkbox:
   ```
   - [ ] `doc` 
   - [ ] `doc-required` 
   - [ ] `doc-not-needed` 
   - [ ] `doc-complete` 
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [improve][fn] Add missing "exception" argument to some `log.error` (#22140) [pulsar]

2024-03-06 Thread via GitHub


jiangpengcheng opened a new pull request, #22213:
URL: https://github.com/apache/pulsar/pull/22213

   cherry-pick PR #22140
   cherry picked from commit c7cedc6828e08765e6a14335bb5a00cf23f0cfa3


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



(pulsar) branch branch-3.2 updated: [improve][broker] Consistently add fine-grain authorization to REST API (#22202)

2024-03-06 Thread penghui
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new da98b8d8d0b [improve][broker] Consistently add fine-grain 
authorization to REST API (#22202)
da98b8d8d0b is described below

commit da98b8d8d0bc0c2bdd2d14d3e2e8058fb4c5dfd2
Author: Qiang Zhao 
AuthorDate: Wed Mar 6 22:00:45 2024 +0800

[improve][broker] Consistently add fine-grain authorization to REST API 
(#22202)

(cherry picked from commit 68c10925df43769eee7265b4af0ac8ee4913e715)
---
 .../pulsar/broker/admin/v2/PersistentTopics.java   | 233 +--
 .../broker/admin/TopicPoliciesAuthZTest.java   | 312 -
 2 files changed, 462 insertions(+), 83 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 9094a4642a0..0cb9bc62c61 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -77,6 +77,7 @@ import org.apache.pulsar.common.policies.data.PublishRate;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
 import org.apache.pulsar.common.policies.data.SubscribeRate;
+import org.apache.pulsar.common.policies.data.TopicOperation;
 import org.apache.pulsar.common.policies.data.TopicPolicies;
 import 
org.apache.pulsar.common.policies.data.impl.AutoSubscriptionCreationOverrideImpl;
 import org.apache.pulsar.common.policies.data.impl.BacklogQuotaImpl;
@@ -358,7 +359,8 @@ public class PersistentTopics extends PersistentTopicsBase {
 @ApiParam(value = "Whether leader broker redirected this call to 
this broker. For internal use.")
 @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
 validateTopicName(tenant, namespace, encodedTopic);
-preValidation(authoritative)
+validateTopicPolicyOperationAsync(topicName, PolicyName.OFFLOAD, 
PolicyOperation.READ)
+.thenCompose(__ -> preValidation(authoritative))
 .thenCompose(__ -> internalGetOffloadPolicies(applied, isGlobal))
 .thenApply(asyncResponse::resume)
 .exceptionally(ex -> {
@@ -381,7 +383,8 @@ public class PersistentTopics extends PersistentTopicsBase {
 @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
 @ApiParam(value = "Offload policies for the specified topic") 
OffloadPoliciesImpl offloadPolicies) {
 validateTopicName(tenant, namespace, encodedTopic);
-preValidation(authoritative)
+validateTopicPolicyOperationAsync(topicName, PolicyName.OFFLOAD, 
PolicyOperation.WRITE)
+.thenCompose(__ -> preValidation(authoritative))
 .thenAccept(__ -> validateOffloadPolicies(offloadPolicies))
 .thenCompose(__ -> internalSetOffloadPolicies(offloadPolicies, 
isGlobal))
 .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
@@ -404,7 +407,8 @@ public class PersistentTopics extends PersistentTopicsBase {
 @ApiParam(value = "Whether leader broker redirected this call to 
this broker. For internal use.")
 @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
 validateTopicName(tenant, namespace, encodedTopic);
-preValidation(authoritative)
+validateTopicPolicyOperationAsync(topicName, PolicyName.OFFLOAD, 
PolicyOperation.WRITE)
+.thenCompose(__ -> preValidation(authoritative))
 .thenCompose(__ -> internalSetOffloadPolicies(null, isGlobal))
 .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
 .exceptionally(ex -> {
@@ -428,7 +432,8 @@ public class PersistentTopics extends PersistentTopicsBase {
 @ApiParam(value = "Whether leader broker redirected this call to 
this broker. For internal use.")
 @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
 validateTopicName(tenant, namespace, encodedTopic);
-preValidation(authoritative)
+validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_UNACKED, 
PolicyOperation.READ)
+.thenCompose(__ -> preValidation(authoritative))
 .thenCompose(__ -> 
internalGetMaxUnackedMessagesOnConsumer(applied, isGlobal))
 .thenApply(asyncResponse::resume).exceptionally(ex -> {
 handleTopicPolicyException("getMaxUnackedMessagesOnConsumer", 
ex, asyncResponse);
@@ -452,7 +457,8 @@ public class PersistentTopics extends PersistentTopicsBase {
 @ApiParam(value = "Max unacked 

Re: [PR] The sample configuration of io-debezium has an error of parameter. [pulsar-site]

2024-03-06 Thread via GitHub


shibd commented on code in PR #825:
URL: https://github.com/apache/pulsar-site/pull/825#discussion_r1515310543


##
docs/io-cdc-debezium.md:
##
@@ -80,7 +80,7 @@ You can use one of the following methods to create a 
configuration file.
   tenant: "public"
   namespace: "default"
   name: "debezium-mysql-source"
-  inputs: [ "debezium-mysql-topic" ]
+  topicName: "debezium-mysql-topic"

Review Comment:
   In fact, the CDC Debezium Source connector does not use this configuration, 
but sends data to the pulsar topic according to this rule.
   
   ```
   The topic naming rule is: "{{database.server.name}}.{{table.name}}". For 
examples: "public/default/mydbserver.public.io-test".You can also choose to use 
a variety of other tools to create a connector:
   ```
   
   We should remove this line.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [I] [Bug] Producer experienced TimeoutError with direct memory increased [pulsar]

2024-03-06 Thread via GitHub


dao-jun commented on issue #22212:
URL: https://github.com/apache/pulsar/issues/22212#issuecomment-1982073543

   Just a guess:
   Is it possible that the ServerCnx EventLoop is blocked?
   If you have a thread dump, you can check it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[I] [Bug] Producer experienced TimeoutError with direct memory increased [pulsar]

2024-03-06 Thread via GitHub


YanshuoH opened a new issue, #22212:
URL: https://github.com/apache/pulsar/issues/22212

   ### Search before asking
   
   - [X] I searched in the [issues](https://github.com/apache/pulsar/issues) 
and found nothing similar.
   
   
   ### Version
   
   Pulsar broker (and other components): v3.2.0
   Pulsar client: go sdk v0.8.1 / v0.9.0
   
   ### Minimal reproduce step
   
   As the incident happened in a production environment, I tried to simulate 
the conditions based on the following ideas:
   1. bookie slow response
   2. broker pending requests high
   
   But unfortunately the increasing direct memory cannot be reproduced:
   1. set broker config of `maxPendingPublishRequestsPerConnection` to 1
   2. set client `OperationTimeout` and other timeout related parameters to a 
low value (like 100ms)
   3. create a producer with high load and produce messages to 
`persistent://public/default/t1`
   4. to simulate bookie slow response, either:
 1. unload the topic
 2. kill the bookie in charge of topic
   
   Then two kinds of errors occurred:
   
   1. send operation: `2024/03/06 08:12:33 sendMessage failed: producer.Send: 
message send timeout: TimeoutError`
   2. create new producer operation using the same connection: `ERRO[0012] 
Failed to create producer at send PRODUCER request  error="request timed out" 
topic="persistent://public/default/t1-partition-0"`
   
   ### What did you expect to see?
   
   1. DirectMemory usage should not keep increasing
   2. Producer should not encounter TimeoutError
   
   
   ### What did you see instead?
   
   1. DirectMemory kept increasing
   2. Producer send Timeout and new connection failed
   
   <img alt="image" src="https://github.com/apache/pulsar/assets/6973092/4d1b6d4b-58ff-4fb5-88a8-0c97bbb8afae">
   
   
   ### Anything else?
   
   This issue is rather rare: we have used Pulsar for years, always keeping 
the version fresh (from v2.2.x -> v3.2.0), and have never encountered such a 
problem.
   
   I have dug into the source code, and to me the likely cause of the 
DirectMemory growth is that `ledger.asyncAddEntry` did not complete (the async 
executor callback), so, as the code comment below indicates, the buffer was not released.
   
   <img alt="image" src="https://github.com/apache/pulsar/assets/6973092/8bebb053-4a88-4a1c-b3e7-b77019e9ee39">
   
   Here are also some other findings that may help identify the root cause.
   
   1. Throttled connection
   
![f5d43c9b-9220-4a35-adbd-6661a4107026](https://github.com/apache/pulsar/assets/6973092/a29580c1-fbef-44a5-87df-c4945bee305f)
   
   The code in charge of toggling `pulsar_broker_throttled_connections` is 
`org.apache.pulsar.broker.service.ServerCnx#startSendOperation` and 
`org.apache.pulsar.broker.service.ServerCnx#enableCnxAutoRead`.
   
   That means during that period of time, `ServerCnx`'s pendingSendRequest 
should have reached the `maxPendingSendRequests` which is `1000` for our 
configuration.
   
   Also, I can confirm that no publish rate limit is set on any of the topics.
   
   2. Metadata store executor queue
   
![7e954cbf-ab00-4448-a2fd-907fe9a49b90](https://github.com/apache/pulsar/assets/6973092/d0c92ebf-90c3-4c83-9b8a-35d0bfa3ed3d)
   
   Once `disableCnxAutoRead` has been called, the `Producer` command on the 
connection cannot succeed, so we see many log lines like the one below:
   
   ```
   2024-03-04T09:29:56,302+ [pulsar-io-3-1] INFO  
org.apache.pulsar.broker.service.ServerCnx - [/10.120.159.82:59748] Closed 
producer before its creation was completed. producerId=61
   ```
   
   Then the producer will try to reconnect, which results in tons of metadata 
store operations in 
`org.apache.pulsar.broker.service.BrokerService#getOrCreateTopic`.
   
   3. For what it's worth, the broker / bookie / zookeeper logs show no 
`Exception` or `error`, and their CPU/Memory/JVM metrics (except broker 
DirectMemory) seem fine.
   
   The peaks around 17:40 correspond to the time when I restarted the brokers 
in question.
   
   <img alt="image" src="https://github.com/apache/pulsar/assets/6973092/317c88a2-c3cc-47d1-a7f2-3ac0971bc1d9">
   
   
   ### Are you willing to submit a PR?
   
   - [ ] I'm willing to submit a PR!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



(pulsar) branch master updated: [fix][client] GenericProtobufNativeSchema not implement getNativeSchema method. (#22204)

2024-03-06 Thread daojun
This is an automated email from the ASF dual-hosted git repository.

daojun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new e84516f4e7a [fix][client] GenericProtobufNativeSchema not implement 
getNativeSchema method. (#22204)
e84516f4e7a is described below

commit e84516f4e7acf765f827c47c5a83be7a1925c954
Author: Baodi Shi 
AuthorDate: Thu Mar 7 07:28:51 2024 +0800

[fix][client] GenericProtobufNativeSchema not implement getNativeSchema 
method. (#22204)
---
 .../client/impl/schema/generic/GenericProtobufNativeSchema.java   | 6 ++
 .../impl/schema/generic/GenericProtobufNativeReaderTest.java  | 8 
 2 files changed, 14 insertions(+)

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericProtobufNativeSchema.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericProtobufNativeSchema.java
index 4ae2a21929a..62a36fee351 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericProtobufNativeSchema.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericProtobufNativeSchema.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.client.impl.schema.generic;
 
 import static 
org.apache.pulsar.client.impl.schema.generic.MultiVersionGenericProtobufNativeReader.parseProtobufSchema;
 import com.google.protobuf.Descriptors;
+import java.util.Optional;
 import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.schema.Field;
@@ -68,6 +69,11 @@ public class GenericProtobufNativeSchema extends 
AbstractGenericSchema {
 return descriptor;
 }
 
+@Override
+public Optional getNativeSchema() {
+return Optional.of(descriptor);
+}
+
 @Override
 public boolean supportSchemaVersioning() {
 return true;
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericProtobufNativeReaderTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericProtobufNativeReaderTest.java
index 4cbb325c82f..c358f30ccae 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericProtobufNativeReaderTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericProtobufNativeReaderTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.client.impl.schema.generic;
 
+import com.google.protobuf.Descriptors;
 import com.google.protobuf.DynamicMessage;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.schema.GenericRecord;
@@ -29,6 +30,7 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
 
 @Slf4j
 public class GenericProtobufNativeReaderTest {
@@ -79,6 +81,12 @@ public class GenericProtobufNativeReaderTest {
 
assertEquals(nativeRecord.getField(nativeRecord.getDescriptorForType().findFieldByName("doubleField")),
 DOUBLE_FIELD_VLUE);
 }
 
+@Test
+public void testGetNativeSchema() {
+assertTrue(genericProtobufNativeSchema.getNativeSchema().isPresent());
+assertTrue(genericProtobufNativeSchema.getNativeSchema().get() 
instanceof Descriptors.Descriptor);
+}
+
 private static final String STRING_FIELD_VLUE = "stringFieldValue";
 private static final double DOUBLE_FIELD_VLUE = 0.2D;
 



Re: [PR] [fix][client] GenericProtobufNativeSchema not implement getNativeSchema method. [pulsar]

2024-03-06 Thread via GitHub


dao-jun merged PR #22204:
URL: https://github.com/apache/pulsar/pull/22204


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [fix] [broker][branch-3.0] Expire messages according to ledger close time to avoid client clock skew (#21940) [pulsar]

2024-03-06 Thread via GitHub


github-actions[bot] commented on PR #22211:
URL: https://github.com/apache/pulsar/pull/22211#issuecomment-198203

   @heesung-sn Please add the following content to your PR description and 
select a checkbox:
   ```
   - [ ] `doc` 
   - [ ] `doc-required` 
   - [ ] `doc-not-needed` 
   - [ ] `doc-complete` 
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [fix] [broker][branch-3.0] Expire messages according to ledger close time to avoid client clock skew (#21940) [pulsar]

2024-03-06 Thread via GitHub


heesung-sn opened a new pull request, #22211:
URL: https://github.com/apache/pulsar/pull/22211

   cherry-pick PR https://github.com/apache/pulsar/pull/21940
   
   cherry picked from commit 861618a8120901a4042e99243d8fa539449d7f60


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [feat][monitor] PIP-223: Add metrics for all rest endpoints. [pulsar]

2024-03-06 Thread via GitHub


dao-jun commented on code in PR #21772:
URL: https://github.com/apache/pulsar/pull/21772#discussion_r1515258281


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestEndpointMetricsFilter.java:
##
@@ -21,43 +21,68 @@
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
-import io.prometheus.client.Counter;
-import io.prometheus.client.Histogram;
+import io.opentelemetry.api.common.AttributeKey;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.DoubleHistogram;
+import io.opentelemetry.api.metrics.LongCounter;
+import io.opentelemetry.api.metrics.Meter;
 import java.io.IOException;
+import java.time.Duration;
+import java.util.List;
 import java.util.Stack;
+import javax.validation.constraints.NotNull;
 import javax.ws.rs.container.ContainerRequestContext;
 import javax.ws.rs.container.ContainerRequestFilter;
 import javax.ws.rs.container.ContainerResponseContext;
 import javax.ws.rs.container.ContainerResponseFilter;
 import javax.ws.rs.core.Response;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.stats.PulsarBrokerOpenTelemetry;
 import org.glassfish.jersey.server.internal.routing.UriRoutingContext;
 import org.glassfish.jersey.server.model.Resource;
 import org.glassfish.jersey.server.model.ResourceMethod;
-import org.jetbrains.annotations.NotNull;
 
 public class RestEndpointMetricsFilter implements ContainerResponseFilter, 
ContainerRequestFilter {
-private static final LoadingCache CACHE = 
CacheBuilder
+private final LoadingCache CACHE = CacheBuilder
 .newBuilder()
 .maximumSize(100)
+.expireAfterAccess(Duration.ofMinutes(1))
 .build(new CacheLoader<>() {
 @Override
 public @NotNull String load(@NotNull ResourceMethod method) 
throws Exception {
 return getRestPath(method);
 }
 });
 
-private static final Histogram LATENCY = Histogram
-.build("pulsar_broker_rest_endpoint_latency", "-")
-.unit("ms")
-.labelNames("path", "method")
-.buckets(10D, 20D, 50D, 100D, 200D, 500D, 1000D, 2000D)
-.register();
-private static final Counter FAILED = Counter
-.build("pulsar_broker_rest_endpoint_failed", "-")
-.labelNames("path", "method", "code")
-.register();
-
 private static final String REQUEST_START_TIME = "requestStartTime";
+private static final AttributeKey PATH = 
AttributeKey.stringKey("path");
+private static final AttributeKey METHOD = 
AttributeKey.stringKey("method");
+private static final AttributeKey CODE = 
AttributeKey.stringKey("code");
+
+private final DoubleHistogram latency;
+private final LongCounter failed;
+
+private RestEndpointMetricsFilter(PulsarService pulsar) {
+PulsarBrokerOpenTelemetry telemetry = pulsar.getOpenTelemetry();
+Meter meter = telemetry.getMeter();
+latency = meter.histogramBuilder("pulsar_broker_rest_endpoint_latency")
+.setDescription("-")
+.setUnit("ms")
+.setExplicitBucketBoundariesAdvice(List.of(10D, 20D, 50D, 
100D, 200D, 500D, 1000D, 2000D))
+.build();
+failed = meter.counterBuilder("pulsar_broker_rest_endpoint_failed")

Review Comment:
   I understand, but if we use a histogram to record the failed requests, the 
output metrics body will include `*_bucket` series.
   I think using a counter to record the failed requests will save some 
resources (memory, network, TSDB).
   WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [feat][monitor] PIP-223: Add metrics for all rest endpoints. [pulsar]

2024-03-06 Thread via GitHub


dao-jun commented on code in PR #21772:
URL: https://github.com/apache/pulsar/pull/21772#discussion_r1515255134


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestEndpointMetricsFilter.java:
##
@@ -87,6 +115,17 @@ public void filter(ContainerRequestContext req) throws 
IOException {
 req.setProperty(REQUEST_START_TIME, System.currentTimeMillis());
 }
 
+
+private void recordSuccess(String path, String method, long duration) {
+Attributes attributes = Attributes.of(PATH, path, METHOD, method);

Review Comment:
   it makes sense



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [improve][broker] Consistently add fine-grain authorization to REST API [pulsar]

2024-03-06 Thread via GitHub


heesung-sn commented on PR #22202:
URL: https://github.com/apache/pulsar/pull/22202#issuecomment-1981863784

   @mattisonchao could you help to cherry-pick this PR to 3.0 branch? I see 
conflicts.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



(pulsar) branch branch-3.0 updated: [fix] [branch-3.0] Fix reader stuck when read from compacted topic with read compact mode disable (#22199)

2024-03-06 Thread heesung
This is an automated email from the ASF dual-hosted git repository.

heesung pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new 140ca5e34ae [fix] [branch-3.0] Fix reader stuck when read from 
compacted topic with read compact mode disable (#22199)
140ca5e34ae is described below

commit 140ca5e34aeccf5657fb056d1fc6f6011ff6c3a7
Author: thetumbled <52550727+thetumb...@users.noreply.github.com>
AuthorDate: Thu Mar 7 05:37:31 2024 +0800

[fix] [branch-3.0] Fix reader stuck when read from compacted topic with 
read compact mode disable (#22199)
---
 .../apache/pulsar/broker/service/ServerCnx.java| 33 +-
 .../compaction/GetLastMessageIdCompactedTest.java  | 27 ++
 2 files changed, 53 insertions(+), 7 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 57081863a14..d40a4ec789f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -2077,7 +2077,8 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
 (PositionImpl) markDeletePosition,
 partitionIndex,
 requestId,
-consumer.getSubscription().getName());
+consumer.getSubscription().getName(),
+consumer.readCompacted());
 }).exceptionally(e -> {
 
writeAndFlush(Commands.newError(getLastMessageId.getRequestId(),
 ServerError.UnknownError, "Failed to recover 
Transaction Buffer."));
@@ -2095,16 +2096,33 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
 PositionImpl markDeletePosition,
 int partitionIndex,
 long requestId,
-String subscriptionName) {
-
+String subscriptionName,
+boolean readCompacted) {
 PersistentTopic persistentTopic = (PersistentTopic) topic;
 ManagedLedgerImpl ml = (ManagedLedgerImpl) 
persistentTopic.getManagedLedger();
 
 // If it's not pointing to a valid entry, respond messageId of the 
current position.
 // If the compaction cursor reach the end of the topic, respond 
messageId from compacted ledger
-Optional compactionHorizon = 
persistentTopic.getCompactedTopic().getCompactionHorizon();
-if (lastPosition.getEntryId() == -1 || (compactionHorizon.isPresent()
-&& lastPosition.compareTo((PositionImpl) 
compactionHorizon.get()) <= 0)) {
+Optional compactionHorizon = readCompacted
+? persistentTopic.getCompactedTopic().getCompactionHorizon() : 
Optional.empty();
+if (lastPosition.getEntryId() == -1 || 
!ml.ledgerExists(lastPosition.getLedgerId())) {
+// there is no entry in the original topic
+if (compactionHorizon != null && compactionHorizon.isPresent()) {
+// if readCompacted is true, we need to read the last entry 
from compacted topic
+handleLastMessageIdFromCompactedLedger(persistentTopic, 
requestId, partitionIndex,
+markDeletePosition);
+return;
+} else {
+// if readCompacted is false, we need to return 
MessageId.earliest
+writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, 
-1, -1, partitionIndex, -1,
+markDeletePosition != null ? 
markDeletePosition.getLedgerId() : -1,
+markDeletePosition != null ? 
markDeletePosition.getEntryId() : -1));
+}
+return;
+}
+
+if (compactionHorizon != null && compactionHorizon.isPresent()
+&& lastPosition.compareTo((PositionImpl) 
compactionHorizon.get()) <= 0) {
 handleLastMessageIdFromCompactedLedger(persistentTopic, requestId, 
partitionIndex,
 markDeletePosition);
 return;
@@ -2133,7 +2151,8 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
 
 batchSizeFuture.whenComplete((batchSize, e) -> {
 if (e != null) {
-if (e.getCause() instanceof 
ManagedLedgerException.NonRecoverableLedgerException) {
+if (e.getCause() instanceof 
ManagedLedgerException.NonRecoverableLedgerException
+&& readCompacted) {
 handleLastMessageIdFromCompactedLedger(persistentTopic, 
requestId, partitionIndex,
 markDeletePosition);
 } else {
diff --git 

Re: [PR] [fix] [branch-3.0] Fix reader stuck when read from compacted topic with read compact mode disable [pulsar]

2024-03-06 Thread via GitHub


heesung-sn merged PR #22199:
URL: https://github.com/apache/pulsar/pull/22199


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



(pulsar) branch branch-3.0 updated: [fix][broker][branch-3.0] Avoid consumers receiving acknowledged messages from compacted topic after reconnection (#21187) (#22142)

2024-03-06 Thread heesung
This is an automated email from the ASF dual-hosted git repository.

heesung pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new 5b62d4b221f [fix][broker][branch-3.0] Avoid consumers receiving 
acknowledged messages from compacted topic after reconnection (#21187) (#22142)
5b62d4b221f is described below

commit 5b62d4b221fb0686c93911c13998fe3ad2689689
Author: Cong Zhao 
AuthorDate: Thu Mar 7 05:36:36 2024 +0800

[fix][broker][branch-3.0] Avoid consumers receiving acknowledged messages 
from compacted topic after reconnection (#21187) (#22142)
---
 .../apache/bookkeeper/mledger/ManagedCursor.java   |   4 +
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |  10 +-
 .../PersistentDispatcherSingleActiveConsumer.java  |  12 +-
 .../broker/service/persistent/PersistentTopic.java |   5 +-
 .../pulsar/compaction/CompactedTopicImpl.java  |   6 +-
 .../broker/service/ReplicatorSubscriptionTest.java |   2 +
 .../pulsar/broker/transaction/TransactionTest.java |   1 +
 .../org/apache/pulsar/client/impl/ReaderTest.java  |  28 
 .../apache/pulsar/compaction/CompactionTest.java   | 164 -
 9 files changed, 218 insertions(+), 14 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
index d1ffdf6d2d7..bc6a1e9a782 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
@@ -517,6 +517,10 @@ public interface ManagedCursor {
  */
 void rewind();
 
+default void rewind(boolean readCompacted) {
+rewind();
+}
+
 /**
  * Move the cursor to a different read position.
  *
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 10e72f709fe..7fd93dacf49 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -677,7 +677,7 @@ public class ManagedCursorImpl implements ManagedCursor {
  LedgerHandle recoveredFromCursorLedger) {
 // if the position was at a ledger that didn't exist (since it will be 
deleted if it was previously empty),
 // we need to move to the next existing ledger
-if (!ledger.ledgerExists(position.getLedgerId())) {
+if (position.getEntryId() == -1L && 
!ledger.ledgerExists(position.getLedgerId())) {
 Long nextExistingLedger = 
ledger.getNextValidLedger(position.getLedgerId());
 if (nextExistingLedger == null) {
 log.info("[{}] [{}] Couldn't find next next valid ledger for 
recovery {}", ledger.getName(), name,
@@ -2518,9 +2518,15 @@ public class ManagedCursorImpl implements ManagedCursor {
 
 @Override
 public void rewind() {
+rewind(false);
+}
+
+@Override
+public void rewind(boolean readCompacted) {
 lock.writeLock().lock();
 try {
-PositionImpl newReadPosition = 
ledger.getNextValidPosition(markDeletePosition);
+PositionImpl newReadPosition =
+readCompacted ? markDeletePosition.getNext() : 
ledger.getNextValidPosition(markDeletePosition);
 PositionImpl oldReadPosition = readPosition;
 
 log.info("[{}-{}] Rewind from {} to {}", ledger.getName(), name, 
oldReadPosition, newReadPosition);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index eacc568f0a4..bf6482bda01 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -105,9 +105,9 @@ public class PersistentDispatcherSingleActiveConsumer 
extends AbstractDispatcher
 if (log.isDebugEnabled()) {
 log.debug("[{}] Rewind cursor and read more entries without 
delay", name);
 }
-cursor.rewind();
-
 Consumer activeConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
+cursor.rewind(activeConsumer != null && 
activeConsumer.readCompacted());
+
 notifyActiveConsumerChanged(activeConsumer);
 readMoreEntries(activeConsumer);
 return;
@@ -125,9 +125,9 @@ public class PersistentDispatcherSingleActiveConsumer 
extends AbstractDispatcher
  

Re: [PR] [fix][broker][branch-3.0] Avoid consumers receiving acknowledged messages from compacted topic after reconnection (#21187) [pulsar]

2024-03-06 Thread via GitHub


heesung-sn merged PR #22142:
URL: https://github.com/apache/pulsar/pull/22142


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [I] getRedeliveryCount always return 0 [pulsar-client-node]

2024-03-06 Thread via GitHub


fabianwikstrom commented on issue #296:
URL: 
https://github.com/apache/pulsar-client-node/issues/296#issuecomment-1981825690

   Hi. I seem to have this issue as well. I have several messages in the 
backlog that I know have been processed by a consumer based on the service 
logs. When I inspect the message, there is no redeliveryCount property set 
which is causing my consumer to go into an infinite loop reprocessing. I have 
the `Shared` subscription type 
   
   @izumo27 any help appreciated :) 
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [fix][broker][branch-3.0] Avoid consumers receiving acknowledged messages from compacted topic after reconnection (#21187) [pulsar]

2024-03-06 Thread via GitHub


heesung-sn commented on PR #22142:
URL: https://github.com/apache/pulsar/pull/22142#issuecomment-1981816292

   @coderzc seems like this test keeps on failing. can you fix it?
   
   
[BrokerServiceAutoSubscriptionCreationTest.testDynamicConfigurationTopicAutoSubscriptionCreation:
 
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoSubscriptionCreationTest.java#L169](https://github.com/apache/pulsar/pull/22142/files#annotation_18958766392)
   java.util.concurrent.ExecutionException: 
org.apache.pulsar.client.api.PulsarClientException$SubscriptionNotFoundException:
 {"errorMsg":"Subscription does not exist","reqId":1658404878796832891, 
"remote":"localhost/127.0.0.1:44393", "local":"/127.0.0.1:57930"}


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



(pulsar) branch master updated (68c10925df4 -> 4ff86000383)

2024-03-06 Thread mmerli
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


from 68c10925df4 [improve][broker] Consistently add fine-grain 
authorization to REST API (#22202)
 add 4ff86000383 [feat][misc] PIP-264: Implement topic lookup metrics using 
OpenTelemetry (#22058)

No new revisions were added by this update.

Summary of changes:
 pulsar-broker/pom.xml  |   5 +
 .../org/apache/pulsar/broker/PulsarService.java|  14 ++-
 .../pulsar/broker/namespace/NamespaceService.java  |  62 +--
 .../pulsar/broker/service/BrokerService.java   |  65 ++--
 .../broker/stats/PulsarBrokerOpenTelemetry.java|   7 +-
 .../broker/auth/MockedPulsarServiceBaseTest.java   |  16 ++-
 .../service/BrokerServiceThrottlingTest.java   |  56 --
 .../testcontext/AbstractTestPulsarService.java |  15 ++-
 .../testcontext/NonStartableTestPulsarService.java |   2 +-
 .../broker/testcontext/PulsarTestContext.java  |  22 +++-
 .../testcontext/StartableTestPulsarService.java|  19 ++--
 .../pulsar/client/api/BrokerServiceLookupTest.java | 117 +++--
 .../{JvmGCMetricsLogger.java => MetricsUtil.java}  |  26 ++---
 ...MetricsLoggerTest.java => MetricsUtilTest.java} |  24 ++---
 .../pulsar/opentelemetry/OpenTelemetryService.java |  10 +-
 .../annotations/PulsarDeprecatedMetric.java|  18 ++--
 .../{ => annotations}/package-info.java|   4 +-
 .../opentelemetry/OpenTelemetryServiceTest.java|  46 
 18 files changed, 405 insertions(+), 123 deletions(-)
 copy 
pulsar-common/src/main/java/org/apache/pulsar/common/stats/{JvmGCMetricsLogger.java
 => MetricsUtil.java} (61%)
 copy 
pulsar-common/src/test/java/org/apache/pulsar/common/stats/{JvmDefaultGCMetricsLoggerTest.java
 => MetricsUtilTest.java} (55%)
 copy pulsar-client/src/main/java/org/apache/pulsar/client/util/Secret.java => 
pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/annotations/PulsarDeprecatedMetric.java
 (68%)
 copy pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/{ => 
annotations}/package-info.java (87%)



Re: [PR] [feat][misc] PIP-264: Implement topic lookup metrics using OpenTelemetry [pulsar]

2024-03-06 Thread via GitHub


merlimat merged PR #22058:
URL: https://github.com/apache/pulsar/pull/22058


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [improve] PIP 342: Support OpenTelemetry metrics in Pulsar client [pulsar]

2024-03-06 Thread via GitHub


merlimat commented on code in PR #22178:
URL: https://github.com/apache/pulsar/pull/22178#discussion_r1515053007


##
pip/pip-342 OTel client metrics support.md:
##
@@ -0,0 +1,201 @@
+# PIP 342: Support OpenTelemetry metrics in Pulsar client
+
+## Motivation
+
+Current support for metric instrumentation in Pulsar client is very limited 
and poses a lot of
+issues for integrating the metrics into any telemetry system.
+
+We have 2 ways that metrics are exposed today:
+
+1. Printing logs every 1 minute: While this is ok as it comes out of the box, 
it's very hard for
+   any application to get the data or use it in any meaningful way.
+2. `producer.getStats()` or `consumer.getStats()`: Calling these methods will 
get access to
+   the rate of events in the last 1-minute interval. This is problematic 
because out of the
+   box the metrics are not collected anywhere. One would have to start its own 
thread to
+   periodically check these values and export them to some other system.
+
+Neither of these mechanisms that we have today is sufficient to enable 
applications to easily
+export the telemetry data of Pulsar client SDK.
+
+## Goal
+
+Provide a good way for applications to retrieve and analyze the usage of 
Pulsar client operation,
+in particular with respect to:
+
+1. Maximizing compatibility with existing telemetry systems
+2. Minimizing the effort required to export these metrics
+
+## Why OpenTelemetry?
+
+[OpenTelemetry](https://opentelemetry.io/) is quickly becoming the de-facto 
standard API for metric and
+tracing instrumentation. In fact, as part of 
[PIP-264](https://github.com/apache/pulsar/blob/master/pip/pip-264.md),
+we are already migrating the Pulsar server side metrics to use OpenTelemetry.
+
+For Pulsar client SDK, we need to provide a similar way for application 
builder to quickly integrate and
+export Pulsar metrics.
+
+### Why exposing OpenTelemetry directly in Pulsar API
+
+When deciding how to expose the metrics exporter configuration there are 
multiple options: 
+
+ 1. Accept an `OpenTelemetry` object directly in Pulsar API
+ 2. Build a pluggable interface that describe all the Pulsar client SDK events 
and allow application to
+provide an implementation, perhaps providing an OpenTelemetry included 
option.
+
+For this proposal, we are following the (1) option. Here are the reasons:
+
+ 1. In a way, OpenTelemetry can be compared to 
[SLF4J](https://www.slf4j.org/), in the sense that it provides an API
+on top of which different vendor can build multiple implementations. 
Therefore, there is no need to create a new
+Pulsar-specific interface
+ 2. OpenTelemetry has 2 main artifacts: API and SDK. For the context of Pulsar 
client, we will only depend on its
+API. Applications that are going to use OpenTelemetry, will include the 
OTel SDK
+ 3. Providing a custom interface has several drawbacks:
+ 1. Applications need to update their implementations every time a new 
metric is added in Pulsar SDK
+ 2. The surface of this plugin API can become quite big when there are 
several metrics
+ 3. If we imagine an application that uses multiple libraries, like Pulsar 
SDK, and each of these has its own
+custom way to expose metrics, we can see the level of integration 
burden that is pushed to application
+developers
+ 4. It will always be easy to use OpenTelemetry to collect the metrics and 
export them using a custom metrics API. There
+are several examples of this in OpenTelemetry documentation.
+
+## Public API changes
+
+### Enabling OpenTelemetry
+
+When building a `PulsarClient` instance, it will be possible to pass an 
`OpenTelemetry` object:
+
+```java
+interface ClientBuilder {
+// ...
+ClientBuilder openTelemetry(io.opentelemetry.api.OpenTelemetry 
openTelemetry);
+
+ClientBuilder openTelemetryMetricsCardinality(MetricsCardinality 
metricsCardinality);
+}
+```
+
+The common usage for an application would be something like:
+
+```java
+// Creates an OpenTelemetry instance using environment variables to configure it
+OpenTelemetry otel=AutoConfiguredOpenTelemetrySdk.builder()
+.build().getOpenTelemetrySdk();
+
+PulsarClient client=PulsarClient.builder()
+.serviceUrl("pulsar://localhost:6650")
+.build();
+
+// 
+```
+
+Cardinality enum will allow to select a default cardinality label to be 
attached to the
+metrics:
+
+```java
+public enum MetricsCardinality {
+/**
+ * Do not add additional labels to metrics
+ */
+None,
+
+/**
+ * Label metrics by tenant
+ */
+Tenant,
+
+/**
+ * Label metrics by tenant and namespace
+ */
+Namespace,
+
+/**
+ * Label metrics by topic
+ */
+Topic,
+
+/**
+ * Label metrics by each partition
+ */
+Partition,
+}
+```
+
+The labels are additive. For example, selecting `Topic` level would mean that 
the metrics will be
+labeled like:
+
+```

Re: [PR] [improve] PIP 342: Support OpenTelemetry metrics in Pulsar client [pulsar]

2024-03-06 Thread via GitHub


merlimat commented on code in PR #22178:
URL: https://github.com/apache/pulsar/pull/22178#discussion_r1515052491


##
pip/pip-342 OTel client metrics support.md:
##
@@ -0,0 +1,201 @@
+# PIP 342: Support OpenTelemetry metrics in Pulsar client
+
+## Motivation
+
+Current support for metric instrumentation in Pulsar client is very limited 
and poses a lot of
+issues for integrating the metrics into any telemetry system.
+
+We have 2 ways that metrics are exposed today:
+
+1. Printing logs every 1 minute: While this is ok as it comes out of the box, 
it's very hard for
+   any application to get the data or use it in any meaningful way.
+2. `producer.getStats()` or `consumer.getStats()`: Calling these methods will 
get access to
+   the rate of events in the last 1-minute interval. This is problematic 
because out of the
+   box the metrics are not collected anywhere. One would have to start its own 
thread to
+   periodically check these values and export them to some other system.
+
+Neither of these mechanisms that we have today is sufficient to enable 
applications to easily
+export the telemetry data of Pulsar client SDK.
+
+## Goal
+
+Provide a good way for applications to retrieve and analyze the usage of 
Pulsar client operation,
+in particular with respect to:
+
+1. Maximizing compatibility with existing telemetry systems
+2. Minimizing the effort required to export these metrics
+
+## Why OpenTelemetry?
+
+[OpenTelemetry](https://opentelemetry.io/) is quickly becoming the de-facto 
standard API for metric and
+tracing instrumentation. In fact, as part of 
[PIP-264](https://github.com/apache/pulsar/blob/master/pip/pip-264.md),
+we are already migrating the Pulsar server side metrics to use OpenTelemetry.
+
+For Pulsar client SDK, we need to provide a similar way for application 
builder to quickly integrate and
+export Pulsar metrics.
+
+### Why exposing OpenTelemetry directly in Pulsar API
+
+When deciding how to expose the metrics exporter configuration there are 
multiple options: 
+
+ 1. Accept an `OpenTelemetry` object directly in Pulsar API
+ 2. Build a pluggable interface that describe all the Pulsar client SDK events 
and allow application to
+provide an implementation, perhaps providing an OpenTelemetry included 
option.
+
+For this proposal, we are following the (1) option. Here are the reasons:
+
+ 1. In a way, OpenTelemetry can be compared to 
[SLF4J](https://www.slf4j.org/), in the sense that it provides an API
+on top of which different vendor can build multiple implementations. 
Therefore, there is no need to create a new
+Pulsar-specific interface
+ 2. OpenTelemetry has 2 main artifacts: API and SDK. For the context of Pulsar 
client, we will only depend on its
+API. Applications that are going to use OpenTelemetry, will include the 
OTel SDK
+ 3. Providing a custom interface has several drawbacks:
+ 1. Applications need to update their implementations every time a new 
metric is added in Pulsar SDK
+ 2. The surface of this plugin API can become quite big when there are 
several metrics
+ 3. If we imagine an application that uses multiple libraries, like Pulsar 
SDK, and each of these has its own
+custom way to expose metrics, we can see the level of integration 
burden that is pushed to application
+developers
+ 4. It will always be easy to use OpenTelemetry to collect the metrics and 
export them using a custom metrics API. There
+are several examples of this in OpenTelemetry documentation.
+
+## Public API changes
+
+### Enabling OpenTelemetry
+
+When building a `PulsarClient` instance, it will be possible to pass an 
`OpenTelemetry` object:
+
+```java
+interface ClientBuilder {
+// ...
+ClientBuilder openTelemetry(io.opentelemetry.api.OpenTelemetry 
openTelemetry);
+
+ClientBuilder openTelemetryMetricsCardinality(MetricsCardinality 
metricsCardinality);

Review Comment:
   Yes, I did add that in the latest versions of the WIP PR. I'm not sure how a 
user would configure a different aggregation level of the metrics



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[HOW TO FOLLOW GITHUB] Re: [PR] WIP: PIP-342: OTel client metrics support [pulsar]

2024-03-06 Thread Dave Fisher
For anyone who wishes to watch activity from within a Pulsar GitHub repository 
please consider subscribing to commits@pulsar.apache.org 
 by sending an email to 
commits-subscr...@pulsar.apache.org 
 and replying to a CONFIRM email.

Regards,
Dave

> On Mar 6, 2024, at 12:50 PM, asafm (via GitHub)  wrote:
> 
> 
> asafm commented on code in PR #22179:
> URL: https://github.com/apache/pulsar/pull/22179#discussion_r1514920721
> 
> 
> ##
> pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java:
> ##
> @@ -451,15 +452,18 @@ ClientBuilder authentication(String 
> authPluginClassName, Map aut
> ClientBuilder memoryLimit(long memoryLimit, SizeUnit unit);
> 
> /**
> - * Set the interval between each stat info (default: 60 seconds) 
> Stats will be activated with positive
> + * Set the interval between each stat info (default: disabled) 
> Stats will be activated with positive
> 
> Review Comment:
>   How do we alert the users that those metrics are still subject to changes 
> that may break them?
> 
> 
> 
> -- 
> This is an automated message from the Apache Git Service.
> To respond to the message, please log on to GitHub and use the
> URL above to go to the specific comment.
> 
> To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org
> 
> For queries about this service, please contact Infrastructure at:
> us...@infra.apache.org
> 



Re: [PR] [feat][misc] PIP-264: Implement topic lookup metrics using OpenTelemetry [pulsar]

2024-03-06 Thread via GitHub


lhotari commented on code in PR #22058:
URL: https://github.com/apache/pulsar/pull/22058#discussion_r1514939230


##
pulsar-broker-common/src/main/java/org/apache/pulsar/common/stats/package-info.java:
##
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Provides metric utility classes.
+ */
+package org.apache.pulsar.common.stats;

Review Comment:
   Oh, I see that there is no new split package in this PR anymore; problem resolved.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [feat][misc] PIP-264: Implement topic lookup metrics using OpenTelemetry [pulsar]

2024-03-06 Thread via GitHub


lhotari commented on code in PR #22058:
URL: https://github.com/apache/pulsar/pull/22058#discussion_r1514931260


##
pulsar-broker-common/src/main/java/org/apache/pulsar/common/stats/package-info.java:
##
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Provides metric utility classes.
+ */
+package org.apache.pulsar.common.stats;

Review Comment:
   Couldn't we address this package split issue already in this PR before 
proceeding?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [feat][misc] PIP-264: Implement topic lookup metrics using OpenTelemetry [pulsar]

2024-03-06 Thread via GitHub


lhotari commented on code in PR #22058:
URL: https://github.com/apache/pulsar/pull/22058#discussion_r1514930165


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java:
##
@@ -175,6 +199,12 @@ public NamespaceService(PulsarService pulsar) {
 this.bundleSplitListeners = new CopyOnWriteArrayList<>();
 this.localBrokerDataCache = 
pulsar.getLocalMetadataStore().getMetadataCache(LocalBrokerData.class);
 this.redirectManager = new RedirectManager(pulsar);
+
+this.lookupLatencyHistogram = pulsar.getOpenTelemetry().getMeter()
+.histogramBuilder("pulsar.broker.lookup.request.duration")
+.setDescription("Lookup request duration")
+.setUnit("s")

Review Comment:
   Thank you @dragosvictor for the pointer. I also found 
https://github.com/open-telemetry/opentelemetry-specification/issues/2977 which 
was helpful as background information.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] WIP: PIP-342: OTel client metrics support [pulsar]

2024-03-06 Thread via GitHub


asafm commented on code in PR #22179:
URL: https://github.com/apache/pulsar/pull/22179#discussion_r1514920721


##
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java:
##
@@ -451,15 +452,18 @@ ClientBuilder authentication(String authPluginClassName, 
Map aut
 ClientBuilder memoryLimit(long memoryLimit, SizeUnit unit);
 
 /**
- * Set the interval between each stat info (default: 60 seconds) 
Stats will be activated with positive
+ * Set the interval between each stat info (default: disabled) 
Stats will be activated with positive

Review Comment:
   How do we alert the users that those metrics are still subject to changes 
that may break them?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



(pulsar) 02/06: [improve][broker] Make get list from bundle Admin API async (#20652)

2024-03-06 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 3fcaa0a3044e3861b70d22141dd6dc3f6c178abd
Author: Kai Wang 
AuthorDate: Fri Jun 30 09:23:31 2023 +0800

[improve][broker] Make get list from bundle Admin API async (#20652)

(cherry picked from commit 4958f4578967a6960cd6ac4c0c0759c4bd903b94)
---
 .../broker/admin/v2/NonPersistentTopics.java   | 60 +++---
 .../pulsar/broker/web/PulsarWebResource.java   |  9 ++--
 2 files changed, 35 insertions(+), 34 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
index 0c4ddd46c09..7cb18264ea1 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
@@ -25,6 +25,7 @@ import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiParam;
 import io.swagger.annotations.ApiResponse;
 import io.swagger.annotations.ApiResponses;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
@@ -51,7 +52,6 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.web.RestException;
-import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.NamespaceOperation;
 import org.apache.pulsar.common.policies.data.Policies;
@@ -442,35 +442,35 @@ public class NonPersistentTopics extends PersistentTopics 
{
 bundleRange);
 asyncResponse.resume(Response.noContent().build());
 } else {
-NamespaceBundle nsBundle;
-try {
-nsBundle = validateNamespaceBundleOwnership(namespaceName, 
policies.bundles,
-bundleRange, true, true);
-} catch (WebApplicationException wae) {
-asyncResponse.resume(wae);
-return;
-}
-try {
-ConcurrentOpenHashMap> bundleTopics =
-
pulsar().getBrokerService().getMultiLayerTopicsMap().get(namespaceName.toString());
-if (bundleTopics == null || bundleTopics.isEmpty()) {
-asyncResponse.resume(Collections.emptyList());
-return;
-}
-final List topicList = Lists.newArrayList();
-String bundleKey = namespaceName.toString() + "/" + 
nsBundle.getBundleRange();
-ConcurrentOpenHashMap topicMap = 
bundleTopics.get(bundleKey);
-if (topicMap != null) {
-topicList.addAll(topicMap.keys().stream()
-.filter(name -> 
!TopicName.get(name).isPersistent())
-.collect(Collectors.toList()));
-}
-asyncResponse.resume(topicList);
-} catch (Exception e) {
-log.error("[{}] Failed to list topics on namespace bundle 
{}/{}", clientAppId(),
-namespaceName, bundleRange, e);
-asyncResponse.resume(new RestException(e));
-}
+validateNamespaceBundleOwnershipAsync(namespaceName, 
policies.bundles, bundleRange, true, true)
+.thenAccept(nsBundle -> {
+ConcurrentOpenHashMap> bundleTopics =
+pulsar().getBrokerService()
+
.getMultiLayerTopicsMap().get(namespaceName.toString());
+if (bundleTopics == null || 
bundleTopics.isEmpty()) {
+asyncResponse.resume(Collections.emptyList());
+return;
+}
+final List topicList = new ArrayList<>();
+String bundleKey = namespaceName.toString() + "/" 
+ nsBundle.getBundleRange();
+ConcurrentOpenHashMap topicMap = 
bundleTopics.get(bundleKey);
+if (topicMap != null) {
+topicList.addAll(topicMap.keys().stream()
+.filter(name -> 
!TopicName.get(name).isPersistent())
+.collect(Collectors.toList()));
+}
+asyncResponse.resume(topicList);
+ 

(pulsar) 06/06: [improve][broker] Consistently add fine-grain authorization to REST API (#22202)

2024-03-06 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 441fd68d8a5753682252cad29264f771e4cd1f34
Author: Qiang Zhao 
AuthorDate: Wed Mar 6 22:00:45 2024 +0800

[improve][broker] Consistently add fine-grain authorization to REST API 
(#22202)

(cherry picked from commit 68c10925df43769eee7265b4af0ac8ee4913e715)
---
 .../pulsar/broker/admin/v2/PersistentTopics.java   | 223 +--
 .../broker/admin/TopicPoliciesAuthZTest.java   | 308 +
 2 files changed, 454 insertions(+), 77 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index d4690498de1..337a806d920 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -67,6 +67,7 @@ import org.apache.pulsar.common.policies.data.PublishRate;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
 import org.apache.pulsar.common.policies.data.SubscribeRate;
+import org.apache.pulsar.common.policies.data.TopicOperation;
 import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.policies.data.impl.BacklogQuotaImpl;
 import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
@@ -388,7 +389,8 @@ public class PersistentTopics extends PersistentTopicsBase {
 @ApiParam(value = "Whether leader broker redirected this call to 
this broker. For internal use.")
 @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
 validateTopicName(tenant, namespace, encodedTopic);
-preValidation(authoritative)
+validateTopicPolicyOperationAsync(topicName, PolicyName.OFFLOAD, 
PolicyOperation.READ)
+.thenCompose(__ -> preValidation(authoritative))
 .thenCompose(__ -> internalGetOffloadPolicies(applied, isGlobal))
 .thenApply(asyncResponse::resume)
 .exceptionally(ex -> {
@@ -411,7 +413,8 @@ public class PersistentTopics extends PersistentTopicsBase {
 @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
 @ApiParam(value = "Offload policies for the specified topic") 
OffloadPoliciesImpl offloadPolicies) {
 validateTopicName(tenant, namespace, encodedTopic);
-preValidation(authoritative)
+validateTopicPolicyOperationAsync(topicName, PolicyName.OFFLOAD, 
PolicyOperation.WRITE)
+.thenCompose(__ -> preValidation(authoritative))
 .thenCompose(__ -> internalSetOffloadPolicies(offloadPolicies, 
isGlobal))
 .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
 .exceptionally(ex -> {
@@ -433,7 +436,8 @@ public class PersistentTopics extends PersistentTopicsBase {
 @ApiParam(value = "Whether leader broker redirected this call to 
this broker. For internal use.")
 @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
 validateTopicName(tenant, namespace, encodedTopic);
-preValidation(authoritative)
+validateTopicPolicyOperationAsync(topicName, PolicyName.OFFLOAD, 
PolicyOperation.WRITE)
+.thenCompose(__ -> preValidation(authoritative))
 .thenCompose(__ -> internalSetOffloadPolicies(null, isGlobal))
 .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
 .exceptionally(ex -> {
@@ -457,7 +461,8 @@ public class PersistentTopics extends PersistentTopicsBase {
 @ApiParam(value = "Whether leader broker redirected this call to 
this broker. For internal use.")
 @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
 validateTopicName(tenant, namespace, encodedTopic);
-preValidation(authoritative)
+validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_UNACKED, 
PolicyOperation.READ)
+.thenCompose(__ -> preValidation(authoritative))
 .thenCompose(__ -> 
internalGetMaxUnackedMessagesOnConsumer(applied, isGlobal))
 .thenApply(asyncResponse::resume).exceptionally(ex -> {
 handleTopicPolicyException("getMaxUnackedMessagesOnConsumer", 
ex, asyncResponse);
@@ -481,7 +486,8 @@ public class PersistentTopics extends PersistentTopicsBase {
 @ApiParam(value = "Max unacked messages on consumer policies for 
the specified topic")
 Integer maxUnackedNum) {
 validateTopicName(tenant, namespace, encodedTopic);
-preValidation(authoritative)
+validateTopicPolicyOperationAsync(topicName, 

(pulsar) 05/06: [improve] [broker] Do not print an Error log when responding to `HTTP-404` when calling `Admin API` and the topic does not exist. (#21995)

2024-03-06 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit ae8d14962744ccd9041a69d980a082ef2f8b4426
Author: fengyubiao 
AuthorDate: Sun Feb 18 15:46:52 2024 +0800

[improve] [broker] Do not print an Error log when responding to `HTTP-404` 
when calling `Admin API` and the topic does not exist. (#21995)

(cherry picked from commit 48b4481969cb6028186a7a84b8be8af90770674b)
---
 .../apache/pulsar/broker/admin/AdminResource.java  |  4 +
 .../broker/admin/impl/PersistentTopicsBase.java| 88 +++---
 .../broker/admin/impl/SchemasResourceBase.java |  2 +-
 .../broker/admin/v2/NonPersistentTopics.java   |  6 +-
 .../pulsar/broker/admin/v2/PersistentTopics.java   | 30 
 .../pulsar/broker/admin/v3/Transactions.java   | 10 +--
 6 files changed, 71 insertions(+), 69 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 037881aac1d..5eddb4cecf2 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -869,6 +869,10 @@ public abstract class AdminResource extends 
PulsarWebResource {
 == Status.NOT_FOUND.getStatusCode();
 }
 
+protected static boolean isNot307And404Exception(Throwable ex) {
+return !isRedirectException(ex) && !isNotFoundException(ex);
+}
+
 protected static String getTopicNotFoundErrorMessage(String topic) {
 return String.format("Topic %s not found", topic);
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index b5609892b09..3c1a43a9c69 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -796,7 +796,7 @@ public class PersistentTopicsBase extends AdminResource {
}
}).exceptionally(ex -> {
// If the exception is not redirect exception we 
need to log it.
-   if (!isRedirectException(ex)) {
+   if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to get partitioned 
metadata while unloading topic {}",
clientAppId(), topicName, ex);
}
@@ -806,7 +806,7 @@ public class PersistentTopicsBase extends AdminResource {
}
}).exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
-   if (!isRedirectException(ex)) {
+   if (!isNot307And404Exception(ex)) {
log.error("[{}] Failed to validate the global namespace 
ownership while unloading topic {}",
clientAppId(), topicName, ex);
}
@@ -974,7 +974,7 @@ public class PersistentTopicsBase extends AdminResource {
 }))
 .exceptionally(ex -> {
 // If the exception is not redirect exception we need to 
log it.
-if (!isRedirectException(ex)) {
+if (!isNot307And404Exception(ex)) {
 log.error("[{}] Failed to unload topic {}, {}", 
clientAppId(), topicName, ex);
 }
 resumeAsyncResponseExceptionally(asyncResponse, ex);
@@ -996,7 +996,7 @@ public class PersistentTopicsBase extends AdminResource {
 }))
 .exceptionally(ex -> {
 // If the exception is not redirect exception we need to 
log it.
-if (!isRedirectException(ex)) {
+if (!isNot307And404Exception(ex)) {
 log.error("[{}] Failed to unload tc {},{}", 
clientAppId(),
 topicName.getPartitionIndex(), ex);
 }
@@ -1098,7 +1098,7 @@ public class PersistentTopicsBase extends AdminResource {
 }
 }).exceptionally(ex -> {
 // If the exception is not redirect exception we 
need to log it.
-if (!isRedirectException(ex)) {
+if (!isNot307And404Exception(ex)) {
 log.error("[{}] Failed to get partitioned 
topic metadata while get"
 + " subscriptions for topic {}", 
clientAppId(), topicName, ex);
 }
@@ -1108,7 +1108,7 @@ public class 

(pulsar) 04/06: [improve][admin]internalGetMessageById shouldn't be allowed on partitioned topic (#19013)

2024-03-06 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit c9d2e2663003532cb6e1e1a6f53c2b3c2449a880
Author: gaozhangmin 
AuthorDate: Mon Jan 9 19:39:09 2023 +0800

[improve][admin]internalGetMessageById shouldn't be allowed on partitioned 
topic (#19013)

Co-authored-by: gavingaozhangmin 
(cherry picked from commit b05fddb1af03456438f27217dc6979be00fac19e)
---
 .../broker/admin/impl/PersistentTopicsBase.java| 97 --
 .../pulsar/broker/admin/v1/PersistentTopics.java   | 20 +++--
 .../pulsar/broker/admin/v2/PersistentTopics.java   | 20 +++--
 3 files changed, 75 insertions(+), 62 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index d31092c5829..b5609892b09 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -2763,60 +2763,65 @@ public class PersistentTopicsBase extends AdminResource 
{
 return seekPosition;
 }
 
-protected void internalGetMessageById(AsyncResponse asyncResponse, long 
ledgerId, long entryId,
-  boolean authoritative) {
-// will redirect if the topic not owned by current broker
-validateTopicOwnershipAsync(topicName, authoritative)
-.thenCompose(__ -> validateTopicOperationAsync(topicName, 
TopicOperation.PEEK_MESSAGES))
-.thenCompose(__ -> {
-CompletableFuture ret;
-if (topicName.isGlobal()) {
-ret = 
validateGlobalNamespaceOwnershipAsync(namespaceName);
-} else {
-ret = CompletableFuture.completedFuture(null);
-}
-return ret;
-})
-.thenCompose(__ -> getTopicReferenceAsync(topicName))
-.thenAccept(topic -> {
-ManagedLedgerImpl ledger =
-(ManagedLedgerImpl) ((PersistentTopic) 
topic).getManagedLedger();
-ledger.asyncReadEntry(new PositionImpl(ledgerId, entryId),
-new AsyncCallbacks.ReadEntryCallback() {
-@Override
-public void 
readEntryFailed(ManagedLedgerException exception,
-Object ctx) {
-asyncResponse.resume(new 
RestException(exception));
-}
+protected CompletableFuture internalGetMessageById(long 
ledgerId, long entryId, boolean authoritative) {
+CompletableFuture future;
+if (topicName.isGlobal()) {
+future = validateGlobalNamespaceOwnershipAsync(namespaceName);
+} else {
+future = CompletableFuture.completedFuture(null);
+}
+return future.thenCompose(__ -> {
+if (topicName.isPartitioned()) {
+return CompletableFuture.completedFuture(null);
+} else {
+return getPartitionedTopicMetadataAsync(topicName, 
authoritative, false)
+.thenAccept(topicMetadata -> {
+if (topicMetadata.partitions > 0) {
+log.warn("[{}] Not supported getMessageById 
operation on partitioned-topic {}",
+clientAppId(), topicName);
+throw new 
RestException(Status.METHOD_NOT_ALLOWED,
+"GetMessageById is not allowed on 
partitioned-topic");
+}
+});
 
-@Override
-public void readEntryComplete(Entry entry, 
Object ctx) {
-try {
-
asyncResponse.resume(generateResponseWithEntry(entry));
-} catch (IOException exception) {
-asyncResponse.resume(new 
RestException(exception));
-} finally {
-if (entry != null) {
-entry.release();
-}
-}
+}
+})
+.thenCompose(ignore -> validateTopicOwnershipAsync(topicName, 
authoritative))
+.thenCompose(__ -> validateTopicOperationAsync(topicName, 
TopicOperation.PEEK_MESSAGES))
+.thenCompose(__ -> 

(pulsar) 01/06: [refactor][broker] Suppress error logging when message expiration fails (#19778)

2024-03-06 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit c00c3c1e2a76abcd243486b5ef2c14a27ceef5ef
Author: Masahiro Sakamoto 
AuthorDate: Wed Apr 5 15:45:59 2023 +0900

[refactor][broker] Suppress error logging when message expiration fails 
(#19778)

(cherry picked from commit 0d1fe1821e030c0c0b53a7d47a2bf89151783eb4)
---
 .../broker/admin/impl/PersistentTopicsBase.java| 85 +++---
 1 file changed, 59 insertions(+), 26 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 264bebfe50e..d31092c5829 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -2006,11 +2006,15 @@ public class PersistentTopicsBase extends AdminResource 
{
 
 FutureUtil.waitForAll(futures).handle((result, 
exception) -> {
 if (exception != null) {
-Throwable t = exception.getCause();
-log.error("[{}] Failed to expire messages 
up to {} on {}",
-clientAppId(), expireTimeInSeconds,
-topicName, t);
-asyncResponse.resume(new RestException(t));
+Throwable t = 
FutureUtil.unwrapCompletionException(exception);
+if (t instanceof PulsarAdminException) {
+log.warn("[{}] Failed to expire 
messages up to {} on {}: {}", clientAppId(),
+expireTimeInSeconds, 
topicName, t.toString());
+} else {
+log.error("[{}] Failed to expire 
messages up to {} on {}", clientAppId(),
+expireTimeInSeconds, 
topicName, t);
+}
+
resumeAsyncResponseExceptionally(asyncResponse, t);
 return null;
 }
 
asyncResponse.resume(Response.noContent().build());
@@ -2077,9 +2081,14 @@ public class PersistentTopicsBase extends AdminResource {
 FutureUtil.waitForAll(futures).handle((result, exception) 
-> {
 if (exception != null) {
 Throwable throwable = 
FutureUtil.unwrapCompletionException(exception);
-log.error("[{}] Failed to expire messages for all 
subscription up to {} on {}",
-clientAppId(), expireTimeInSeconds, 
topicName, throwable);
-asyncResponse.resume(new RestException(throwable));
+if (throwable instanceof RestException) {
+log.warn("[{}] Failed to expire messages for 
all subscription up to {} on {}: {}",
+clientAppId(), expireTimeInSeconds, 
topicName, throwable.toString());
+} else {
+log.error("[{}] Failed to expire messages for 
all subscription up to {} on {}",
+clientAppId(), expireTimeInSeconds, 
topicName, throwable);
+}
+resumeAsyncResponseExceptionally(asyncResponse, 
throwable);
 return null;
 }
 asyncResponse.resume(Response.noContent().build());
@@ -3846,17 +3855,24 @@ public class PersistentTopicsBase extends AdminResource 
{
 
 
FutureUtil.waitForAll(futures).handle((result, exception) -> {
 if (exception != null) {
-Throwable t = 
exception.getCause();
+Throwable t = 
FutureUtil.unwrapCompletionException(exception);
 if (t instanceof 
NotFoundException) {
 
asyncResponse.resume(new RestException(Status.NOT_FOUND,
 
getSubNotFoundErrorMessage(topicName.toString(),
 
subName)));
 

(pulsar) 03/06: [improve][broker] Avoid print redirect exception log when get list from bundle (#20846)

2024-03-06 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit e530c8bf1f14564b00a78e9456ba07ca563a3adb
Author: Kai Wang 
AuthorDate: Sun Jul 23 12:56:04 2023 +0800

[improve][broker] Avoid print redirect exception log when get list from 
bundle (#20846)

(cherry picked from commit 9256407cdca1ab6d9b3a59ce404dccb09953ab24)
---
 .../pulsar/broker/admin/v2/NonPersistentTopics.java | 21 -
 1 file changed, 8 insertions(+), 13 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
index 7cb18264ea1..80d6a9f62da 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
@@ -461,25 +461,20 @@ public class NonPersistentTopics extends PersistentTopics 
{
 }
 asyncResponse.resume(topicList);
 }).exceptionally(ex -> {
-Throwable realCause = 
FutureUtil.unwrapCompletionException(ex);
-log.error("[{}] Failed to list topics on namespace 
bundle {}/{}", clientAppId(),
-namespaceName, bundleRange, realCause);
-if (realCause instanceof WebApplicationException) {
-asyncResponse.resume(realCause);
-} else {
-asyncResponse.resume(new 
RestException(realCause));
+if (!isRedirectException(ex)) {
+log.error("[{}] Failed to list topics on 
namespace bundle {}/{}", clientAppId(),
+namespaceName, bundleRange, ex);
 }
+resumeAsyncResponseExceptionally(asyncResponse, 
ex);
 return null;
 });
 }
 }).exceptionally(ex -> {
-log.error("[{}] Failed to list topics on namespace bundle {}/{}", 
clientAppId(),
-namespaceName, bundleRange, ex);
-if (ex.getCause() instanceof WebApplicationException) {
-asyncResponse.resume(ex.getCause());
-} else {
-asyncResponse.resume(new RestException(ex.getCause()));
+if (!isRedirectException(ex)) {
+log.error("[{}] Failed to list topics on namespace bundle 
{}/{}", clientAppId(),
+namespaceName, bundleRange, ex);
 }
+resumeAsyncResponseExceptionally(asyncResponse, ex);
 return null;
 });
 }



(pulsar) branch branch-2.11 updated (0cf5a837512 -> 441fd68d8a5)

2024-03-06 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a change to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


from 0cf5a837512 [fix][test] testModularLoadManagerRemoveBundleAndLoad 
(#19710)
 new c00c3c1e2a7 [refactor][broker] Suppress error logging when message 
expiration fails (#19778)
 new 3fcaa0a3044 [improve][broker] Make get list from bundle Admin API 
async (#20652)
 new e530c8bf1f1 [improve][broker] Avoid print redirect exception log when 
get list from bundle (#20846)
 new c9d2e266300 [improve][admin]internalGetMessageById shouldn't be 
allowed on partitioned topic (#19013)
 new ae8d1496274 [improve] [broker] Do not print an Error log when 
responding to `HTTP-404` when calling `Admin API` and the topic does not exist. 
(#21995)
 new 441fd68d8a5 [improve][broker] Consistently add fine-grain 
authorization to REST API (#22202)

The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/pulsar/broker/admin/AdminResource.java  |   4 +
 .../broker/admin/impl/PersistentTopicsBase.java| 268 ++
 .../broker/admin/impl/SchemasResourceBase.java |   2 +-
 .../pulsar/broker/admin/v1/PersistentTopics.java   |  20 +-
 .../broker/admin/v2/NonPersistentTopics.java   |  69 +++--
 .../pulsar/broker/admin/v2/PersistentTopics.java   | 269 +++---
 .../pulsar/broker/admin/v3/Transactions.java   |  10 +-
 .../pulsar/broker/web/PulsarWebResource.java   |   9 +-
 .../broker/admin/TopicPoliciesAuthZTest.java   | 308 +
 9 files changed, 690 insertions(+), 269 deletions(-)



Re: [PR] [improve] PIP 342: Support OpenTelemetry metrics in Pulsar client [pulsar]

2024-03-06 Thread via GitHub


asafm commented on code in PR #22178:
URL: https://github.com/apache/pulsar/pull/22178#discussion_r1514915145


##
pip/pip-342 OTel client metrics support.md:
##
@@ -0,0 +1,201 @@
+# PIP 342: Support OpenTelemetry metrics in Pulsar client
+
+## Motivation
+
+Current support for metric instrumentation in Pulsar client is very limited 
and poses a lot of
+issues for integrating the metrics into any telemetry system.
+
+We have 2 ways that metrics are exposed today:
+
+1. Printing logs every 1 minute: While this is ok as it comes out of the box, 
it's very hard for
+   any application to get the data or use it in any meaningful way.
+2. `producer.getStats()` or `consumer.getStats()`: Calling these methods will 
get access to
+   the rate of events in the last 1-minute interval. This is problematic 
because out of the
+   box the metrics are not collected anywhere. One would have to start its own 
thread to
+   periodically check these values and export them to some other system.
+
+Neither of these mechanisms that we have today is sufficient to enable 
applications to easily
+export the telemetry data of the Pulsar client SDK.
+
+## Goal
+
+Provide a good way for applications to retrieve and analyze the usage of 
Pulsar client operation,
+in particular with respect to:
+
+1. Maximizing compatibility with existing telemetry systems
+2. Minimizing the effort required to export these metrics
+
+## Why OpenTelemetry?
+
+[OpenTelemetry](https://opentelemetry.io/) is quickly becoming the de-facto 
standard API for metric and
+tracing instrumentation. In fact, as part of 
[PIP-264](https://github.com/apache/pulsar/blob/master/pip/pip-264.md),
+we are already migrating the Pulsar server side metrics to use OpenTelemetry.
+
+For the Pulsar client SDK, we need to provide a similar way for application 
builders to quickly integrate and
+export Pulsar metrics.
+
+### Why exposing OpenTelemetry directly in Pulsar API
+
+When deciding how to expose the metrics exporter configuration there are 
multiple options: 
+
+ 1. Accept an `OpenTelemetry` object directly in Pulsar API
+ 2. Build a pluggable interface that describes all the Pulsar client SDK events 
and allows applications to
+provide an implementation, perhaps providing an OpenTelemetry included 
option.
+
+For this proposal, we are following the (1) option. Here are the reasons:
+
+ 1. In a way, OpenTelemetry can be compared to 
[SLF4J](https://www.slf4j.org/), in the sense that it provides an API
+on top of which different vendor can build multiple implementations. 
Therefore, there is no need to create a new
+Pulsar-specific interface
+ 2. OpenTelemetry has 2 main artifacts: API and SDK. For the context of Pulsar 
client, we will only depend on its
+API. Applications that are going to use OpenTelemetry, will include the 
OTel SDK
+ 3. Providing a custom interface has several drawbacks:
+ 1. Applications need to update their implementations every time a new 
metric is added in Pulsar SDK
+ 2. The surface of this plugin API can become quite big when there are 
several metrics
+ 3. If we imagine an application that uses multiple libraries, like Pulsar 
SDK, and each of these has its own
+custom way to expose metrics, we can see the level of integration 
burden that is pushed to application
+developers
+ 4. It will always be easy to use OpenTelemetry to collect the metrics and 
export them using a custom metrics API. There
+are several examples of this in OpenTelemetry documentation.
+
+## Public API changes
+
+### Enabling OpenTelemetry
+
+When building a `PulsarClient` instance, it will be possible to pass an 
`OpenTelemetry` object:
+
+```java
+interface ClientBuilder {
+// ...
+ClientBuilder openTelemetry(io.opentelemetry.api.OpenTelemetry 
openTelemetry);
+
+ClientBuilder openTelemetryMetricsCardinality(MetricsCardinality 
metricsCardinality);

Review Comment:
   They have an experimental interface called `ExtendedDoubleCounterBuilder` 
(the same name for any other type).
   You use the `Meter` to obtain a builder, then cast the builder to 
`ExtendedDoubleCounterBuilder` and continue to set your name, unit, advice, and 
build.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [feat][monitor] PIP-223: Add metrics for all rest endpoints. [pulsar]

2024-03-06 Thread via GitHub


asafm commented on code in PR #21772:
URL: https://github.com/apache/pulsar/pull/21772#discussion_r1514879590


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java:
##
@@ -751,6 +751,7 @@ public void start() throws PulsarServerException {
 config.getDefaultRetentionTimeInMinutes() * 60));
 }
 
+this.openTelemetry = new PulsarBrokerOpenTelemetry(config);

Review Comment:
   This is about to change at https://github.com/apache/pulsar/pull/22058



##
pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestEndpointMetricsFilter.java:
##
@@ -21,43 +21,68 @@
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
-import io.prometheus.client.Counter;
-import io.prometheus.client.Histogram;
+import io.opentelemetry.api.common.AttributeKey;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.DoubleHistogram;
+import io.opentelemetry.api.metrics.LongCounter;
+import io.opentelemetry.api.metrics.Meter;
 import java.io.IOException;
+import java.time.Duration;
+import java.util.List;
 import java.util.Stack;
+import javax.validation.constraints.NotNull;
 import javax.ws.rs.container.ContainerRequestContext;
 import javax.ws.rs.container.ContainerRequestFilter;
 import javax.ws.rs.container.ContainerResponseContext;
 import javax.ws.rs.container.ContainerResponseFilter;
 import javax.ws.rs.core.Response;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.stats.PulsarBrokerOpenTelemetry;
 import org.glassfish.jersey.server.internal.routing.UriRoutingContext;
 import org.glassfish.jersey.server.model.Resource;
 import org.glassfish.jersey.server.model.ResourceMethod;
-import org.jetbrains.annotations.NotNull;
 
 public class RestEndpointMetricsFilter implements ContainerResponseFilter, 
ContainerRequestFilter {
-private static final LoadingCache CACHE = 
CacheBuilder
+private final LoadingCache CACHE = CacheBuilder
 .newBuilder()
 .maximumSize(100)
+.expireAfterAccess(Duration.ofMinutes(1))
 .build(new CacheLoader<>() {
 @Override
 public @NotNull String load(@NotNull ResourceMethod method) 
throws Exception {
 return getRestPath(method);
 }
 });
 
-private static final Histogram LATENCY = Histogram
-.build("pulsar_broker_rest_endpoint_latency", "-")
-.unit("ms")
-.labelNames("path", "method")
-.buckets(10D, 20D, 50D, 100D, 200D, 500D, 1000D, 2000D)
-.register();
-private static final Counter FAILED = Counter
-.build("pulsar_broker_rest_endpoint_failed", "-")
-.labelNames("path", "method", "code")
-.register();
-
 private static final String REQUEST_START_TIME = "requestStartTime";
+private static final AttributeKey PATH = 
AttributeKey.stringKey("path");

Review Comment:
   See OTel HTTP semantic conventions for HTTP related attribute names.



##
pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestEndpointMetricsFilter.java:
##
@@ -21,43 +21,68 @@
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
-import io.prometheus.client.Counter;
-import io.prometheus.client.Histogram;
+import io.opentelemetry.api.common.AttributeKey;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.DoubleHistogram;
+import io.opentelemetry.api.metrics.LongCounter;
+import io.opentelemetry.api.metrics.Meter;
 import java.io.IOException;
+import java.time.Duration;
+import java.util.List;
 import java.util.Stack;
+import javax.validation.constraints.NotNull;
 import javax.ws.rs.container.ContainerRequestContext;
 import javax.ws.rs.container.ContainerRequestFilter;
 import javax.ws.rs.container.ContainerResponseContext;
 import javax.ws.rs.container.ContainerResponseFilter;
 import javax.ws.rs.core.Response;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.stats.PulsarBrokerOpenTelemetry;
 import org.glassfish.jersey.server.internal.routing.UriRoutingContext;
 import org.glassfish.jersey.server.model.Resource;
 import org.glassfish.jersey.server.model.ResourceMethod;
-import org.jetbrains.annotations.NotNull;
 
 public class RestEndpointMetricsFilter implements ContainerResponseFilter, 
ContainerRequestFilter {
-private static final LoadingCache CACHE = 
CacheBuilder
+private final LoadingCache CACHE = CacheBuilder
 .newBuilder()
 .maximumSize(100)
+.expireAfterAccess(Duration.ofMinutes(1))
 .build(new CacheLoader<>() {
 @Override
 public @NotNull String load(@NotNull ResourceMethod method) 

Re: [I] Pulsar Authentication should support rotation of Validation keys e.g. Public keys used in JWT validation [pulsar]

2024-03-06 Thread via GitHub


damienburke commented on issue #8152:
URL: https://github.com/apache/pulsar/issues/8152#issuecomment-1981417316

   Thanks @nodece - could you re-open? I don't have the rights. 
   Also, let me know if you want me to take over the PR, in which case please assign 
it to me, cheers 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [I] Problem in developing a pulsar-io-mqtt [pulsar]

2024-03-06 Thread via GitHub


caihualin commented on issue #15154:
URL: https://github.com/apache/pulsar/issues/15154#issuecomment-1981296625

   Where can the pulsar-io-mqtt source code be found?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] WIP: PIP-342: OTel client metrics support [pulsar]

2024-03-06 Thread via GitHub


merlimat commented on PR #22179:
URL: https://github.com/apache/pulsar/pull/22179#issuecomment-1981210195

   @asafm I applied most of the suggestions, please give it another pass


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [D] Error from server (BadRequest): container "pulsar-broker" in pod "pulsar-broker-0" is waiting to start: PodInitializing [pulsar]

2024-03-06 Thread via GitHub


GitHub user asafm added a comment to the discussion: Error from server 
(BadRequest): container "pulsar-broker" in pod "pulsar-broker-0" is waiting to 
start: PodInitializing

@hangc0276 Maybe you know?

GitHub link: 
https://github.com/apache/pulsar/discussions/17802#discussioncomment-8695826


This is an automatically sent email for commits@pulsar.apache.org.
To unsubscribe, please send an email to: commits-unsubscr...@pulsar.apache.org



Re: [I] [Doc] deduplication doesn't mention the requirement of producer name [pulsar]

2024-03-06 Thread via GitHub


visortelle closed issue #22210: [Doc] deduplication doesn't mention the 
requirement of producer name
URL: https://github.com/apache/pulsar/issues/22210


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [improve][doc] Dedup section needs to remark that producer name is a requirement [pulsar-site]

2024-03-06 Thread via GitHub


visortelle merged PR #827:
URL: https://github.com/apache/pulsar-site/pull/827


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



(pulsar-site) branch main updated: [improve][doc] Dedup section needs to remark that producer name is a requirement (#827)

2024-03-06 Thread visortelle
This is an automated email from the ASF dual-hosted git repository.

visortelle pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-site.git


The following commit(s) were added to refs/heads/main by this push:
 new 8dc56d96a4e5 [improve][doc] Dedup section needs to remark that 
producer name is a requirement (#827)
8dc56d96a4e5 is described below

commit 8dc56d96a4e5b3453bb9e80372a0fee4c42c75ec
Author: Alvaro <102966649+alvarostr...@users.noreply.github.com>
AuthorDate: Wed Mar 6 16:07:59 2024 +0100

[improve][doc] Dedup section needs to remark that producer name is a 
requirement (#827)

* Dedup mark producer name as a requirement

The explanation doesn't remark this is a requirement.

* Change in all the latest versioned_docs

remark the requirement with the producer's name
---
 docs/cookbooks-deduplication.md  | 2 +-
 versioned_docs/version-2.11.x/cookbooks-deduplication.md | 2 +-
 versioned_docs/version-3.0.x/cookbooks-deduplication.md  | 2 +-
 versioned_docs/version-3.1.x/cookbooks-deduplication.md  | 2 +-
 versioned_docs/version-3.2.x/cookbooks-deduplication.md  | 2 +-
 5 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/docs/cookbooks-deduplication.md b/docs/cookbooks-deduplication.md
index b7b8b39381cb..8da7961bc59e 100644
--- a/docs/cookbooks-deduplication.md
+++ b/docs/cookbooks-deduplication.md
@@ -73,7 +73,7 @@ If you enable message deduplication in Pulsar brokers, 
namespaces, or topics, it
 
 So you need to complete the following tasks for your client producers:
 
-1. Specify a name for the producer.
+1. Specify a name for the producer (this is a requirement, Pulsar will use the 
producer name to filter duplicated messages).
 1. Set the message timeout to `0` (namely, no timeout).
 
 The instructions for Java, Python, and C++ clients are different.
diff --git a/versioned_docs/version-2.11.x/cookbooks-deduplication.md 
b/versioned_docs/version-2.11.x/cookbooks-deduplication.md
index 83df8b04bc1c..21a711ddabb5 100644
--- a/versioned_docs/version-2.11.x/cookbooks-deduplication.md
+++ b/versioned_docs/version-2.11.x/cookbooks-deduplication.md
@@ -73,7 +73,7 @@ If you enable message deduplication in Pulsar brokers, 
namespaces, or topics, it
 
 So you need to complete the following tasks for your client producers:
 
-1. Specify a name for the producer.
+1. Specify a name for the producer (this is a requirement, Pulsar will use the 
producer name to filter duplicated messages).
 1. Set the message timeout to `0` (namely, no timeout).
 
 The instructions for Java, Python, and C++ clients are different.
diff --git a/versioned_docs/version-3.0.x/cookbooks-deduplication.md 
b/versioned_docs/version-3.0.x/cookbooks-deduplication.md
index 63c8afd8201c..301aeb96eb67 100644
--- a/versioned_docs/version-3.0.x/cookbooks-deduplication.md
+++ b/versioned_docs/version-3.0.x/cookbooks-deduplication.md
@@ -73,7 +73,7 @@ If you enable message deduplication in Pulsar brokers, 
namespaces, or topics, it
 
 So you need to complete the following tasks for your client producers:
 
-1. Specify a name for the producer.
+1. Specify a name for the producer (this is a requirement, Pulsar will use the 
producer name to filter duplicated messages).
 1. Set the message timeout to `0` (namely, no timeout).
 
 The instructions for Java, Python, and C++ clients are different.
diff --git a/versioned_docs/version-3.1.x/cookbooks-deduplication.md 
b/versioned_docs/version-3.1.x/cookbooks-deduplication.md
index 63c8afd8201c..301aeb96eb67 100644
--- a/versioned_docs/version-3.1.x/cookbooks-deduplication.md
+++ b/versioned_docs/version-3.1.x/cookbooks-deduplication.md
@@ -73,7 +73,7 @@ If you enable message deduplication in Pulsar brokers, 
namespaces, or topics, it
 
 So you need to complete the following tasks for your client producers:
 
-1. Specify a name for the producer.
+1. Specify a name for the producer (this is a requirement, Pulsar will use the 
producer name to filter duplicated messages).
 1. Set the message timeout to `0` (namely, no timeout).
 
 The instructions for Java, Python, and C++ clients are different.
diff --git a/versioned_docs/version-3.2.x/cookbooks-deduplication.md 
b/versioned_docs/version-3.2.x/cookbooks-deduplication.md
index b7b8b39381cb..8da7961bc59e 100644
--- a/versioned_docs/version-3.2.x/cookbooks-deduplication.md
+++ b/versioned_docs/version-3.2.x/cookbooks-deduplication.md
@@ -73,7 +73,7 @@ If you enable message deduplication in Pulsar brokers, 
namespaces, or topics, it
 
 So you need to complete the following tasks for your client producers:
 
-1. Specify a name for the producer.
+1. Specify a name for the producer (this is a requirement, Pulsar will use the 
producer name to filter duplicated messages).
 1. Set the message timeout to `0` (namely, no timeout).
 
 The instructions for Java, Python, and C++ clients are different.



Re: [I] Pulsar Authentication should support rotation of Validation keys e.g. Public keys used in JWT validation [pulsar]

2024-03-06 Thread via GitHub


nodece commented on issue #8152:
URL: https://github.com/apache/pulsar/issues/8152#issuecomment-1981054158

   Hi @damienburke, #18336 can be reopened; this feature is important in some 
cases without OIDC.
   
   /cc @lhotari @michaeljmarshall @eolivelli  


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [improve][doc] Dedup section needs to remark that producer name is a requirement [pulsar-site]

2024-03-06 Thread via GitHub


visortelle commented on PR #827:
URL: https://github.com/apache/pulsar-site/pull/827#issuecomment-1981030660

   @AlvaroStream yes. Thank you :)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [I] Pulsar Authentication should support rotation of Validation keys e.g. Public keys used in JWT validation [pulsar]

2024-03-06 Thread via GitHub


damienburke commented on issue #8152:
URL: https://github.com/apache/pulsar/issues/8152#issuecomment-1981003403

   Hey @nodece , @lhotari , @frankjkelly - this is something that would be 
really useful for my org. We are a heavy pulsar user and our current auth is 
with JWTs. A lot of related work seems to have already been done by [this 
PR](https://github.com/apache/pulsar/pull/18336) - but it was closed, citing 
usage of OpenID Connect as a solution. We don't have an IdP, so using the OpenID 
Connect auth in Pulsar is not really an option for us.
   
   Ideally the linked PR could be re-opened? I would have the cycles to take 
over the PR - handle any open / additional PR feedback, and write ITs, update 
doc, etc. Possibly also extend functionality, so any updates to the JWKS file 
are loaded by Pulsar w/o a restart and any other functionality that makes 
sense.  
   
   Please let me know your thoughts. Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [improve][doc] Dedup section needs to remark that producer name is a requirement [pulsar-site]

2024-03-06 Thread via GitHub


AlvaroStream commented on PR #827:
URL: https://github.com/apache/pulsar-site/pull/827#issuecomment-1980999291

   Is it enough with 
[this](https://github.com/apache/pulsar-site/pull/827/commits/8866d09dfb35911b46039cffd3daaf302a46d6eb)?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



(pulsar) branch master updated: [improve][broker] Consistently add fine-grain authorization to REST API (#22202)

2024-03-06 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 68c10925df4 [improve][broker] Consistently add fine-grain 
authorization to REST API (#22202)
68c10925df4 is described below

commit 68c10925df43769eee7265b4af0ac8ee4913e715
Author: Qiang Zhao 
AuthorDate: Wed Mar 6 22:00:45 2024 +0800

[improve][broker] Consistently add fine-grain authorization to REST API 
(#22202)
---
 .../pulsar/broker/admin/v2/PersistentTopics.java   | 244 ++--
 .../broker/admin/TopicPoliciesAuthZTest.java   | 312 -
 2 files changed, 471 insertions(+), 85 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index b1a7190b823..d2cbaa5428a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -77,6 +77,7 @@ import org.apache.pulsar.common.policies.data.PublishRate;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
 import org.apache.pulsar.common.policies.data.SubscribeRate;
+import org.apache.pulsar.common.policies.data.TopicOperation;
 import org.apache.pulsar.common.policies.data.TopicPolicies;
 import 
org.apache.pulsar.common.policies.data.impl.AutoSubscriptionCreationOverrideImpl;
 import org.apache.pulsar.common.policies.data.impl.BacklogQuotaImpl;
@@ -358,7 +359,8 @@ public class PersistentTopics extends PersistentTopicsBase {
 @ApiParam(value = "Whether leader broker redirected this call to 
this broker. For internal use.")
 @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
 validateTopicName(tenant, namespace, encodedTopic);
-preValidation(authoritative)
+validateTopicPolicyOperationAsync(topicName, PolicyName.OFFLOAD, 
PolicyOperation.READ)
+.thenCompose(__ -> preValidation(authoritative))
 .thenCompose(__ -> internalGetOffloadPolicies(applied, isGlobal))
 .thenApply(asyncResponse::resume)
 .exceptionally(ex -> {
@@ -381,7 +383,8 @@ public class PersistentTopics extends PersistentTopicsBase {
 @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
 @ApiParam(value = "Offload policies for the specified topic") 
OffloadPoliciesImpl offloadPolicies) {
 validateTopicName(tenant, namespace, encodedTopic);
-preValidation(authoritative)
+validateTopicPolicyOperationAsync(topicName, PolicyName.OFFLOAD, 
PolicyOperation.WRITE)
+.thenCompose(__ -> preValidation(authoritative))
 .thenAccept(__ -> validateOffloadPolicies(offloadPolicies))
 .thenCompose(__ -> internalSetOffloadPolicies(offloadPolicies, 
isGlobal))
 .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
@@ -404,7 +407,8 @@ public class PersistentTopics extends PersistentTopicsBase {
 @ApiParam(value = "Whether leader broker redirected this call to 
this broker. For internal use.")
 @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
 validateTopicName(tenant, namespace, encodedTopic);
-preValidation(authoritative)
+validateTopicPolicyOperationAsync(topicName, PolicyName.OFFLOAD, 
PolicyOperation.WRITE)
+.thenCompose(__ -> preValidation(authoritative))
 .thenCompose(__ -> internalSetOffloadPolicies(null, isGlobal))
 .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
 .exceptionally(ex -> {
@@ -428,7 +432,8 @@ public class PersistentTopics extends PersistentTopicsBase {
 @ApiParam(value = "Whether leader broker redirected this call to 
this broker. For internal use.")
 @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
 validateTopicName(tenant, namespace, encodedTopic);
-preValidation(authoritative)
+validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_UNACKED, 
PolicyOperation.READ)
+.thenCompose(__ -> preValidation(authoritative))
 .thenCompose(__ -> 
internalGetMaxUnackedMessagesOnConsumer(applied, isGlobal))
 .thenApply(asyncResponse::resume).exceptionally(ex -> {
 handleTopicPolicyException("getMaxUnackedMessagesOnConsumer", 
ex, asyncResponse);
@@ -452,7 +457,8 @@ public class PersistentTopics extends PersistentTopicsBase {
 @ApiParam(value = "Max unacked messages on consumer policies for 
the specified topic")
 Integer 

  1   2   >