Re: [PR] [improve][pip] PIP-347: add role field in consumer's stat [pulsar]

2024-04-24 Thread via GitHub


thetumbled commented on code in PR #22564:
URL: https://github.com/apache/pulsar/pull/22564#discussion_r1578960282


##
pip/pip-347.md:
##
@@ -0,0 +1,68 @@
+
+# PIP-347: add role field in consumer's stat
+
+# Background knowledge
+
+- In a typical Pulsar deployment, we set `authenticationEnabled=true` and `authorizationEnabled=true` to enable authentication and authorization.
+- When a user wants to create a subscription in the management platform, we grant the `consume` permission on the topic and the permission on that subscription to the user's token, so that the user can create the subscription successfully.
+- We also set `allowAutoSubscriptionCreation` to `true`, which lets users create subscriptions automatically if their token has the `consume` permission.
+However, **auto-creation of a subscription does not grant the permission on that subscription to the user's token**.
+- The permission control flow for subscriptions:
+  - The permission on a subscription can only be granted and revoked through the `grant-permission` and `revoke-permission` REST APIs.
+  - Subscription permissions are stored at the namespace level. When a user creates a subscription `sub` for topic `persistent://public/default/test`,
+  the broker checks whether the user's token has the `consume` permission on `persistent://public/default/test`. If it does, the broker retrieves the permissions of all subscriptions under the namespace `public/default` and looks up the role list of subscription `sub`.
+- **If the role list is empty or null, the broker allows the subscription to be created.** However, the permission on the subscription is still not granted to the user's token.
+- If the role list is not empty, the broker checks whether the role corresponding to the user's token is in the role list. If it is, the broker allows the subscription to be created.
+- If the role corresponding to the user's token is not in the role list, the broker rejects the request.
+
+There is a permission hole in the current design. If a user obtains the `consume` permission on a topic in the management platform, they can create any number of subscriptions without notifying the administrator.
+For example, user `jack` obtains the `consume` permission on topic `persistent://public/default/test` and the permission on subscription `sub` in the management platform.
+At this point, the role list under the namespace `public/default` is
+```
+'sub' -> ['jack']
+```
+Then user `jack` can create other subscriptions `sub1`, `sub2`, ... for topic `persistent://public/default/test` without notifying the administrator. The role list under the namespace `public/default` remains unchanged:
+```
+'sub' -> ['jack']
+```
+The administrator cannot tell who owns the subscriptions `sub1`, `sub2`, ...
+
+Worse, if another user `tom` obtains the permission on subscription `sub1` under the same namespace `public/default` in the management platform, the role list under the namespace `public/default` becomes
+``` 
+'sub' -> ['jack']
+'sub1' -> ['tom']
+```
+`jack` loses access to subscription `sub1` immediately, because `jack` never actually held the permission on `sub1`.
+
+He was just using an unregistered subscription `sub1`, owned by nobody, to consume messages. As soon as anyone obtains the permission on subscription `sub1`, his task consuming from `sub1` is shut down immediately.
+
+# Motivation
+
+This kind of permission loss causes us a lot of trouble. As a first step, we need a tool to find the owners of these unregistered subscriptions.
+
+# Goals
+Add a `role` field to the consumer's stat to show the owner of the consumer.
+
+## In Scope
+
+- Help administrators find the owner of a consumer.
+
+## Out of Scope
+
+We may improve the permission control of subscriptions in the future to fix the permission loss problem described above.
+
+
+# Backward & Forward Compatibility
+
+Fully compatible.

Review Comment:
   https://github.com/apache/pulsar/pull/22579 is a breaking change; existing tasks may shut down because of it. So this PR does not tighten the permission control for now. Instead, it tries to find all of these uncontrolled subscriptions first, so that we can at least grant the permissions to them.
   I agree with you that we should handle existing clusters carefully. I hope we can find a friendlier solution than this breaking change. https://github.com/apache/pulsar/pull/22579 is one candidate solution.
   Finally, I think the community should fix this problem before more and more users run into it.
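
To make the check described in the quoted Background section concrete, here is a minimal sketch. It is not the broker's actual code: the class `SubscriptionAuthSketch`, the method `canSubscribe`, and the in-memory map standing in for the namespace-level subscription permissions are all hypothetical names chosen for illustration.

```java
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of the subscription permission check described in the PIP text. */
final class SubscriptionAuthSketch {

    /**
     * @param hasConsumePermission whether the role holds `consume` on the topic
     * @param subscriptionRoles    namespace-level map: subscription name -> allowed roles
     * @param subscription         the subscription the client wants to use
     * @param role                 the role derived from the client's token
     */
    static boolean canSubscribe(boolean hasConsumePermission,
                                Map<String, Set<String>> subscriptionRoles,
                                String subscription,
                                String role) {
        if (!hasConsumePermission) {
            return false;
        }
        Set<String> roles = subscriptionRoles.get(subscription);
        if (roles == null || roles.isEmpty()) {
            // Unregistered subscription: allowed, but nothing is recorded for the role.
            return true;
        }
        return roles.contains(role);
    }

    public static void main(String[] args) {
        // Only 'sub' is registered, so jack may use the unregistered 'sub1'.
        Map<String, Set<String>> before = Map.of("sub", Set.of("jack"));
        System.out.println(canSubscribe(true, before, "sub1", "jack"));

        // Once 'sub1' is granted to tom, jack is rejected on 'sub1'.
        Map<String, Set<String>> after = Map.of("sub", Set.of("jack"), "sub1", Set.of("tom"));
        System.out.println(canSubscribe(true, after, "sub1", "jack"));
    }
}
```

The two calls in `main` replay the `jack`/`tom` scenario from the PIP text: the same request is accepted while `sub1` has no role list and rejected once `sub1` is granted to someone else.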




Re: [PR] [fix] [broker] [Namespace-level Geo-Replication] Reject a topic creation if there is a confilct topic on the remote side [pulsar]

2024-04-24 Thread via GitHub


poorbarcode commented on PR #22577:
URL: https://github.com/apache/pulsar/pull/22577#issuecomment-2076500256

   Rebase master


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [fix][broker] Reader stuck after call hasMessageAvailable when enable replicateSubscriptionState [pulsar]

2024-04-24 Thread via GitHub


dao-jun commented on code in PR #22572:
URL: https://github.com/apache/pulsar/pull/22572#discussion_r1578970618


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##
@@ -3602,6 +3603,15 @@ public Position getLastPosition() {
 return ledger.getLastConfirmedEntry();
 }
 
+@Override
+public CompletableFuture getLastCanDispatchPosition() {

Review Comment:
   The TransactionBuffer only handles TXN commit/abort/append requests and provides the READ_COMMITTED guarantee.
   Determining the position to dispatch is the Topic's responsibility; the TransactionBuffer should not be aware of it.
   See also 
https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java#L4122-L4124






Re: [PR] [improve][pip] PIP-347: add role field in consumer's stat [pulsar]

2024-04-24 Thread via GitHub


thetumbled commented on code in PR #22564:
URL: https://github.com/apache/pulsar/pull/22564#discussion_r1578960282


##
pip/pip-347.md:
##
@@ -0,0 +1,68 @@
+# Backward & Forward Compatibility
+
+Fully compatible.

Review Comment:
   PR https://github.com/apache/pulsar/pull/22579 is a breaking change; existing tasks may shut down because of it. So this PR does not tighten the permission control for now. Instead, it tries to find all of these uncontrolled subscriptions first, so that we can at least grant the permissions to them.
   In my opinion, the community should fix this problem before more and more users run into it. But for existing clusters, we should handle them carefully.






Re: [PR] [improve][pip] PIP-347: add role field in consumer's stat [pulsar]

2024-04-24 Thread via GitHub


thetumbled commented on code in PR #22564:
URL: https://github.com/apache/pulsar/pull/22564#discussion_r1578947195


##
pip/pip-347.md:
##
@@ -0,0 +1,68 @@
+# Backward & Forward Compatibility
+
+Fully compatible.

Review Comment:
   No, this PIP provides a way to find the owners of these uncontrolled subscriptions. The implementation for it is https://github.com/apache/pulsar/pull/22562.






Re: [PR] [fix] [broker] [fix] [broker] Fix metrics pulsar_topic_load_failed_count is 0 when load non-persistent topic fails and fix the flaky test testBrokerStatsTopicLoadFailed [pulsar]

2024-04-24 Thread via GitHub


dao-jun commented on code in PR #22580:
URL: https://github.com/apache/pulsar/pull/22580#discussion_r1578943731


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##
@@ -1254,7 +1254,8 @@ private CompletableFuture> 
createNonPersistentTopic(String topic
 nonPersistentTopic = newTopic(topic, null, this, 
NonPersistentTopic.class);
 } catch (Throwable e) {
 log.warn("Failed to create topic {}", topic, e);
-return FutureUtil.failedFuture(e);
+topicFuture.completeExceptionally(e);

Review Comment:
   Makes sense to me. I also tried to fix the flaky test, and in the process I found that I hadn't handled this case.
   But the test kept failing after my fix, so the issue never got fixed... I should create another PR to fix this first...
   
https://github.com/apache/pulsar/pull/22256/files#diff-0210356c8a88e4efa89eb769a027fa6c166db479dbad8c704c6ed6e317f5R1254






Re: [PR] [fix][broker] Avoid being stuck when closing the broker with extensible load manager [pulsar]

2024-04-24 Thread via GitHub


BewareMyPower commented on PR #22573:
URL: https://github.com/apache/pulsar/pull/22573#issuecomment-2076450786

   >  This was added to wait for some time after bundles are unloaded, but I 
don't think it is necessary.
   
   Agreed. We can remove it in another PR.





Re: [PR] [improve][pip] PIP-347: add role field in consumer's stat [pulsar]

2024-04-24 Thread via GitHub


michaeljmarshall commented on code in PR #22564:
URL: https://github.com/apache/pulsar/pull/22564#discussion_r1578873732


##
pip/pip-347.md:
##
@@ -0,0 +1,68 @@
+# Backward & Forward Compatibility
+
+Fully compatible.

Review Comment:
   Assuming that https://github.com/apache/pulsar/pull/22579 is the desired 
implementation, that will break any installations that do not already grant 
permission for each role to each subscription on the namespace level.






Re: [PR] [fix] [broker] fix permission hole for subscription. [pulsar]

2024-04-24 Thread via GitHub


Technoboy- closed pull request #22579: [fix] [broker] fix permission hole for 
subscription.
URL: https://github.com/apache/pulsar/pull/22579





[PR] [fix] [broker] [fix] [broker] Fix metrics pulsar_topic_load_failed_count is 0 when load non-persistent topic fails and fix the flaky test testBrokerStatsTopicLoadFailed [pulsar]

2024-04-24 Thread via GitHub


poorbarcode opened a new pull request, #22580:
URL: https://github.com/apache/pulsar/pull/22580

   ### Motivation
   
   1. The test `BrokerServiceTest.testBrokerStatsTopicLoadFailed` always fails, 
so https://github.com/apache/pulsar/pull/20494 disabled it.
   2. There is a case in which `pulsar_topic_load_failed_count` does not increase when loading a non-persistent topic fails[1].
   
   **[1]**: 
https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java#L1239-L1258
   ```java
   CompletableFuture<Optional<Topic>> topicFuture = new CompletableFuture<>();
   topicFuture.exceptionally(t -> {
       pulsarStats.recordTopicLoadFailed();
       return null;
   });
   try {
       nonPersistentTopic = newTopic(topic, null, this, NonPersistentTopic.class);
   } catch (Throwable e) {
       log.warn("Failed to create topic {}", topic, e);
       // Highlight: this returns a new future, so the `topicFuture.exceptionally`
       // callback never receives the error.
       return FutureUtil.failedFuture(e);
   }
   ```
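
The effect can be reproduced with plain `java.util.concurrent` classes. The sketch below is only an illustration under assumptions: `FailedFutureCallbackSketch` and `failuresRecorded` are made-up names, an `AtomicInteger` stands in for `pulsarStats`, and `CompletableFuture.failedFuture` (Java 9+) stands in for `FutureUtil.failedFuture`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

/** Self-contained illustration; the names are hypothetical stand-ins, not Pulsar code. */
final class FailedFutureCallbackSketch {

    /** Returns how many times the failure callback registered on the tracked future ran. */
    static int failuresRecorded(boolean completeTrackedFuture) {
        AtomicInteger failures = new AtomicInteger(); // stands in for pulsarStats
        CompletableFuture<String> topicFuture = new CompletableFuture<>();
        topicFuture.exceptionally(t -> {
            failures.incrementAndGet();
            return null;
        });

        RuntimeException error = new RuntimeException("topic creation failed");
        CompletableFuture<String> returned;
        if (completeTrackedFuture) {
            // Failing the tracked future itself triggers the callback above.
            topicFuture.completeExceptionally(error);
            returned = topicFuture;
        } else {
            // A brand-new failed future has no link to the callback above.
            returned = CompletableFuture.failedFuture(error);
        }
        // Both variants hand a failed future back to the caller.
        return returned.isCompletedExceptionally() ? failures.get() : -1;
    }

    public static void main(String[] args) {
        System.out.println(failuresRecorded(false));
        System.out.println(failuresRecorded(true));
    }
}
```

In both variants the caller receives a failed future; only the variant that completes the tracked future lets the failure counter observe the error.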
   
   ### Modifications
   
   1. Fix the test.
   2. Fix the issue.
   
   
   ### Documentation
   
   
   
   - [ ] `doc` 
   - [ ] `doc-required` 
   - [x] `doc-not-needed` 
   - [ ] `doc-complete` 
   
   ### Matching PR in forked repository
   
   PR in forked repository: x
   





Re: [PR] [fix][broker] Reader stuck after call hasMessageAvailable when enable replicateSubscriptionState [pulsar]

2024-04-24 Thread via GitHub


shibd commented on code in PR #22572:
URL: https://github.com/apache/pulsar/pull/22572#discussion_r1578834981


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##
@@ -3602,6 +3603,15 @@ public Position getLastPosition() {
 return ledger.getLastConfirmedEntry();
 }
 
+@Override
+public CompletableFuture getLastCanDispatchPosition() {

Review Comment:
   Regarding the comment about simplifying the methods, I understand your intention.
   
   After this change there will be fewer methods, but this logic will move into `PersistentTopic`:
   ```java
   PositionImpl tnxMaxReadPosition = transactionBuffer.getMaxReadPosition();
   if (getLastPosition() == tnxMaxReadPosition) {
       return topic.getLastCanDispatchPosition();
   } else {
       return CompletableFuture.completedFuture(tnxMaxReadPosition);
   }
   ```
   
   I would still prefer to keep it cohesive inside the `TransactionBuffer` class.
   
   I think it makes sense to expose the `getLastCanDispatchPosition` method in both `TransactionBuffer` and `Topic`, because it clearly reminds the caller: **the position returned by `getLastPosition` may not be dispatchable; use `getLastCanDispatchPosition` instead.**
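
As a rough, self-contained sketch of the branching in the snippet above: everything here is hypothetical (`DispatchPositionSketch`, the simplified `Position` type, and the `topicLookup` supplier that stands in for `topic.getLastCanDispatchPosition()`). One deliberate difference is that the sketch compares positions with `equals` rather than `==`, on the assumption that two position objects may describe the same entry without being the same instance.

```java
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/** Hypothetical sketch of the "which position is dispatchable" decision; not Pulsar code. */
final class DispatchPositionSketch {

    /** Simplified value type standing in for a ledger position. */
    static final class Position {
        final long ledgerId;
        final long entryId;

        Position(long ledgerId, long entryId) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Position)) {
                return false;
            }
            Position p = (Position) o;
            return ledgerId == p.ledgerId && entryId == p.entryId;
        }

        @Override
        public int hashCode() {
            return Objects.hash(ledgerId, entryId);
        }
    }

    /**
     * If the transaction's max read position has caught up with the last position,
     * defer to the topic-level lookup; otherwise the max read position is the limit.
     */
    static CompletableFuture<Position> lastDispatchablePosition(
            Position lastPosition,
            Position txnMaxReadPosition,
            Supplier<CompletableFuture<Position>> topicLookup) {
        if (lastPosition.equals(txnMaxReadPosition)) {
            return topicLookup.get();
        }
        return CompletableFuture.completedFuture(txnMaxReadPosition);
    }

    public static void main(String[] args) {
        Position last = new Position(1, 5);
        Position fromTopic = new Position(1, 4);
        Supplier<CompletableFuture<Position>> lookup =
                () -> CompletableFuture.completedFuture(fromTopic);

        Position caughtUp = lastDispatchablePosition(last, new Position(1, 5), lookup).join();
        Position behind = lastDispatchablePosition(last, new Position(1, 3), lookup).join();
        System.out.println(caughtUp.entryId + " " + behind.entryId);
    }
}
```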
   
   






Re: [PR] [improve][pip] PIP-347: add role field in consumer's stat [pulsar]

2024-04-24 Thread via GitHub


michaeljmarshall commented on code in PR #22564:
URL: https://github.com/apache/pulsar/pull/22564#discussion_r1578835168


##
pip/pip-347.md:
##
@@ -0,0 +1,68 @@
+# Backward & Forward Compatibility
+
+Fully compatible.

Review Comment:
   If I understand correctly, this is not backwards compatible. It has been 
discussed previously. I'll follow up in the next 24 hours with references and 
additional information.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [fix][broker] Reader stuck after call hasMessageAvailable when enable replicateSubscriptionState [pulsar]

2024-04-24 Thread via GitHub


shibd commented on code in PR #22572:
URL: https://github.com/apache/pulsar/pull/22572#discussion_r1578835194


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##
@@ -4082,6 +4092,10 @@ public PositionImpl getMaxReadPosition() {
 return this.transactionBuffer.getMaxReadPosition();
 }
 
+public CompletableFuture getLastCanDispatchPositionWithTxn() {

Review Comment:
   Makes sense to me.






Re: [PR] [fix][broker] Reader stuck after call hasMessageAvailable when enable replicateSubscriptionState [pulsar]

2024-04-24 Thread via GitHub


shibd commented on code in PR #22572:
URL: https://github.com/apache/pulsar/pull/22572#discussion_r1578834981


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##
@@ -3602,6 +3603,15 @@ public Position getLastPosition() {
 return ledger.getLastConfirmedEntry();
 }
 
+@Override
+public CompletableFuture getLastCanDispatchPosition() {

Review Comment:
   Regarding the comment about simplifying the methods, I understand your intention.
   
   After that change there would be fewer methods, but this logic would end up in `PersistentTopic`:
   ```java
   PositionImpl tnxMaxReadPosition = transactionBuffer.getMaxReadPosition();
   if (getLastPosition() == tnxMaxReadPosition) {
       return topic.getLastCanDispatchPosition();
   } else {
       return CompletableFuture.completedFuture(tnxMaxReadPosition);
   }
   ```
   
   I would still prefer to keep it cohesive in the `TransactionBuffer` class.
   
   I think it makes sense to expose the `getLastCanDispatchPosition` method in both `TransactionBuffer` and `Topic`, because it clearly reminds the caller: **the position returned by `getLastPosition` is not dispatchable; use `getLastCanDispatchPosition` instead.**
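
A minimal sketch of the branching logic discussed above, under loud assumptions: positions are reduced to plain `long` offsets instead of `PositionImpl`, and `DispatchPositionSketch`, `lastCanDispatchPosition`, and `topicLookup` are hypothetical names, not Pulsar APIs. It only illustrates the shape of the decision (delegate to the topic-level lookup when no transaction holds reads back, otherwise return the transaction's max read position).

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

final class DispatchPositionSketch {

    // Hypothetical simplification: a position is a single long offset.
    static CompletableFuture<Long> lastCanDispatchPosition(
            long lastPosition,
            long txnMaxReadPosition,
            Supplier<CompletableFuture<Long>> topicLookup) {
        if (txnMaxReadPosition == lastPosition) {
            // No open transaction limits reads, so defer to the (possibly
            // asynchronous) topic-level lookup.
            return topicLookup.get();
        }
        // An open transaction limits reads: its max read position is the answer.
        return CompletableFuture.completedFuture(txnMaxReadPosition);
    }

    public static void main(String[] args) {
        Supplier<CompletableFuture<Long>> lookup = () -> CompletableFuture.completedFuture(8L);
        System.out.println(lastCanDispatchPosition(10L, 10L, lookup).join()); // 8
        System.out.println(lastCanDispatchPosition(10L, 6L, lookup).join());  // 6
    }
}
```

Wherever this logic ends up living, callers see a single asynchronous entry point, which is the point being argued in this comment.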
   
   






Re: [PR] [fix][broker] Avoid being stuck when closing the broker with extensible load manager [pulsar]

2024-04-24 Thread via GitHub


heesung-sn commented on PR #22573:
URL: https://github.com/apache/pulsar/pull/22573#issuecomment-2076338059

   OWNERSHIP_CLEAN_UP_CONVERGENCE_DELAY_IN_MILLIS
   
   I think we can actually remove this. This was added to wait for some time 
after bundles are unloaded, but I don't think it is necessary.





Re: [PR] [fix] [broker] fix permission hole for subscription. [pulsar]

2024-04-24 Thread via GitHub


Technoboy- commented on PR #22579:
URL: https://github.com/apache/pulsar/pull/22579#issuecomment-2076336747

   Could you help add a test for this?





Re: [PR] [fix][broker] Reader stuck after call hasMessageAvailable when enable replicateSubscriptionState [pulsar]

2024-04-24 Thread via GitHub


dao-jun commented on code in PR #22572:
URL: https://github.com/apache/pulsar/pull/22572#discussion_r1578822283


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java:
##
@@ -509,6 +509,16 @@ public PositionImpl getMaxReadPosition() {
 }
 }
 
+@Override
+public CompletableFuture getLastCanDispatchPosition() {
+PositionImpl tnxMaxReadPosition = getMaxReadPosition();
+if (topic.getLastPosition() == tnxMaxReadPosition) {

Review Comment:
   I know that `tnxMaxReadPosition` and `topic.getLastPosition()` are the same object when they are equal, but for comparing two `PositionImpl` objects, `topic.getLastPosition().compareTo(tnxMaxReadPosition) == 0` is better.
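
To illustrate the difference between the two comparisons, here is a small sketch. `SketchPosition` is a hypothetical stand-in with only a ledger id and an entry id; it is not Pulsar's `PositionImpl`, and the ordering shown (ledger first, then entry) is an assumption made for the example.

```java
// Hypothetical stand-in for a position: ordered by ledger id, then entry id.
final class SketchPosition implements Comparable<SketchPosition> {
    final long ledgerId;
    final long entryId;

    SketchPosition(long ledgerId, long entryId) {
        this.ledgerId = ledgerId;
        this.entryId = entryId;
    }

    @Override
    public int compareTo(SketchPosition other) {
        int byLedger = Long.compare(ledgerId, other.ledgerId);
        return byLedger != 0 ? byLedger : Long.compare(entryId, other.entryId);
    }
}

final class PositionComparisonSketch {

    // Reference equality: true only when both variables point to the same object.
    static boolean sameReference(SketchPosition a, SketchPosition b) {
        return a == b;
    }

    // Value comparison: true whenever both positions denote the same entry.
    static boolean sameValue(SketchPosition a, SketchPosition b) {
        return a.compareTo(b) == 0;
    }

    public static void main(String[] args) {
        SketchPosition a = new SketchPosition(3, 7);
        SketchPosition b = new SketchPosition(3, 7);
        System.out.println(sameReference(a, b)); // false: two distinct objects
        System.out.println(sameValue(a, b));     // true: same ledger and entry
    }
}
```

The value comparison keeps working even if a later refactoring makes one of the two methods return a copy rather than the shared instance.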






Re: [PR] [fix][broker] Reader stuck after call hasMessageAvailable when enable replicateSubscriptionState [pulsar]

2024-04-24 Thread via GitHub


shibd commented on code in PR #22572:
URL: https://github.com/apache/pulsar/pull/22572#discussion_r1578818795


##
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##
@@ -1852,6 +1852,51 @@ public void findEntryFailed(ManagedLedgerException 
exception, Optional
 return future;
 }
 
+@Override
+public CompletableFuture 
asyncReverseFindPositionOneByOne(Predicate predicate) {

Review Comment:
   I want the method name to convey two key points: `Reverse` and `OneByOne`.
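
As a rough illustration of what those two words mean, the sketch below scans an in-memory list backward, testing one entry at a time, and returns the index of the last match. This is an assumption-laden simplification: `ReverseFindSketch` and `reverseFindOneByOne` are hypothetical names, the list stands in for ledger entries, and the real method in the PR is asynchronous and returns a `CompletableFuture`.

```java
import java.util.List;
import java.util.OptionalInt;
import java.util.function.Predicate;

final class ReverseFindSketch {

    // Walks from the newest entry to the oldest, one entry per step, and
    // stops at the first entry that satisfies the predicate.
    static <T> OptionalInt reverseFindOneByOne(List<T> entries, Predicate<? super T> predicate) {
        for (int i = entries.size() - 1; i >= 0; i--) {
            if (predicate.test(entries.get(i))) {
                return OptionalInt.of(i);
            }
        }
        return OptionalInt.empty();
    }

    public static void main(String[] args) {
        List<String> entries = List.of("match-entry", "nomatch-entry", "match-entry", "nomatch-entry");
        System.out.println(reverseFindOneByOne(entries, "match-entry"::equals)); // OptionalInt[2]
    }
}
```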
   
   






Re: [PR] [fix] [broker] [Namespace-level Geo-Replication] Reject a topic creation if there is a confilct topic on the remote side [pulsar]

2024-04-24 Thread via GitHub


nodece commented on PR #22577:
URL: https://github.com/apache/pulsar/pull/22577#issuecomment-2076306671

   I think we intentionally ignored the result of creating the topic on the remote cluster.
   
   This PR may break users' existing topic-creation behavior.
   
   According to your description, with the default broker configuration and geo-replication enabled at the namespace level, the geo producer will create two non-partitioned topics on the remote cluster, tenant/namespace/topic-partition-1 and tenant/namespace/topic-partition-2. Is that right?
   
   If so, I would suggest adding a topic check before starting the replicator to make sure the topic is the same on the local and remote clusters: if they match, start the replicator; otherwise, log the mismatch. You could also add this check in the replicator task.





Re: [PR] [improve] PIP-347 Extend LTS release process to client SDKs [pulsar]

2024-04-24 Thread via GitHub


thetumbled commented on PR #22578:
URL: https://github.com/apache/pulsar/pull/22578#issuecomment-2076306613

   Hi, this PIP conflicts with https://github.com/apache/pulsar/pull/22564.





Re: [PR] [fix][broker] Reader stuck after call hasMessageAvailable when enable replicateSubscriptionState [pulsar]

2024-04-24 Thread via GitHub


dao-jun commented on code in PR #22572:
URL: https://github.com/apache/pulsar/pull/22572#discussion_r1578809582


##
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##
@@ -1852,6 +1852,51 @@ public void findEntryFailed(ManagedLedgerException 
exception, Optional
 return future;
 }
 
+@Override
+public CompletableFuture 
asyncReverseFindPositionOneByOne(Predicate predicate) {

Review Comment:
   Maybe `asyncFindLastValidPosition` or something similar would be a better name.



##
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java:
##
@@ -160,6 +160,12 @@ public interface TransactionBuffer {
  */
 PositionImpl getMaxReadPosition();
 
+/**
+ * Get the can dispatch max position.
+ * @return the stable position.
+ */
+CompletableFuture getLastCanDispatchPosition();

Review Comment:
   Maybe we don't need this method in `TransactionBuffer`; it would be better to move the logic of `TopicTransactionBuffer#getLastCanDispatchPosition` into `PersistentTopic#getLastCanDispatchPositionWithTxn()`.



##
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##
@@ -3602,6 +3603,15 @@ public Position getLastPosition() {
 return ledger.getLastConfirmedEntry();
 }
 
+@Override
+public CompletableFuture getLastCanDispatchPosition() {

Review Comment:
   Maybe we can make the method private, or move its logic into `getLastCanDispatchPositionWithTxn()` and expose only `getLastCanDispatchPositionWithTxn()`.



##
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##
@@ -4082,6 +4092,10 @@ public PositionImpl getMaxReadPosition() {
 return this.transactionBuffer.getMaxReadPosition();
 }
 
+public CompletableFuture getLastCanDispatchPositionWithTxn() {

Review Comment:
   Maybe we can rename the method to `getLastDispatchablePositionWithTxn` or 
something else.






Re: [PR] [fix] [broker] fix permission hole for subscription. [pulsar]

2024-04-24 Thread via GitHub


thetumbled commented on PR #22579:
URL: https://github.com/apache/pulsar/pull/22579#issuecomment-2076301940

   PTAL, thanks. @tisonkun @michaeljmarshall @codelipenghui @BewareMyPower 
@jiazhai @lhotari 





[PR] [fix] [broker] fix permission hole for subscription. [pulsar]

2024-04-24 Thread via GitHub


thetumbled opened a new pull request, #22579:
URL: https://github.com/apache/pulsar/pull/22579

   ### Motivation
   Motivated by the permission hole described in 
https://github.com/apache/pulsar/pull/22564.
   
   - In a typical Pulsar deployment, we set `authenticationEnabled=true` and `authorizationEnabled=true` to enable authentication and authorization.
   - When a user wants to create a subscription in the management platform, we grant the `consume` permission on the topic and the permission on that subscription to the user's token, so that the user can create the subscription successfully.
   - We also set `allowAutoSubscriptionCreation` to true, which lets users create subscriptions automatically if their token has the `consume` permission. However, **auto-creation of a subscription does not grant the permission on that subscription to the user's token**.
   - The permission control flow for subscriptions:
     - The permission on a subscription can only be granted and revoked through the `grant-permission` and `revoke-permission` REST APIs.
     - Subscription permissions are stored at the namespace level. When a user creates a subscription `sub` for topic `persistent://public/default/test`, the broker checks whether the user's token has the `consume` permission on `persistent://public/default/test`. If it does, the broker retrieves the permissions of all subscriptions under the namespace `public/default` and gets the role list of subscription `sub`.
       - **If the role list is empty or null, the broker allows the subscription to be created.** However, the permission on the subscription is still not granted to the user's token.
       - If the role list is not empty, the broker checks whether the role corresponding to the user's token is in the role list. If it is, the broker allows the subscription to be created.
       - If the role corresponding to the user's token is not in the role list, the broker rejects the request.
   
   There is a permission hole in the current design. If a user obtains the `consume` permission on a topic in the management platform, they can create an unlimited number of subscriptions without notifying the administrator.
   For example, user `jack` obtains the `consume` permission on topic `persistent://public/default/test` and the permission on subscription `sub` in the management platform.
   At this point, the role list under the namespace `public/default` is:
   ```
   'sub' -> ['jack']
   ```
   Then `jack` can create other subscriptions `sub1`, `sub2`, ... for topic `persistent://public/default/test` without notifying the administrator. The role list under the namespace `public/default` remains unchanged:
   ```
   'sub' -> ['jack']
   ```
   The administrator cannot tell who owns the subscriptions `sub1`, `sub2`, ...
   
   Worse, if another user `tom` obtains the permission on subscription `sub1` under the same namespace `public/default` in the management platform, the role list under the namespace `public/default` becomes:
   ```
   'sub' -> ['jack']
   'sub1' -> ['tom']
   ```
   `jack` loses access to subscription `sub1` immediately, because `jack` never actually held the permission on `sub1`.
   
   He was only using an unregistered subscription `sub1`, owned by nobody, to consume messages. As soon as anyone obtains the permission on `sub1`, his task consuming from `sub1` is shut down immediately.
   
   
   ### Modifications
   
   Do not allow a role to subscribe unless it has actually been granted the permission on the subscription.
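
The difference between the flow described in the motivation and the stricter rule can be sketched as follows. This is only an illustration under stated assumptions: `SubscriptionAuthSketch`, `currentCheck`, and `strictCheck` are hypothetical names, the map stands in for the namespace-level subscription permissions, and the topic-level `consume` check is assumed to have passed already. It is not the code changed by this PR.

```java
import java.util.Map;
import java.util.Set;

final class SubscriptionAuthSketch {

    // Flow as described in the motivation: an empty or missing role list
    // lets any role with `consume` permission use the subscription.
    static boolean currentCheck(Map<String, Set<String>> subscriptionRoles,
                                String subscription, String role) {
        Set<String> roles = subscriptionRoles.get(subscription);
        if (roles == null || roles.isEmpty()) {
            return true;
        }
        return roles.contains(role);
    }

    // One possible reading of the modification: the role must be listed explicitly.
    static boolean strictCheck(Map<String, Set<String>> subscriptionRoles,
                               String subscription, String role) {
        Set<String> roles = subscriptionRoles.get(subscription);
        return roles != null && roles.contains(role);
    }

    public static void main(String[] args) {
        Map<String, Set<String>> roles = Map.of("sub", Set.of("jack"));
        System.out.println(currentCheck(roles, "sub1", "jack")); // true: the hole
        System.out.println(strictCheck(roles, "sub1", "jack"));  // false
        System.out.println(strictCheck(roles, "sub", "jack"));   // true
    }
}
```

With the strict variant, `jack` is rejected on `sub1` from the start, so a later grant of `sub1` to `tom` cannot silently cut off a running consumer.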
   



Re: [PR] [fix][broker] Reader stuck after call hasMessageAvailable when enable replicateSubscriptionState [pulsar]

2024-04-24 Thread via GitHub


dao-jun commented on code in PR #22572:
URL: https://github.com/apache/pulsar/pull/22572#discussion_r1578809479


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java:
##
@@ -160,6 +160,12 @@ public interface TransactionBuffer {
  */
 PositionImpl getMaxReadPosition();
 
+/**
+ * Get the can dispatch max position.
+ * @return the stable position.
+ */
+CompletableFuture getLastCanDispatchPosition();

Review Comment:
   Maybe we don't need this method; it would be better to move the logic of `TopicTransactionBuffer#getLastCanDispatchPosition` into `PersistentTopic#getLastCanDispatchPositionWithTxn()`.






Re: [PR] [fix][broker] Reader stuck after call hasMessageAvailable when enable replicateSubscriptionState [pulsar]

2024-04-24 Thread via GitHub


shibd commented on code in PR #22572:
URL: https://github.com/apache/pulsar/pull/22572#discussion_r1578778014


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java:
##
@@ -509,6 +509,11 @@ public PositionImpl getMaxReadPosition() {
 }
 }
 
+@Override
+public CompletableFuture getLastCanDispatchPosition() {
+return CompletableFuture.completedFuture(getMaxReadPosition());

Review Comment:
   Makes sense to me. I added this logic and added `ReplicatorSubscriptionWithTransactionTest` to cover the transaction-enabled case. Thanks for your review.






Re: [PR] [fix][broker] Reader stuck after call hasMessageAvailable when enable replicateSubscriptionState [pulsar]

2024-04-24 Thread via GitHub


shibd commented on code in PR #22572:
URL: https://github.com/apache/pulsar/pull/22572#discussion_r1578776447


##
managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java:
##
@@ -4321,4 +4321,44 @@ public void 
testDeleteCurrentLedgerWhenItIsClosed(boolean closeLedgerByAddEntry)
 assertEquals(ml.currentLedgerEntries, 0);
 });
 }
+
+@Test
+public void testReverseFindPositionOneByOne() throws Exception {
+final int maxEntriesPerLedger = 5;
+
+ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
+managedLedgerConfig.setMaxEntriesPerLedger(maxEntriesPerLedger);
+ManagedLedger ledger = factory.open("testReverseFindPositionOneByOne", 
managedLedgerConfig);
+
+String matchEntry = "match-entry";
+String noMatchEntry = "nomatch-entry";
+Predicate predicate = entry -> {
+try {
+String entryValue = entry.getDataBuffer().toString(UTF_8);
+if (matchEntry.equals(entryValue)) {
+return true;
+}
+} finally {
+entry.release();
+}

Review Comment:
   Makes sense, moved.






Re: [I] [Bug] NumberFormatException when consuming messages from pulsar-admin CLI [pulsar]

2024-04-24 Thread via GitHub


visortelle commented on issue #22405:
URL: https://github.com/apache/pulsar/issues/22405#issuecomment-2075814941

   > Unfortunately, nobody ran extensive tests with the docker images during 
the release candidate phase
   
   Would having nightly images help to catch more issues before an actual 
release?
   https://github.com/apache/pulsar/issues/14755





Re: [PR] [fix][broker] Reader stuck after call hasMessageAvailable when enable replicateSubscriptionState [pulsar]

2024-04-24 Thread via GitHub


dao-jun commented on code in PR #22572:
URL: https://github.com/apache/pulsar/pull/22572#discussion_r1578390902


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java:
##
@@ -509,6 +509,11 @@ public PositionImpl getMaxReadPosition() {
 }
 }
 
+@Override
+public CompletableFuture getLastCanDispatchPosition() {
+return CompletableFuture.completedFuture(getMaxReadPosition());

Review Comment:
   I mean that if we want to fix #22571 by adding `getLastCanDispatchPosition` here, we also need to consider the case where `maxReadPosition` equals `lastConfirmedEntry`, which could be a marker.
   
   Before
   ```java
   return CompletableFuture.completedFuture(getMaxReadPosition());
   ```
   verify that `maxReadPosition` is valid.






Re: [PR] [feat][site] PIP-264: Document topic messaging metrics [pulsar-site]

2024-04-24 Thread via GitHub


dragosvictor commented on code in PR #880:
URL: https://github.com/apache/pulsar-site/pull/880#discussion_r1578304337


##
docs/reference-metrics-opentelemetry.md:
##
@@ -8,6 +8,153 @@ Pulsar exposes the following OpenTelemetry metrics.
 
 ## Broker
 
+### Topic Messaging metrics
+
+ pulsar.broker.topic.subscription.count
+The number of Pulsar subscriptions of the topic served by this broker.
+* Type: UpDownCounter
+* Unit: `{subscription}`
+
+ pulsar.broker.topic.producer.count
+The number of active producers of the topic connected to this broker.
+* Type: UpDownCounter
+* Unit: `{producer}`
+
+ pulsar.broker.topic.consumer.count
+The number of active consumers of the topic connected to this broker.
+* Type: UpDownCounter
+* Unit: `{consumer}`
+
+ pulsar.broker.topic.message.incoming.count
+The total number of messages received for this topic.
+* Type: Counter
+* Unit: `{message}`
+
+ pulsar.broker.topic.message.outgoing.count
+The total number of messages read from this topic.
+* Type: Counter
+* Unit: `{message}`
+
+ pulsar.broker.topic.message.incoming.size
+The total number of messages bytes received for this topic.
+* Type: Counter
+* Unit: `{byte}`
+
+ pulsar.broker.topic.message.outgoing.size
+The total number of messages bytes read from this topic.
+* Type: Counter
+* Unit: `{byte}`
+
+ pulsar.broker.topic.publish.rate.limit
+The number of times the publish rate limit is triggered.
+* Type: Counter
+* Unit: `{event}`
+
+ pulsar.broker.topic.consumer.msg.ack
+The total number of message acknowledgments received for this topic.
+* Type: Counter
+* Unit: `{ack}`
+
+ pulsar.broker.topic.storage.size
+The total storage size of the messages in this topic.
+* Type: UpDownCounter
+* Unit: `{byte}`
+
+ pulsar.broker.topic.storage.logical.size
+The storage size of topics in the namespace owned by the broker without 
replicas.
+* Type: UpDownCounter
+* Unit: `{byte}`
+
+ pulsar.broker.topic.storage.backlog.size
+The total backlog size of the topics of this topic owned by this broker.
+* Type: UpDownCounter
+* Unit: `{byte}`
+
+ pulsar.broker.topic.storage.offloaded.size
+The total amount of the data in this topic offloaded to the tiered storage.
+* Type: UpDownCounter
+* Unit: `{byte}`
+
+ pulsar.broker.topic.storage.backlog.quota.limit.size
+The size based backlog quota limit for this topic.
+* Type: Gauge
+* Unit: `{byte}`
+
+ pulsar.broker.topic.storage.backlog.quota.limit.time
+The time based backlog quota limit for this topic.
+* Type: Gauge
+* Unit: `{second}`
+
+ pulsar.broker.topic.storage.backlog.quota.eviction.count
+The number of times a backlog was evicted since it has exceeded its quota.
+* Type: Counter
+* Unit: `{eviction}`
+
+ pulsar.broker.topic.storage.backlog.age
+The age of the oldest unacknowledged message (backlog).
+* Type: Gauge
+* Unit: `{second}`
+
+ pulsar.broker.topic.storage.outgoing
+The total message batches (entries) written to the storage for this topic.
+* Type: UpDownCounter
+* Unit: `{message batch}`
+
+ pulsar.broker.topic.storage.incoming
+The total message batches (entries) read from the storage for this topic.
+* Type: UpDownCounter
+* Unit: `{message batch}`
+
+ pulsar.broker.topic.compaction.removed.event.count
+The total number of removed events of the compaction.
+* Type: UpDownCounter
+* Unit: `{event}`
+
+ pulsar.broker.topic.compaction.succeed.count

Review Comment:
   Refactored, as described 
[below](https://github.com/apache/pulsar-site/pull/880/files/8986d386d0f76d6a14e3cb709c801a053219a764#r1578260459).






(pulsar) branch master updated: [fix] [broker] Part-2: Replicator can not created successfully due to an orphan replicator in the previous topic owner (#21948)

2024-04-24 Thread yubiao
This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new b774666331d [fix] [broker] Part-2: Replicator can not created 
successfully due to an orphan replicator in the previous topic owner (#21948)
b774666331d is described below

commit b774666331db33ea6407174e0fe6e27a73160522
Author: fengyubiao 
AuthorDate: Thu Apr 25 01:45:41 2024 +0800

[fix] [broker] Part-2: Replicator can not created successfully due to an 
orphan replicator in the previous topic owner (#21948)
---
 .../pulsar/broker/service/AbstractReplicator.java  |  10 +-
 .../apache/pulsar/broker/service/Replicator.java   |   2 +
 .../service/persistent/PersistentReplicator.java   |   9 +-
 .../broker/service/persistent/PersistentTopic.java |  58 +--
 .../broker/service/OneWayReplicatorTest.java   | 166 +
 .../broker/service/OneWayReplicatorTestBase.java   |  14 +-
 .../pulsar/broker/service/ReplicatorTest.java  |   2 +-
 7 files changed, 239 insertions(+), 22 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
index f34144deb0a..394fad21ae6 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
@@ -248,7 +248,7 @@ public abstract class AbstractReplicator implements 
Replicator {
 }
 startProducer();
 }).exceptionally(ex -> {
-log.warn("[{}] [{}] Stop retry to create producer due to 
unknown error(topic create failed), and"
+log.error("[{}] [{}] Stop retry to create producer due to 
unknown error(topic create failed), and"
 + " trigger a terminate. Replicator state: {}",
 localTopicName, replicatorId, STATE_UPDATER.get(this), 
ex);
 terminate();
@@ -377,9 +377,13 @@ public abstract class AbstractReplicator implements 
Replicator {
 this.producer = null;
 // set the cursor as inactive.
 disableReplicatorRead();
+// release resources.
+doReleaseResources();
 });
 }
 
+protected void doReleaseResources() {}
+
 protected boolean tryChangeStatusToTerminating() {
 if (STATE_UPDATER.compareAndSet(this, State.Starting, 
State.Terminating)){
 return true;
@@ -468,4 +472,8 @@ public abstract class AbstractReplicator implements 
Replicator {
 }
 return compareSetAndGetState(expect, update);
 }
+
+public boolean isTerminated() {
+return state == State.Terminating || state == State.Terminated;
+}
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java
index 8130b855b4e..5c314397da8 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java
@@ -51,4 +51,6 @@ public interface Replicator {
 boolean isConnected();
 
 long getNumberOfEntriesInBacklog();
+
+boolean isTerminated();
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index 5e1cc4a936a..367d1965207 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -450,7 +450,7 @@ public abstract class PersistentReplicator extends 
AbstractReplicator
 long waitTimeMillis = readFailureBackoff.next();
 
 if (exception instanceof CursorAlreadyClosedException) {
-log.error("[{}] Error reading entries because replicator is"
+log.warn("[{}] Error reading entries because replicator is"
 + " already deleted and cursor is already closed 
{}, ({})",
 replicatorId, ctx, exception.getMessage(), exception);
 // replicator is already deleted and cursor is already closed so, 
producer should also be disconnected.
@@ -570,7 +570,7 @@ public abstract class PersistentReplicator extends 
AbstractReplicator
 log.error("[{}] Failed to delete message at {}: {}", replicatorId, ctx,
 exception.getMessage(), exception);
 if (exception instanceof CursorAlreadyClosedException) {
-log.error("[{}] Asynchronous ack failure because replicator is 
already

Re: [PR] [fix] [broker] Part-2: Replicator can not created successfully due to an orphan replicator in the previous topic owner [pulsar]

2024-04-24 Thread via GitHub


poorbarcode merged PR #21948:
URL: https://github.com/apache/pulsar/pull/21948


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [fix] [broker] Part-2: Replicator can not created successfully due to an orphan replicator in the previous topic owner [pulsar]

2024-04-24 Thread via GitHub


codecov-commenter commented on PR #21948:
URL: https://github.com/apache/pulsar/pull/21948#issuecomment-2075491809

   ## 
[Codecov](https://app.codecov.io/gh/apache/pulsar/pull/21948?dropdown=coverage&src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
 Report
   Attention: Patch coverage is `67.56757%` with `12 lines` in your changes are 
missing coverage. Please review.
   > Project coverage is 73.95%. Comparing base 
[(`bbc6224`)](https://app.codecov.io/gh/apache/pulsar/commit/bbc62245c5ddba1de4b1e7cee4ab49334bc36277?dropdown=coverage&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
 to head 
[(`3d389b1`)](https://app.codecov.io/gh/apache/pulsar/pull/21948?dropdown=coverage&src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache).
   > Report is 186 commits behind head on master.
   
   Additional details and impacted files
   
   
   [![Impacted file tree 
graph](https://app.codecov.io/gh/apache/pulsar/pull/21948/graphs/tree.svg?width=650&height=150&src=pr&token=acYqCpsK9J&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)](https://app.codecov.io/gh/apache/pulsar/pull/21948?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master   #21948     +/-   ##
   
   + Coverage     73.57%   73.95%    +0.38%
   + Complexity    32624     2744   -29880
   
     Files          1877     1885       +8
     Lines        139502   140512    +1010
     Branches      15299    15450     +151
   
   + Hits         102638   103916    +1278
   + Misses        28908    28563     -345
   - Partials       7956     8033      +77
   ```
   
   | 
[Flag](https://app.codecov.io/gh/apache/pulsar/pull/21948/flags?src=pr&el=flags&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
 | Coverage Δ | |
   |---|---|---|
   | 
[inttests](https://app.codecov.io/gh/apache/pulsar/pull/21948/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
 | `26.98% <21.62%> (+2.39%)` | :arrow_up: |
   | 
[systests](https://app.codecov.io/gh/apache/pulsar/pull/21948/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
 | `24.47% <10.81%> (+0.15%)` | :arrow_up: |
   | 
[unittests](https://app.codecov.io/gh/apache/pulsar/pull/21948/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
 | `73.24% <67.56%> (+0.39%)` | :arrow_up: |
   
   Flags with carried forward coverage won't be shown. [Click 
here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#carryforward-flags-in-the-pull-request-comment)
 to find out more.
   
   | 
[Files](https://app.codecov.io/gh/apache/pulsar/pull/21948?dropdown=coverage&src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
 | Coverage Δ | |
   |---|---|---|
   | 
[...a/org/apache/pulsar/broker/service/Replicator.java](https://app.codecov.io/gh/apache/pulsar/pull/21948?src=pr&el=tree&filepath=pulsar-broker%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fpulsar%2Fbroker%2Fservice%2FReplicator.java&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9zZXJ2aWNlL1JlcGxpY2F0b3IuamF2YQ==)
 | `0.00% <ø> (ø)` | |
   | 
[...roker/service/persistent/PersistentReplicator.java](https://app.codecov.io/gh/apache/pulsar/pull/21948?src=pr&el=tree&filepath=pulsar-broker%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fpulsar%2Fbroker%2Fservice%2Fpersistent%2FPersistentReplicator.java&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9zZXJ2aWNlL3BlcnNpc3RlbnQvUGVyc2lzdGVudFJlcGxpY2F0b3IuamF2YQ==)
 | `66.21% <75.00%> (-2.66%)` | :arrow_down: |
   | 
[...ache/pulsar/broker/service/AbstractReplicator.java](https://app.codecov.io/gh/apache/pulsar/pull/21948?src=pr&el=tree&filepath=pulsar-broker%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fpulsar%2Fbroker%2Fservice%2FAbstractReplicator.java&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9zZXJ2aWNlL0Fic3RyYWN0UmVwbGljYXRvci5qYXZh

(pulsar-dotpulsar) branch master updated: Update CHANGELOG.md

2024-04-24 Thread djensen
This is an automated email from the ASF dual-hosted git repository.

djensen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 921479b  Update CHANGELOG.md
921479b is described below

commit 921479b83e11fc2cb1a9d719b1ab2442195f7f1f
Author: entvex <1580435+ent...@users.noreply.github.com>
AuthorDate: Wed Apr 24 19:39:08 2024 +0200

Update CHANGELOG.md
---
 CHANGELOG.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index d75ba78..aae57f0 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,7 +4,7 @@ All notable changes to this project will be documented in this 
file.
 
 The format is based on [Keep a 
Changelog](https://keepachangelog.com/en/1.1.0/) and this project adheres to 
[Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
-## [3.2.1-rc.1] - 2024-04-19
+## [3.2.1] - 2024-04-24
 
 ### Fixed
 



[PR] [improve] PIP-347 Extend LTS release process to client SDKs.md [pulsar]

2024-04-24 Thread via GitHub


merlimat opened a new pull request, #22578:
URL: https://github.com/apache/pulsar/pull/22578

   ### Motivation
   
   
   
   ### Modifications
   
   
   
   ### Verifying this change
   
   - [ ] Make sure that the change passes the CI checks.
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(10MB)*
 - *Extended integration test for recovery after broker failure*
   
   ### Does this pull request potentially affect one of the following parts:
   
   
   
   *If the box was checked, please highlight the changes*
   
   - [ ] Dependencies (add or upgrade a dependency)
   - [ ] The public API
   - [ ] The schema
   - [ ] The default values of configurations
   - [ ] The threading model
   - [ ] The binary protocol
   - [ ] The REST endpoints
   - [ ] The admin CLI options
   - [ ] The metrics
   - [ ] Anything that affects deployment
   
   ### Documentation
   
   
   
   - [ ] `doc` 
   - [X] `doc-required` 
   - [ ] `doc-not-needed` 
   - [ ] `doc-complete` 
   
   ### Matching PR in forked repository
   
   PR in forked repository: 
   
   
   





(pulsar-dotpulsar) tag 3.2.1 created (now ffa5665)

2024-04-24 Thread djensen
This is an automated email from the ASF dual-hosted git repository.

djensen pushed a change to tag 3.2.1
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git


  at ffa5665  (commit)
No new revisions were added by this update.



Re: [PR] [feat][site] PIP-264: Document topic messaging metrics [pulsar-site]

2024-04-24 Thread via GitHub


dragosvictor commented on code in PR #880:
URL: https://github.com/apache/pulsar-site/pull/880#discussion_r1578260459


##
docs/reference-metrics-opentelemetry.md:
##
@@ -8,6 +8,153 @@ Pulsar exposes the following OpenTelemetry metrics.
 
 ## Broker
 
+### Topic Messaging metrics
+
+ pulsar.broker.topic.subscription.count
+The number of Pulsar subscriptions of the topic served by this broker.
+* Type: UpDownCounter
+* Unit: `{subscription}`
+
+ pulsar.broker.topic.producer.count
+The number of active producers of the topic connected to this broker.
+* Type: UpDownCounter
+* Unit: `{producer}`
+
+ pulsar.broker.topic.consumer.count
+The number of active consumers of the topic connected to this broker.
+* Type: UpDownCounter
+* Unit: `{consumer}`
+
+ pulsar.broker.topic.message.incoming.count
+The total number of messages received for this topic.
+* Type: Counter
+* Unit: `{message}`
+
+ pulsar.broker.topic.message.outgoing.count
+The total number of messages read from this topic.
+* Type: Counter
+* Unit: `{message}`
+
+ pulsar.broker.topic.message.incoming.size
+The total number of messages bytes received for this topic.
+* Type: Counter
+* Unit: `{byte}`
+
+ pulsar.broker.topic.message.outgoing.size
+The total number of messages bytes read from this topic.
+* Type: Counter
+* Unit: `{byte}`
+
+ pulsar.broker.topic.publish.rate.limit
+The number of times the publish rate limit is triggered.
+* Type: Counter
+* Unit: `{event}`
+
+ pulsar.broker.topic.consumer.msg.ack
+The total number of message acknowledgments received for this topic.
+* Type: Counter
+* Unit: `{ack}`
+
+ pulsar.broker.topic.storage.size
+The total storage size of the messages in this topic.
+* Type: UpDownCounter
+* Unit: `{byte}`
+
+ pulsar.broker.topic.storage.logical.size
+The storage size of topics in the namespace owned by the broker without 
replicas.
+* Type: UpDownCounter
+* Unit: `{byte}`
+
+ pulsar.broker.topic.storage.backlog.size
+The total backlog size of the topics of this topic owned by this broker.
+* Type: UpDownCounter
+* Unit: `{byte}`
+
+ pulsar.broker.topic.storage.offloaded.size
+The total amount of the data in this topic offloaded to the tiered storage.
+* Type: UpDownCounter
+* Unit: `{byte}`
+
+ pulsar.broker.topic.storage.backlog.quota.limit.size
+The size based backlog quota limit for this topic.
+* Type: Gauge
+* Unit: `{byte}`
+
+ pulsar.broker.topic.storage.backlog.quota.limit.time
+The time based backlog quota limit for this topic.
+* Type: Gauge
+* Unit: `{second}`
+
+ pulsar.broker.topic.storage.backlog.quota.eviction.count
+The number of times a backlog was evicted since it has exceeded its quota.
+* Type: Counter
+* Unit: `{eviction}`
+
+ pulsar.broker.topic.storage.backlog.age
+The age of the oldest unacknowledged message (backlog).
+* Type: Gauge
+* Unit: `{second}`
+
+ pulsar.broker.topic.storage.outgoing
+The total message batches (entries) written to the storage for this topic.
+* Type: UpDownCounter
+* Unit: `{message batch}`
+
+ pulsar.broker.topic.storage.incoming
+The total message batches (entries) read from the storage for this topic.
+* Type: UpDownCounter
+* Unit: `{message batch}`
+
+ pulsar.broker.topic.compaction.removed.event.count
+The total number of removed events of the compaction.
+* Type: UpDownCounter
+* Unit: `{event}`
+
+ pulsar.broker.topic.compaction.succeed.count
+The total number of successes of the compaction.
+* Type: UpDownCounter
+* Unit: `{event}`
+
+ pulsar.broker.topic.compaction.failed.count

Review Comment:
   Refactored into just one metric named 
`pulsar.broker.topic.compaction.operation.count`, with an attribute to specify 
the result.






Re: [PR] [fix] [broker] [Namespace-level Geo-Replication] Reject a topic creation if there is a confilct topic on the remote side [pulsar]

2024-04-24 Thread via GitHub


poorbarcode closed pull request #22577: [fix] [broker] [Namespace-level 
Geo-Replication] Reject a topic creation if there is a confilct topic on the 
remote side
URL: https://github.com/apache/pulsar/pull/22577





(pulsar) branch branch-3.2 updated: [fix] Include swagger annotations in shaded client lib (#22570)

2024-04-24 Thread mmerli
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new 62546d977eb [fix] Include swagger annotations in shaded client lib 
(#22570)
62546d977eb is described below

commit 62546d977ebb291dffa4629c6c8ee5fbcd559777
Author: Matteo Merli 
AuthorDate: Tue Apr 23 22:32:06 2024 -0700

[fix] Include swagger annotations in shaded client lib (#22570)
---
 distribution/shell/src/assemble/LICENSE.bin.txt | 1 +
 pulsar-client/pom.xml   | 1 -
 2 files changed, 1 insertion(+), 1 deletion(-)

diff --git a/distribution/shell/src/assemble/LICENSE.bin.txt 
b/distribution/shell/src/assemble/LICENSE.bin.txt
index 2bdcac5532c..4bf34fb1369 100644
--- a/distribution/shell/src/assemble/LICENSE.bin.txt
+++ b/distribution/shell/src/assemble/LICENSE.bin.txt
@@ -331,6 +331,7 @@ The Apache Software License, Version 2.0
 - listenablefuture-.0-empty-to-avoid-conflict-with-guava.jar
  * J2ObjC Annotations -- j2objc-annotations-1.3.jar
  * Netty Reactive Streams -- netty-reactive-streams-2.0.6.jar
+ * Swagger -- swagger-annotations-1.6.2.jar
  * DataSketches
 - memory-0.8.3.jar
 - sketches-core-0.8.3.jar
diff --git a/pulsar-client/pom.xml b/pulsar-client/pom.xml
index a8b98e7ab26..7b918533531 100644
--- a/pulsar-client/pom.xml
+++ b/pulsar-client/pom.xml
@@ -76,7 +76,6 @@
 
   io.swagger
   swagger-annotations
-  provided
 
 
 



Re: [I] [Bug] [ARM] Partially acknowledged batches are not redelivered [pulsar-client-cpp]

2024-04-24 Thread via GitHub


bph-sag commented on issue #424:
URL: 
https://github.com/apache/pulsar-client-cpp/issues/424#issuecomment-2075337713

   > > Debian GNU/Linux 10 (buster), aarch64 (but the executable is arm32), 
pulsar-client-cpp 3.3.0
   > > We've used the same broker and test with an x86_64 Pulsar 3.5.0 Pulsar 
client, and we've had no issues (RHEL8).
   > 
   > So you tested 3.5.0-x86_64 and 3.3.0-aarch64?
   > 
   > Since a batch is stored as a BK entry, you can check the topic stats for 
the mark-delete position and compare it with the message id you received. The 
result you see might be because the whole batch was somehow acknowledged.
   > 
   > BTW, if you suspect the issue is related to the DEB packages, you can try 
building from source and then test it again. You can try 
[vcpkg](https://github.com/apache/pulsar-client-cpp/tree/main/vcpkg-example#vcpkg-example)
 to save your time. (Currently only 3.4.2 is supported, I will add the 3.5.0 to 
vcpkg soon)
   
   Oops, typo'd the x86_64 version, I meant to say 3.3.0-x86_64. Corrected the 
parent comment.
   
   And no, we're not using the Debian packages, we're compiling it ourselves 
for aarch32 (x86_64 as well). So, 3.3.0-aarch32. 
   
   --- 
   
   I'll try and take a look at the topic stats, and get back to you.





[PR] [fix] [broker] [namespace Geo-Replication] Reject a topic creation if there is a confilct topic on the remote side [pulsar]

2024-04-24 Thread via GitHub


poorbarcode opened a new pull request, #22577:
URL: https://github.com/apache/pulsar/pull/22577

   ### Motivation
   
   Steps to reproduce the issue:
   - A partitioned topic with `1` partition exists on the remote cluster, and 
the source cluster does not have this topic.
   - Enable namespace-level Geo-Replication.
   - Create the topic with `3` partitions on the source cluster.
   - The broker only logs the error "[Error] This topic already exists", yet 
the user still gets a successful response.
   
   ### Modifications
   
   Reject a topic creation if there is a conflicting topic on the remote side.
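
A minimal sketch of the kind of check described above, assuming the conflict is detected by comparing partition counts. The class `RemoteTopicConflictSketch`, the method `checkConflict`, and the in-memory map standing in for the remote cluster's metadata are all hypothetical and are not taken from the PR.

```java
import java.util.Map;
import java.util.Optional;

final class RemoteTopicConflictSketch {

    /**
     * Returns a rejection message when the remote cluster already has the topic
     * with a different partition count, or an empty Optional when creation may proceed.
     * The map is a hypothetical stand-in for a lookup against the remote cluster.
     */
    static Optional<String> checkConflict(Map<String, Integer> remotePartitions,
                                          String topic,
                                          int requestedPartitions) {
        Integer remote = remotePartitions.get(topic);
        if (remote != null && remote != requestedPartitions) {
            return Optional.of("Topic " + topic + " already exists remotely with "
                    + remote + " partition(s), requested " + requestedPartitions);
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        Map<String, Integer> remote = Map.of("persistent://public/default/tp", 1);
        // Mirrors the scenario in the steps: 1 partition remotely, 3 requested locally.
        System.out.println(checkConflict(remote, "persistent://public/default/tp", 3)
                .orElse("creation allowed"));
    }
}
```

The point of returning the conflict to the caller, rather than only logging it, is that the user no longer receives a successful response for a topic that cannot replicate.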
   
   
   ### Documentation
   
   
   
   - [ ] `doc` 
   - [ ] `doc-required` 
   - [x] `doc-not-needed` 
   - [ ] `doc-complete` 
   
   ### Matching PR in forked repository
   
   PR in forked repository: x
   





[I] [Bug] /metrics gzip compression sporadically fails with error 500 caused by java.nio.BufferOverflowException [pulsar]

2024-04-24 Thread via GitHub


lhotari opened a new issue, #22575:
URL: https://github.com/apache/pulsar/issues/22575

   ### Search before asking
   
   - [X] I searched in the [issues](https://github.com/apache/pulsar/issues) 
and found nothing similar.
   
   
   ### Read release policy
   
   - [X] I understand that unsupported versions don't get bug fixes. I will 
attempt to reproduce the issue on a supported version of Pulsar client and 
Pulsar broker.
   
   
   ### Version
   
   master
   
   ### Minimal reproduce step
   
   TBD
   
   ### What did you expect to see?
   
   getting /metrics with gzip compression shouldn't fail
   
   ### What did you see instead?
   
   it sometimes fails
   
   ### Anything else?
   
   The problem in the code is here: 
https://github.com/apache/pulsar/blob/94f6c7ccd2bf8bc261d45ab41f6c7f123359fa47/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java#L236-L241
   putInts will fail if the compress buffer doesn't have remaining space.
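
To illustrate the failure mode in isolation, here is a small sketch using only `java.nio.ByteBuffer`. It does not reproduce the code in `PrometheusMetricsGenerator`; the class and method names are made up for the example. It shows that `putInt` throws `BufferOverflowException` once fewer than four bytes remain, and that checking `remaining()` first avoids the exception.

```java
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;

final class PutIntOverflowSketch {

    /** Writes every value; throws BufferOverflowException if fewer than 4 bytes remain. */
    static void putIntsUnchecked(ByteBuffer target, int... values) {
        for (int v : values) {
            target.putInt(v);
        }
    }

    /** Writes only as many ints as fit and returns how many were written. */
    static int putIntsChecked(ByteBuffer target, int... values) {
        int written = 0;
        for (int v : values) {
            if (target.remaining() < Integer.BYTES) {
                break; // the caller would have to flush or move to another buffer here
            }
            target.putInt(v);
            written++;
        }
        return written;
    }

    public static void main(String[] args) {
        ByteBuffer small = ByteBuffer.allocate(6); // room for one int, not two
        try {
            putIntsUnchecked(small, 1, 2);
        } catch (BufferOverflowException e) {
            System.out.println("unchecked write overflowed");
        }
        small.clear();
        System.out.println("checked write stored " + putIntsChecked(small, 1, 2) + " int(s)");
    }
}
```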
   
   ### Are you willing to submit a PR?
   
   - [X] I'm willing to submit a PR!





[I] [Bug] [ARM] Partially acknowledged batches are not redelivered [pulsar-client-cpp]

2024-04-24 Thread via GitHub


bph-sag opened a new issue, #424:
URL: https://github.com/apache/pulsar-client-cpp/issues/424

   ### Search before asking
   
   - [X] I searched in the 
[issues](https://github.com/apache/pulsar-client-cpp/issues) and found nothing 
similar.
   
   
   ### Version
   
   Debian GNU/Linux 10 (buster), aarch64 (but the executable is arm32), 
pulsar-client-cpp 3.3.0
   
   ### Minimal reproduce step
   
   1. Send a batch of events to the Pulsar broker (in our reproduction, it is a 
batch size of 309). 
   2. Receive these messages (we're using `pulsar::Consumer::receive` in a 
loop), while continuously acknowledging them asynchronously (acknowledgements 
are done using `pulsar::Consumer::acknowledge(const MessageId &messageId)`, 
where `messageId` is `pulsar::Message::getMessageId`).
   3. Terminate the program during the middle of receiving a batch (we're 
terminating after having received and acknowledged the 190/309 events from a 
batch).
   4. Resume the program.
   
   
   
   
   ### What did you expect to see?
   
   All of the events in the batch re-sent.
   
   ### What did you see instead?
   
   None of the events in the batch, _including the ones never received_ are 
re-sent.
   
   ### Anything else?
   
   - We log the result of the acknowledgement in the callback method, which 
shows us that the most we've acknowledged is the `190th` message in a batch.
   - Exclusive subscription.
   - Running it on Graviton2/3 processors.
   - We've used the same broker and test with an x86_64 Pulsar 3.5.0 Pulsar 
client, and we've had no issues (RHEL8).
   - We can actively see missing messages - we are just sending consecutive 
numbers and logging them. We're missing exactly 119, the amount remaining in 
the batch that we've not fully acknowledged.
   - We've tried not terminating immediately, i.e. we stop receiving messages 
but continue to process any callbacks, to check whether the 119 missing 
acknowledgement callbacks were merely still pending; nothing changed.
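
The expected behaviour can be modelled with a small sketch. It assumes that a batch occupies a single entry and that the entry counts as acknowledged only once every message in it has been acknowledged. `BatchAckModel` is a hypothetical class written for illustration and is not part of pulsar-client-cpp or the broker.

```java
import java.util.BitSet;

final class BatchAckModel {
    // One bit per message in the batch; a set bit means "not yet acknowledged".
    private final BitSet pending;

    BatchAckModel(int batchSize) {
        pending = new BitSet(batchSize);
        pending.set(0, batchSize);
    }

    /** Acknowledges one message; returns true only when the whole batch is acknowledged. */
    boolean ack(int batchIndex) {
        pending.clear(batchIndex);
        return pending.isEmpty();
    }

    /** Number of messages in the batch that have not been acknowledged yet. */
    int pendingCount() {
        return pending.cardinality();
    }

    public static void main(String[] args) {
        BatchAckModel batch = new BatchAckModel(309);
        boolean entryAcked = false;
        for (int i = 0; i < 190; i++) {
            entryAcked = batch.ack(i);
        }
        // With 190 of 309 acknowledged, the entry must still be eligible for redelivery.
        System.out.println("entry acked: " + entryAcked
                + ", still pending: " + batch.pendingCount());
    }
}
```

Under this model the entry is not acknowledged after 190 messages, so a restart should redeliver the batch, which is what the report expects to see.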
   
   ### Are you willing to submit a PR?
   
   - [ ] I'm willing to submit a PR!





Re: [PR] [fix][broker] Reader stuck after call hasMessageAvailable when enable replicateSubscriptionState [pulsar]

2024-04-24 Thread via GitHub


shibd commented on code in PR #22572:
URL: https://github.com/apache/pulsar/pull/22572#discussion_r1577951228


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java:
##
@@ -509,6 +509,11 @@ public PositionImpl getMaxReadPosition() {
 }
 }
 
+@Override
+public CompletableFuture getLastCanDispatchPosition() {
+return CompletableFuture.completedFuture(getMaxReadPosition());

Review Comment:
   I agree with your point, I'm not familiar with transaction implementation 
yet, and I'm not sure if there's anything special to deal with. 
   
   This PR keeps the logic the same as before when transactions are enabled, 
and I think I can write a test in the next PR to verify your guess and fix the 
transaction buffer issue.






Re: [PR] [fix][broker] Reader stuck after call hasMessageAvailable when enable replicateSubscriptionState [pulsar]

2024-04-24 Thread via GitHub


dao-jun commented on code in PR #22572:
URL: https://github.com/apache/pulsar/pull/22572#discussion_r1577928912


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java:
##
@@ -509,6 +509,11 @@ public PositionImpl getMaxReadPosition() {
 }
 }
 
+@Override
+public CompletableFuture getLastCanDispatchPosition() {
+return CompletableFuture.completedFuture(getMaxReadPosition());

Review Comment:
   And if all the txns are committed/aborted, the maxReadPosition will also be 
set to lastConfirmedEntry. Do we need to consider this case?






Re: [PR] [fix][broker] Reader stuck after call hasMessageAvailable when enable replicateSubscriptionState [pulsar]

2024-04-24 Thread via GitHub


dao-jun commented on code in PR #22572:
URL: https://github.com/apache/pulsar/pull/22572#discussion_r1577908495


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java:
##
@@ -509,6 +509,11 @@ public PositionImpl getMaxReadPosition() {
 }
 }
 
+@Override
+public CompletableFuture getLastCanDispatchPosition() {
+return CompletableFuture.completedFuture(getMaxReadPosition());

Review Comment:
   It looks a little strange: after recovery has finished and there are no 
ongoing txns, the maxReadPosition will be set to the ledger's 
lastConfirmedEntry, and the last entry could also be a Marker.






Re: [D] /data/bookie1/ledgers/current log clean [pulsar]

2024-04-24 Thread via GitHub


GitHub user visortelle edited a comment on the discussion: 
/data/bookie1/ledgers/current log clean

I haven't configured it for BookKeeper myself, but I suppose you can configure 
it in the `log4j.properties` file:

> To enable logging for a bookie, create a log4j.properties file and point the 
> BOOKIE_LOG_CONF environment variable to the configuration file.

https://bookkeeper.apache.org/docs/admin/bookies#logging

> RollingFileAppender does this. You just need to set maxBackupIndex to the 
> highest value for the backup file.

https://stackoverflow.com/a/1050318/4182882

GitHub link: 
https://github.com/apache/pulsar/discussions/22574#discussioncomment-9213583


This is an automatically sent email for commits@pulsar.apache.org.
To unsubscribe, please send an email to: commits-unsubscr...@pulsar.apache.org



Re: [D] /data/bookie1/ledgers/current log clean [pulsar]

2024-04-24 Thread via GitHub


GitHub user visortelle added a comment to the discussion: 
/data/bookie1/ledgers/current log clean

I suppose you can configure it in the `log4j.properties` file:

> To enable logging for a bookie, create a log4j.properties file and point the 
> BOOKIE_LOG_CONF environment variable to the configuration file.

https://bookkeeper.apache.org/docs/admin/bookies#logging

> RollingFileAppender does this. You just need to set maxBackupIndex to the 
> highest value for the backup file.

https://stackoverflow.com/a/1050318/4182882

GitHub link: 
https://github.com/apache/pulsar/discussions/22574#discussioncomment-9213583





[PR] [fix][broker] Avoid being stuck in 30+ seconds when closing the BrokerService [pulsar]

2024-04-24 Thread via GitHub


BewareMyPower opened a new pull request, #22573:
URL: https://github.com/apache/pulsar/pull/22573

   Fixes https://github.com/apache/pulsar/issues/22569
   
   ### Motivation
   
   `BrokerService#closeAsync` calls `unloadNamespaceBundlesGracefully` to 
unload namespaces gracefully. With extensible load manager, it eventually calls 
`TableViewLoadDataStoreImpl#validateProducer`:
   
   ```
   BrokerService#unloadNamespaceBundlesGracefully
 ExtensibleLoadManagerWrapper#disableBroker
   ExtensibleLoadManagerImpl#disableBroker
 ServiceUnitStateChannelImpl#cleanOwnerships
   ServiceUnitStateChannelImpl#doCleanup
 TableViewLoadDataStoreImpl#removeAsync
   TableViewLoadDataStoreImpl#validateProducer
   ```
   
   In `validateProducer`, if the producer is not connected, it will recreate 
the producer synchronously. However, since the state of `PulsarService` has 
already been changed to `Closing`, all connect or lookup requests will fail 
with `ServiceNotReady`. Then the client will retry until timeout.
   
   Besides, the unload operation could also trigger the reconnection because 
the extensible load manager sends the unload event to the 
`loadbalancer-service-unit-state` topic.
   
   ### Modifications
   
   The major fix:
   Before changing PulsarService's state to `Closing`, call 
`BrokerService#unloadNamespaceBundlesGracefully` first to make the load manager 
complete the unload operations first.
   
   Minor fixes:
   - Record the time when `LoadManager#disableBroker` is done.
   - Don't check if producer is disconnected because the producer could retry 
if it's disconnected.
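
The ordering problem behind the major fix can be shown with a toy model. It assumes, as described in the motivation, that connect and lookup requests are refused once the state is `Closing` and that the unload step needs such a request to succeed. `CloseOrderSketch` and its methods are hypothetical stand-ins, not the `PulsarService` or `BrokerService` code.

```java
final class CloseOrderSketch {
    enum State { STARTED, CLOSING, CLOSED }

    private State state = State.STARTED;

    /** Stand-in for a connect or lookup request, refused once closing has begun. */
    boolean connect() {
        return state == State.STARTED;
    }

    /** Stand-in for the graceful unload, which needs a working connection. */
    private boolean unloadBundles() {
        return connect();
    }

    /** Old order: the state changes first, so the unload can no longer connect. */
    boolean closeStateFirst() {
        state = State.CLOSING;
        boolean unloaded = unloadBundles();
        state = State.CLOSED;
        return unloaded;
    }

    /** Fixed order: unload while the service is still accepting requests. */
    boolean closeUnloadFirst() {
        boolean unloaded = unloadBundles();
        state = State.CLOSING;
        state = State.CLOSED;
        return unloaded;
    }

    public static void main(String[] args) {
        System.out.println("state first, unload succeeded: "
                + new CloseOrderSketch().closeStateFirst());
        System.out.println("unload first, unload succeeded: "
                + new CloseOrderSketch().closeUnloadFirst());
    }
}
```

In the real broker the failed path does not return immediately; the client retries until it times out, which is where the 30+ seconds come from.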
   
   ### Verifications
   
   Add `ExtensibleLoadManagerCloseTest` to verify closing `PulsarService` won't 
take too much time. Here are some test results locally:
   
   ```
   2024-04-24T19:43:38,851 - INFO  - [main:ExtensibleLoadManagerCloseTest] - 
Brokers close time: [3342, 3276, 3310]
   2024-04-24T19:44:26,711 - INFO  - [main:ExtensibleLoadManagerCloseTest] - 
Brokers close time: [3357, 3258, 3298]
   2024-04-24T19:46:16,791 - INFO  - [main:ExtensibleLoadManagerCloseTest] - 
Brokers close time: [3313, 3257, 3263]
   2024-04-24T20:13:05,763 - INFO  - [main:ExtensibleLoadManagerCloseTest] - 
Brokers close time: [3304, 3279, 3299]
   2024-04-24T20:13:43,979 - INFO  - [main:ExtensibleLoadManagerCloseTest] - 
Brokers close time: [3343, 3308, 3310]
   ```
   
   As you can see, each broker takes only about 3 seconds to close due to the 
`OWNERSHIP_CLEAN_UP_CONVERGENCE_DELAY_IN_MILLIS` value added in 
https://github.com/apache/pulsar/pull/20315
   
   ### Documentation
   
   
   
   - [ ] `doc` 
   - [ ] `doc-required` 
   - [x] `doc-not-needed` 
   - [ ] `doc-complete` 
   
   ### Matching PR in forked repository
   
   PR in forked repository: 
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [D] Apache Beam support for Pulsar [pulsar]

2024-04-24 Thread via GitHub


GitHub user hpvd edited a comment on the discussion: Apache Beam support for 
Pulsar

The progress of Pulsar toward becoming a first-class citizen in Beam seems to 
have stalled :-(

There are not only some
- open issues, but also
- unmerged pull requests that were automatically closed as stale.

I have integrated them into the overview: 
https://github.com/apache/beam/issues/31078


GitHub link: 
https://github.com/apache/pulsar/discussions/18453#discussioncomment-9212078


This is an automatically sent email for commits@pulsar.apache.org.
To unsubscribe, please send an email to: commits-unsubscr...@pulsar.apache.org



Re: [D] Apache Beam support for Pulsar [pulsar]

2024-04-24 Thread via GitHub


GitHub user hpvd edited a comment on the discussion: Apache Beam support for 
Pulsar

The progress of Pulsar toward becoming a first-class citizen in Beam seems to 
have stalled :-(

There are not only some
- open issues, but also
- unmerged pull requests that were automatically closed as stale.

I have integrated them into the overview: 
https://github.com/apache/beam/issues/31079


GitHub link: 
https://github.com/apache/pulsar/discussions/18453#discussioncomment-9212078


This is an automatically sent email for commits@pulsar.apache.org.
To unsubscribe, please send an email to: commits-unsubscr...@pulsar.apache.org



Re: [D] Apache Beam support for Pulsar [pulsar]

2024-04-24 Thread via GitHub


GitHub user hpvd added a comment to the discussion: Apache Beam support for 
Pulsar

The progress of Pulsar toward becoming a first-class citizen in Beam seems to 
have stalled :-(

There are not only some
- open issues, but also
- unmerged pull requests that were automatically closed as stale.
I have integrated them into the overview: 
https://github.com/apache/beam/issues/31079


GitHub link: 
https://github.com/apache/pulsar/discussions/18453#discussioncomment-9212078


This is an automatically sent email for commits@pulsar.apache.org.
To unsubscribe, please send an email to: commits-unsubscr...@pulsar.apache.org



Re: [PR] [improve][broker] perf: Reduce stickyHash calculations of non-persistent topics in SHARED subscriptions [pulsar]

2024-04-24 Thread via GitHub


Technoboy- commented on code in PR #22536:
URL: https://github.com/apache/pulsar/pull/22536#discussion_r1577665699


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java:
##
@@ -139,28 +147,38 @@ public void sendMessages(List<Entry> entries) {
 
 final Map<Consumer, List<Entry>> groupedEntries = 
localGroupedEntries.get();
 groupedEntries.clear();
+final Map<Consumer, List<Integer>> consumerStickyKeyHashesMap = 
localGroupedStickyKeyHashes.get();
+consumerStickyKeyHashesMap.clear();
 
 for (Entry entry : entries) {
-Consumer consumer = 
selector.select(peekStickyKey(entry.getDataBuffer()));
+byte[] stickyKey = peekStickyKey(entry.getDataBuffer());
+int stickyKeyHash = 
StickyKeyConsumerSelector.makeStickyKeyHash(stickyKey);
+
+Consumer consumer = selector.select(stickyKeyHash);
 if (consumer != null) {
-groupedEntries.computeIfAbsent(consumer, k -> new 
ArrayList<>()).add(entry);
+int startingSize = Math.max(10, entries.size() / (2 * 
consumerSet.size()));

Review Comment:
   What does this `startingSize` mean? Why do we need it?
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [fix] [broker] Part-2: Replicator can not created successfully due to an orphan replicator in the previous topic owner [pulsar]

2024-04-24 Thread via GitHub


poorbarcode commented on PR #21948:
URL: https://github.com/apache/pulsar/pull/21948#issuecomment-2074593502

   @codelipenghui 
   
   > It looks like we can also fix the issue by involving a read lock to the 
addReplicationCluster() method in PersistentTopic.java without changing the 
close topic process. The replicator.disconnect() method will also call the 
managed cursor API to [change the cursor 
state](https://github.com/apache/pulsar/blob/8a18043585c49624e02b875b921336a81a9ec577/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java#L201)
 which doesn't make sense to close the managed ledger before closing the 
replicator
   
   Good suggestion, already improved the code.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [fix][broker] Reader stuck after call hasMessageAvailable when enable replicateSubscriptionState [pulsar]

2024-04-24 Thread via GitHub


lhotari commented on code in PR #22572:
URL: https://github.com/apache/pulsar/pull/22572#discussion_r1577628530


##
managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java:
##
@@ -4321,4 +4321,44 @@ public void 
testDeleteCurrentLedgerWhenItIsClosed(boolean closeLedgerByAddEntry)
 assertEquals(ml.currentLedgerEntries, 0);
 });
 }
+
+@Test
+public void testReverseFindPositionOneByOne() throws Exception {
+final int maxEntriesPerLedger = 5;
+
+ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
+managedLedgerConfig.setMaxEntriesPerLedger(maxEntriesPerLedger);
+ManagedLedger ledger = factory.open("testReverseFindPositionOneByOne", 
managedLedgerConfig);
+
+String matchEntry = "match-entry";
+String noMatchEntry = "nomatch-entry";
+Predicate<Entry> predicate = entry -> {
+try {
+String entryValue = entry.getDataBuffer().toString(UTF_8);
+if (matchEntry.equals(entryValue)) {
+return true;
+}
+} finally {
+entry.release();
+}

Review Comment:
   Would it be better to move the release into `internalAsyncReverseFindPositionOneByOne`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



(pulsar) branch branch-3.2 updated: [fix][io] CompressionEnabled didn't work on elasticsearch sink (#22565)

2024-04-24 Thread baodi
This is an automated email from the ASF dual-hosted git repository.

baodi pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new 5a13f04aa1c [fix][io] CompressionEnabled didn't work on elasticsearch 
sink (#22565)
5a13f04aa1c is described below

commit 5a13f04aa1c1848c2919b3383cbc50b3262316d9
Author: Baodi Shi 
AuthorDate: Wed Apr 24 15:01:19 2024 +0800

[fix][io] CompressionEnabled didn't work on elasticsearch sink (#22565)

(cherry picked from commit a3cd1f8dd2c9f3fbc128f4ba6fb00f865b3a2316)
---
 .../elastic/ElasticSearchJavaRestClient.java   |  1 +
 .../opensearch/OpenSearchHighLevelRestClient.java  |  1 +
 .../io/elasticsearch/ElasticSearchClientTests.java | 34 ++
 3 files changed, 36 insertions(+)

diff --git 
a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java
 
b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java
index 4749ea2e2d3..afda5ba0e74 100644
--- 
a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java
+++ 
b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java
@@ -84,6 +84,7 @@ public class ElasticSearchJavaRestClient extends RestClient {
 
.setConnectionRequestTimeout(config.getConnectionRequestTimeoutInMs())
 .setConnectTimeout(config.getConnectTimeoutInMs())
 .setSocketTimeout(config.getSocketTimeoutInMs()))
+.setCompressionEnabled(config.isCompressionEnabled())
 .setHttpClientConfigCallback(this.configCallback)
 .setFailureListener(new 
org.elasticsearch.client.RestClient.FailureListener() {
 public void onFailure(Node node) {
diff --git 
a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/opensearch/OpenSearchHighLevelRestClient.java
 
b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/opensearch/OpenSearchHighLevelRestClient.java
index 7b704196702..bb92047f17a 100644
--- 
a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/opensearch/OpenSearchHighLevelRestClient.java
+++ 
b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/opensearch/OpenSearchHighLevelRestClient.java
@@ -112,6 +112,7 @@ public class OpenSearchHighLevelRestClient extends 
RestClient implements BulkPro
 
.setConnectionRequestTimeout(config.getConnectionRequestTimeoutInMs())
 .setConnectTimeout(config.getConnectTimeoutInMs())
 .setSocketTimeout(config.getSocketTimeoutInMs()))
+.setCompressionEnabled(config.isCompressionEnabled())
 .setHttpClientConfigCallback(this.configCallback)
 .setFailureListener(new 
org.opensearch.client.RestClient.FailureListener() {
 @Override
diff --git 
a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java
 
b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java
index c1e0eafe03a..468d78d989c 100644
--- 
a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java
+++ 
b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java
@@ -30,8 +30,10 @@ import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertThrows;
 import static org.testng.Assert.assertTrue;
+import co.elastic.clients.transport.rest_client.RestClientTransport;
 import eu.rekawek.toxiproxy.model.ToxicDirection;
 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.util.Map;
 import java.util.Optional;
 import java.util.UUID;
@@ -46,6 +48,8 @@ import 
org.apache.pulsar.io.elasticsearch.client.elastic.ElasticSearchJavaRestCl
 import 
org.apache.pulsar.io.elasticsearch.client.opensearch.OpenSearchHighLevelRestClient;
 import 
org.apache.pulsar.io.elasticsearch.testcontainers.ElasticToxiproxiContainer;
 import org.awaitility.Awaitility;
+import org.opensearch.client.RestClient;
+import org.opensearch.client.RestHighLevelClient;
 import org.testcontainers.containers.Network;
 import org.testcontainers.elasticsearch.ElasticsearchContainer;
 import org.testng.annotations.AfterClass;
@@ -110,11 +114,41 @@ public abstract class ElasticSearchClientTests extends 
ElasticSearchTestBase {
 public void testClientInstance() throws Exception {
 try (ElasticSearchClient client = new ElasticSearchClient(new 
ElasticSear

(pulsar) branch branch-3.0 updated: [fix][io] CompressionEnabled didn't work on elasticsearch sink (#22565)

2024-04-24 Thread baodi
This is an automated email from the ASF dual-hosted git repository.

baodi pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new e3bd58c48f6 [fix][io] CompressionEnabled didn't work on elasticsearch 
sink (#22565)
e3bd58c48f6 is described below

commit e3bd58c48f6cac0eb6305dce26354585de345cf0
Author: Baodi Shi 
AuthorDate: Wed Apr 24 15:01:19 2024 +0800

[fix][io] CompressionEnabled didn't work on elasticsearch sink (#22565)

(cherry picked from commit a3cd1f8dd2c9f3fbc128f4ba6fb00f865b3a2316)
---
 .../elastic/ElasticSearchJavaRestClient.java   |  1 +
 .../opensearch/OpenSearchHighLevelRestClient.java  |  1 +
 .../io/elasticsearch/ElasticSearchClientTests.java | 38 --
 3 files changed, 38 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java
 
b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java
index 4749ea2e2d3..afda5ba0e74 100644
--- 
a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java
+++ 
b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java
@@ -84,6 +84,7 @@ public class ElasticSearchJavaRestClient extends RestClient {
 
.setConnectionRequestTimeout(config.getConnectionRequestTimeoutInMs())
 .setConnectTimeout(config.getConnectTimeoutInMs())
 .setSocketTimeout(config.getSocketTimeoutInMs()))
+.setCompressionEnabled(config.isCompressionEnabled())
 .setHttpClientConfigCallback(this.configCallback)
 .setFailureListener(new 
org.elasticsearch.client.RestClient.FailureListener() {
 public void onFailure(Node node) {
diff --git 
a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/opensearch/OpenSearchHighLevelRestClient.java
 
b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/opensearch/OpenSearchHighLevelRestClient.java
index 7b704196702..bb92047f17a 100644
--- 
a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/opensearch/OpenSearchHighLevelRestClient.java
+++ 
b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/opensearch/OpenSearchHighLevelRestClient.java
@@ -112,6 +112,7 @@ public class OpenSearchHighLevelRestClient extends 
RestClient implements BulkPro
 
.setConnectionRequestTimeout(config.getConnectionRequestTimeoutInMs())
 .setConnectTimeout(config.getConnectTimeoutInMs())
 .setSocketTimeout(config.getSocketTimeoutInMs()))
+.setCompressionEnabled(config.isCompressionEnabled())
 .setHttpClientConfigCallback(this.configCallback)
 .setFailureListener(new 
org.opensearch.client.RestClient.FailureListener() {
 @Override
diff --git 
a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java
 
b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java
index 6d9928c0426..5e8347b708d 100644
--- 
a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java
+++ 
b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.io.elasticsearch;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
@@ -27,8 +28,10 @@ import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertThrows;
 import static org.testng.Assert.assertTrue;
+import co.elastic.clients.transport.rest_client.RestClientTransport;
 import eu.rekawek.toxiproxy.model.ToxicDirection;
 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.util.Map;
 import java.util.Optional;
 import java.util.UUID;
@@ -41,7 +44,8 @@ import 
org.apache.pulsar.io.elasticsearch.client.elastic.ElasticSearchJavaRestCl
 import 
org.apache.pulsar.io.elasticsearch.client.opensearch.OpenSearchHighLevelRestClient;
 import 
org.apache.pulsar.io.elasticsearch.testcontainers.ElasticToxiproxiContainer;
 import org.awaitility.Awaitility;
-import org.mockito.Mockito;
+import org.opensearch.client.RestClient;
+import org.opensearch.client.RestHighLevelClient;

[PR] [fix][broker] Reader stuck after call hasMessageAvailable when enable replicateSubscriptionState [pulsar]

2024-04-24 Thread via GitHub


shibd opened a new pull request, #22572:
URL: https://github.com/apache/pulsar/pull/22572

   ### Motivation
   
   #22571
   
   ### Analysis
   
   When `replicateSubscriptionState` is enabled, the broker uses the topic 
itself to sync subscription state and flags the metadata of these messages as a 
[Marker](https://github.com/apache/pulsar/blob/ac94296d2488b2b13d76fa2b1bfa71e9e3cfbb45/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Markers.java#L107-L120).
   
   These marker messages are not dispatched to consumers; the broker 
acknowledges them automatically.
   
   
https://github.com/apache/pulsar/blob/fc393f69043be6eb1b2572a27f131656a2cbc7f6/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java#L202-L217
   
   However, `getLastMessageId` always returns the position of the last 
message, regardless of whether that message is a marker. This causes the 
reader to get stuck.
   
   ```java
   while (reader.hasMessageAvailable()) {  // returns true
       Message<byte[]> message = reader.readNext();  // blocks forever: the marker is never delivered
   }
   ```
   
   You can refer to this diagram to understand the bug:
   
   https://github.com/apache/pulsar/assets/33416836/62c5fdb2-9273-4b50-a527-0f649ffd8228
   
   
   ### Modifications
   - Add an `asyncReverseFindPositionOneByOne` method to `ManagedLedger`.
   - Add a `getLastCanDispatchPosition` method to `Topic`. It calls 
`asyncReverseFindPositionOneByOne` to find the position of the last entry that 
is not a `replicateSubscriptionState` marker.
   - Change the `getLastMessageId` implementation in `ServerCnx` to use 
`getLastCanDispatchPosition` instead of `getMaxReadPosition`.
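
The reverse search described in the modifications above can be sketched as follows. This is only an illustration under stated assumptions: `ReverseFindSketch`, `ToyEntry`, and `findLastMatching` are hypothetical names, the entries live in an in-memory list, and the search is synchronous, whereas the real method is asynchronous and reads from the managed ledger.

```java
import java.util.List;
import java.util.function.Predicate;

// Hypothetical sketch; ToyEntry and findLastMatching are illustrative names, not Pulsar APIs.
class ReverseFindSketch {
    static final class ToyEntry {
        final long position;
        final boolean marker; // true for a replicated-subscription marker entry

        ToyEntry(long position, boolean marker) {
            this.position = position;
            this.marker = marker;
        }
    }

    // Walks backwards one entry at a time and returns the position of the
    // last entry that satisfies the predicate, or -1 if none does.
    static long findLastMatching(List<ToyEntry> entries, Predicate<ToyEntry> predicate) {
        for (int i = entries.size() - 1; i >= 0; i--) {
            ToyEntry entry = entries.get(i);
            if (predicate.test(entry)) {
                return entry.position;
            }
        }
        return -1L;
    }

    public static void main(String[] args) {
        List<ToyEntry> entries = List.of(
                new ToyEntry(0, false),
                new ToyEntry(1, false),
                new ToyEntry(2, true),   // trailing markers that a reader never receives
                new ToyEntry(3, true));
        System.out.println(findLastMatching(entries, e -> !e.marker));
    }
}
```

Reporting the last non-marker position as the last message ID means `hasMessageAvailable()` turns false once the reader has consumed every entry it can actually receive.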
   
   
   
   ### Verifying this change
   - Add `ManagedLedgerTest.testReverseFindPositionOneByOne` to cover 
ReverseFindPositionOneByOne method.
   - Add `testReplicatedSubscriptionAcrossTwoRegionsGetLastMessage` to cover 
this bug.
   
   ### Documentation
   
   
   
   - [ ] `doc` 
   - [ ] `doc-required` 
   - [x] `doc-not-needed` 
   - [ ] `doc-complete` 
   
   ### Matching PR in forked repository
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[I] [Bug] Reader stuck after call hasMessageAvailable when enable replicateSubscriptionState [pulsar]

2024-04-24 Thread via GitHub


shibd opened a new issue, #22571:
URL: https://github.com/apache/pulsar/issues/22571

   ### Search before asking
   
   - [X] I searched in the [issues](https://github.com/apache/pulsar/issues) 
and found nothing similar.
   
   
   ### Read release policy
   
   - [X] I understand that unsupported versions don't get bug fixes. I will 
attempt to reproduce the issue on a supported version of Pulsar client and 
Pulsar broker.
   
   
   ### Version
   
   - master
   - 3.2.x
   - 3.1.x
   - 3.0.x.
   - 2.11.x
   
   ### Minimal reproduce step
   
   
   In a geo-replication setup with two clusters, `r1` and `r2`, the topic 
`my-topic` is replicated between `r1` and `r2`.
   
   `consumer1` subscribes to `my-topic` on `r1` with 
`replicateSubscriptionState` enabled. After the subscription state has synced 
to `my-topic` on `r2`, a `reader` created to read messages from `my-topic` on 
`r2` gets stuck on `readNext`.
   
   Please copy this test to `ReplicatorSubscriptionTest` to run it. 
   ```java
   @Test
   public void testReplicatedSubscriptionAcrossTwoRegionsGetLastMessage() 
throws Exception {
   String namespace = 
BrokerTestUtil.newUniqueName("pulsar/replicatedsubscriptionlastmessage");
   String topicName = "persistent://" + namespace + "/mytopic";
   String subscriptionName = "cluster-subscription";
   // this setting can be used to manually run the test with 
subscription replication disabled
   // it shows that subscription replication has no impact in behavior 
for this test case
   boolean replicateSubscriptionState = true;
   
   admin1.namespaces().createNamespace(namespace);
   admin1.namespaces().setNamespaceReplicationClusters(namespace, 
Sets.newHashSet("r1", "r2"));
   
   @Cleanup
   PulsarClient client1 = 
PulsarClient.builder().serviceUrl(url1.toString())
   .statsInterval(0, TimeUnit.SECONDS)
   .build();
   
   // create subscription in r1
   createReplicatedSubscription(client1, topicName, subscriptionName, 
replicateSubscriptionState);
   
   @Cleanup
   PulsarClient client2 = 
PulsarClient.builder().serviceUrl(url2.toString())
   .statsInterval(0, TimeUnit.SECONDS)
   .build();
   
   // create subscription in r2
   createReplicatedSubscription(client2, topicName, subscriptionName, 
replicateSubscriptionState);
   
   Set<String> sentMessages = new LinkedHashSet<>();
   
   // send messages in r1
   @Cleanup
   Producer<byte[]> producer = client1.newProducer().topic(topicName)
   .enableBatching(false)
   .messageRoutingMode(MessageRoutingMode.SinglePartition)
   .create();
   int numMessages = 6;
   for (int i = 0; i < numMessages; i++) {
   String body = "message" + i;
   producer.send(body.getBytes(StandardCharsets.UTF_8));
   sentMessages.add(body);
   }
   producer.close();
   
   
   // consume 3 messages in r1
   Set<String> receivedMessages = new LinkedHashSet<>();
   try (Consumer<byte[]> consumer1 = client1.newConsumer()
   .topic(topicName)
   .subscriptionName(subscriptionName)
   .replicateSubscriptionState(replicateSubscriptionState)
   .subscribe()) {
   readMessages(consumer1, receivedMessages, 3, false);
   }
   
   // wait for subscription to be replicated
   Thread.sleep(2 * 
config1.getReplicatedSubscriptionsSnapshotFrequencyMillis());
   
   // create a reader in r2
   Reader<byte[]> reader = client2.newReader().topic(topicName)
   .subscriptionName("new-sub")
   .startMessageId(MessageId.earliest)
   .create();
   int readNum = 0;
   while (reader.hasMessageAvailable()) {
   Message<byte[]> message = reader.readNext(10, TimeUnit.SECONDS);
   // fail fast on a timeout instead of hitting a NullPointerException below
   assertNotNull(message);
   System.out.println("Receive message: " + new 
String(message.getValue()) + " msgId: " + message.getMessageId());
   readNum++;
   }
   assertEquals(readNum, numMessages);
   }
   ```
   
   ### What did you expect to see?
   
   The test should pass.
   
   
   ### What did you see instead?
   
   The test gets stuck.
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit a PR?
   
   - [X] I'm willing to submit a PR!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



(pulsar-dotpulsar) branch master updated: Update release_validation_linux_macos.md

2024-04-24 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 56828c7  Update release_validation_linux_macos.md
56828c7 is described below

commit 56828c72180dffaa9ec0f91907abdba44a4fbe81
Author: Lari Hotari 
AuthorDate: Wed Apr 24 02:00:43 2024 -0700

Update release_validation_linux_macos.md
---
 docs/release_validation_linux_macos.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/release_validation_linux_macos.md 
b/docs/release_validation_linux_macos.md
index 20269c9..4c0295e 100644
--- a/docs/release_validation_linux_macos.md
+++ b/docs/release_validation_linux_macos.md
@@ -48,7 +48,7 @@ gpg --verify-files *.asc
 
 ```shell
 tar zxvf pulsar-dotpulsar-${DOTPULSAR_VERSION}-src.tar.gz
-cd pulsar-dotpulsar-${DOTPULSAR_VERSION_RC}-src
+cd pulsar-dotpulsar-${DOTPULSAR_VERSION}-src
 dotnet build
 ```
 



Re: [D] Is there any way to delete a global topics without auto GC. [pulsar]

2024-04-24 Thread via GitHub


GitHub user shulaoh added a comment to the discussion: Is there any way to 
delete a global topics without auto GC.

Finally found a workaround.
For topic-level geo-replication, you need to remove the replication clusters 
(remove-replication-cluster) from the topic before it gets deleted.

GitHub link: 
https://github.com/apache/pulsar/discussions/22481#discussioncomment-9210320


This is an automatically sent email for commits@pulsar.apache.org.
To unsubscribe, please send an email to: commits-unsubscr...@pulsar.apache.org



Re: [PR] [fix] [broker] Part-2: Replicator can not created successfully due to an orphan replicator in the previous topic owner [pulsar]

2024-04-24 Thread via GitHub


poorbarcode commented on PR #21948:
URL: https://github.com/apache/pulsar/pull/21948#issuecomment-2074263037

   Rebase master


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [fix][broker] Create partition topics in the remote clusters before set replication policies at the topic level [pulsar]

2024-04-24 Thread via GitHub


poorbarcode closed pull request #22203: [fix][broker] Create partition topics 
in the remote clusters before set replication policies at the topic level
URL: https://github.com/apache/pulsar/pull/22203


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [fix][broker] Create partition topics in the remote clusters before set replication policies at the topic level [pulsar]

2024-04-24 Thread via GitHub


poorbarcode commented on PR #22203:
URL: https://github.com/apache/pulsar/pull/22203#issuecomment-2074225216

   Since https://github.com/apache/pulsar/pull/22537 has been merged, I am 
closing this PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



(pulsar) branch master updated: [improve] [broker] Create partitioned topics automatically when enable topic level replication (#22537)

2024-04-24 Thread yubiao
This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new d4756557bf4 [improve] [broker] Create partitioned topics automatically 
when enable topic level replication (#22537)
d4756557bf4 is described below

commit d4756557bf4328019dd938a56c3135aecc3147e4
Author: fengyubiao 
AuthorDate: Wed Apr 24 15:06:00 2024 +0800

[improve] [broker] Create partitioned topics automatically when enable 
topic level replication (#22537)
---
 .../apache/pulsar/broker/admin/AdminResource.java  | 104 +++--
 .../broker/admin/impl/PersistentTopicsBase.java|  23 -
 .../broker/service/OneWayReplicatorTest.java   |  87 -
 .../broker/service/OneWayReplicatorTestBase.java   |  31 +++---
 4 files changed, 196 insertions(+), 49 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index a1bfeb2142f..45455f16d4d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectWriter;
 import com.google.errorprone.annotations.CanIgnoreReturnValue;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -43,9 +44,11 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authorization.AuthorizationService;
+import org.apache.pulsar.broker.resources.ClusterResources;
 import org.apache.pulsar.broker.service.plugin.InvalidEntryFilterException;
 import org.apache.pulsar.broker.web.PulsarWebResource;
 import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.admin.internal.TopicsImpl;
 import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
 import org.apache.pulsar.common.naming.Constants;
@@ -621,35 +624,82 @@ public abstract class AdminResource extends 
PulsarWebResource {
 
 private void 
internalCreatePartitionedTopicToReplicatedClustersInBackground(int 
numPartitions) {
 getNamespaceReplicatedClustersAsync(namespaceName)
-.thenAccept(clusters -> {
-for (String cluster : clusters) {
-if 
(!cluster.equals(pulsar().getConfiguration().getClusterName())) {
-// this call happens in the background without 
async composition. completion is logged.
-pulsar().getPulsarResources().getClusterResources()
-.getClusterAsync(cluster)
-.thenCompose(clusterDataOp ->
-((TopicsImpl) 
pulsar().getBrokerService()
-
.getClusterPulsarAdmin(cluster,
-
clusterDataOp).topics())
-
.createPartitionedTopicAsync(
-
topicName.getPartitionedTopicName(),
-numPartitions,
-true, null))
-.whenComplete((__, ex) -> {
-if (ex != null) {
-log.error(
-"[{}] Failed to create 
partitioned topic {} in cluster {}.",
-clientAppId(), topicName, 
cluster, ex);
-} else {
-log.info(
-"[{}] Successfully created 
partitioned topic {} in "
-+ "cluster {}",
-clientAppId(), topicName, 
cluster);
-}
-});
-}
+.thenAccept(clusters -> {
+// this call happens in the background without async 
composition. completion is logged.
+
internalCreatePartitionedTopicToReplicatedClustersInBackground(clusters, 
numPartitions);
+});
+}
+
+protected Map> 
internalCreatePartitionedTopicToReplicatedClustersInBackground(
+   

Re: [PR] [improve] [broker] Create partitioned topics automatically when enable topic level replication [pulsar]

2024-04-24 Thread via GitHub


poorbarcode merged PR #22537:
URL: https://github.com/apache/pulsar/pull/22537


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [fix][io] CompressionEnabled didn't work on elasticsearch sink [pulsar]

2024-04-24 Thread via GitHub


nicoloboschi merged PR #22565:
URL: https://github.com/apache/pulsar/pull/22565





(pulsar) branch master updated: [fix][io] CompressionEnabled didn't work on elasticsearch sink (#22565)

2024-04-24 Thread nicoloboschi
This is an automated email from the ASF dual-hosted git repository.

nicoloboschi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new a3cd1f8dd2c [fix][io] CompressionEnabled didn't work on elasticsearch sink (#22565)
a3cd1f8dd2c is described below

commit a3cd1f8dd2c9f3fbc128f4ba6fb00f865b3a2316
Author: Baodi Shi 
AuthorDate: Wed Apr 24 15:01:19 2024 +0800

[fix][io] CompressionEnabled didn't work on elasticsearch sink (#22565)
---
 .../elastic/ElasticSearchJavaRestClient.java   |  1 +
 .../opensearch/OpenSearchHighLevelRestClient.java  |  1 +
 .../io/elasticsearch/ElasticSearchClientTests.java | 34 ++
 3 files changed, 36 insertions(+)

diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java
index 4749ea2e2d3..afda5ba0e74 100644
--- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java
+++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java
@@ -84,6 +84,7 @@ public class ElasticSearchJavaRestClient extends RestClient {
 .setConnectionRequestTimeout(config.getConnectionRequestTimeoutInMs())
 .setConnectTimeout(config.getConnectTimeoutInMs())
 .setSocketTimeout(config.getSocketTimeoutInMs()))
+.setCompressionEnabled(config.isCompressionEnabled())
 .setHttpClientConfigCallback(this.configCallback)
 .setFailureListener(new org.elasticsearch.client.RestClient.FailureListener() {
 public void onFailure(Node node) {
diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/opensearch/OpenSearchHighLevelRestClient.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/opensearch/OpenSearchHighLevelRestClient.java
index 7b704196702..bb92047f17a 100644
--- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/opensearch/OpenSearchHighLevelRestClient.java
+++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/opensearch/OpenSearchHighLevelRestClient.java
@@ -112,6 +112,7 @@ public class OpenSearchHighLevelRestClient extends RestClient implements BulkPro
 .setConnectionRequestTimeout(config.getConnectionRequestTimeoutInMs())
 .setConnectTimeout(config.getConnectTimeoutInMs())
 .setSocketTimeout(config.getSocketTimeoutInMs()))
+.setCompressionEnabled(config.isCompressionEnabled())
 .setHttpClientConfigCallback(this.configCallback)
 .setFailureListener(new org.opensearch.client.RestClient.FailureListener() {
 @Override
diff --git a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java
index c1e0eafe03a..468d78d989c 100644
--- a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java
+++ b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java
@@ -30,8 +30,10 @@ import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertThrows;
 import static org.testng.Assert.assertTrue;
+import co.elastic.clients.transport.rest_client.RestClientTransport;
 import eu.rekawek.toxiproxy.model.ToxicDirection;
 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.util.Map;
 import java.util.Optional;
 import java.util.UUID;
@@ -46,6 +48,8 @@ import org.apache.pulsar.io.elasticsearch.client.elastic.ElasticSearchJavaRestCl
 import org.apache.pulsar.io.elasticsearch.client.opensearch.OpenSearchHighLevelRestClient;
 import org.apache.pulsar.io.elasticsearch.testcontainers.ElasticToxiproxiContainer;
 import org.awaitility.Awaitility;
+import org.opensearch.client.RestClient;
+import org.opensearch.client.RestHighLevelClient;
 import org.testcontainers.containers.Network;
 import org.testcontainers.elasticsearch.ElasticsearchContainer;
 import org.testng.annotations.AfterClass;
@@ -110,11 +114,41 @@ public abstract class ElasticSearchClientTests extends ElasticSearchTestBase {
 public void testClientInstance() throws Exception {
 try (ElasticSearchClient client = new ElasticSearchClient(new ElasticSearchConfig()
 .setElasticSearchUrl("http://" + container.getHtt