Re: [PR] [improve][pip] PIP-347: add role field in consumer's stat [pulsar]
thetumbled commented on code in PR #22564: URL: https://github.com/apache/pulsar/pull/22564#discussion_r1578960282

## pip/pip-347.md:

```diff
@@ -0,0 +1,68 @@
+
+# PIP-347: add role field in consumer's stat
+
+# Background knowledge
+
+- In a typical Pulsar deployment, we set `authenticationEnabled=true` and `authorizationEnabled=true` to enable authentication and authorization.
+- When users want to create a subscription in the management platform, we will grant the `consume` permission of topic and the permission of
+that subscription to the user's token. so that user can create a subscription successfully.
+- We also set config `allowAutoSubscriptionCreation` to be true to allow user creating subscriptions automatically if user's token has the `consume` permission.
+But, **auto-creation of subscriptions will not grant the permission of the subscription to the user's token**.
+- The permission control flow of subscription:
+ - we can only grant and revoke the permission of a subscription by the `grant-permission` and `revoke-permission` REST API.
+ - The permission of subscription is in the namespace level. When a user creates a subscription `sub` for topic `persistent://public/default/test`,
+ broker will check if user's token has the `consume` permission of `persistent://public/default/test`, if yes, broker will retrieve the permission of all subscriptions under the namespace `public/default` and get the role list of subscription `sub`.
+- **If the role list is empty or null, broker allow to create the subscription.** However, the permission of the subscription is still not granted to the user's token.
+- If the role list is not empty, broker will check if the role corresponding to the user's token is in the role list. If yes, broker allow to create the subscription.
+- If the role corresponding to the user's token is not in the role list, broker will reject the request.
+
+There is a permission hole in the current design.
+If a user obtain the `consume` permission of a topic in management platform, he can create infinite subscriptions without notifying the administrator.
+For example, user `jack` can obtain the `consume` permission of topic `persistent://public/default/test` and the permission of subscription `sub` in management platform.
+At this time, the role list under the namespace `public/default` is
+```
+'sub' -> ['jack']
+```
+Then, user `jack` can create other subscriptions `sub1`, `sub2`,... for topic `persistent://public/default/test` without notifying the administrator. The role list under the namespace `public/default` remain unchanged as
+```
+'sub' -> ['jack']
+```
+The administrator can't know who is the owner of the subscriptions `sub1`, `sub2`,...
+
+What is worse, if another user `tom` obtain the permission of subscription `sub1` under the same namespace `public/default` in management platform, that result into the role list under the namespace `public/default` become to
+```
+'sub' -> ['jack']
+'sub1' -> ['tom']
+```
+`jack` lose the permission of subscription `sub1` immediately. Because jack do not have the permission of subscription `sub1` really.
+
+He just uses an unregistered subscription `sub1` owned by nobody to consume the messages. Once there are any people obtain the permission of subscription `sub1`, his task consuming `sub1` will be shutdown immediately.
+
+# Motivation
+
+Such kind of permission lost problem cause us lots of problem. We need a tool to find out the owner of these unregistered subscriptions first.
+
+# Goals
+Add a field `role` in the consumer's stat to show the owner of this consumer.
+
+## In Scope
+
+- help administrator to find out the owner of the consumer
+
+## Out of Scope
+
+Maybe we improve the permission control of subscriptions in the future to fix the permission lost problem described above.
+
+
+# Backward & Forward Compatibility
+
+Fully compatible.
```
Review Comment: https://github.com/apache/pulsar/pull/22579 is a breaking change: existing tasks may shut down because of that PR. So we are not enhancing the permission control in this PR for now; instead, we first try to find all of these uncontrolled subscriptions and, at the very least, grant permissions for them. I agree with you that we should handle existing clusters carefully. I hope we can find a friendlier solution than this breaking change; https://github.com/apache/pulsar/pull/22579 is one candidate solution. Finally, I think the community should fix this problem before more and more users run into it.
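The subscription permission check described in the PIP-347 background can be modeled as a small function. The sketch below is a hypothetical, in-memory model: the class, method and parameter names are illustrative and are not Pulsar's authorization API. It replays the `jack`/`tom` scenario to show why an unregistered subscription is open to any role that holds `consume` permission on the topic.

```java
import java.util.Map;
import java.util.Set;

public class SubscriptionPermissionSketch {

    /**
     * Hypothetical model of the broker-side check described in PIP-347.
     *
     * @param role              role of the client's token
     * @param hasConsume        whether that role has `consume` permission on the topic
     * @param subscriptionRoles namespace-level map: subscription name -> granted roles
     * @param subscription      subscription the client wants to use
     */
    static boolean canSubscribe(String role, boolean hasConsume,
                                Map<String, Set<String>> subscriptionRoles,
                                String subscription) {
        if (!hasConsume) {
            return false; // topic-level `consume` permission is required first
        }
        Set<String> roles = subscriptionRoles.get(subscription);
        if (roles == null || roles.isEmpty()) {
            // Nobody is registered for this subscription: the request is allowed,
            // but no permission is recorded for the caller's role.
            return true;
        }
        return roles.contains(role);
    }

    public static void main(String[] args) {
        // Only `sub` is registered, so jack can also use the unregistered `sub1`.
        Map<String, Set<String>> before = Map.of("sub", Set.of("jack"));
        System.out.println(canSubscribe("jack", true, before, "sub1")); // true

        // Once tom is granted `sub1`, jack loses access to `sub1`.
        Map<String, Set<String>> after = Map.of("sub", Set.of("jack"), "sub1", Set.of("tom"));
        System.out.println(canSubscribe("jack", true, after, "sub1")); // false
    }
}
```

The empty-or-null branch is the "permission hole": it admits the caller without recording ownership, which is why a later grant to another role silently cuts the original consumer off.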
Re: [PR] [fix] [broker] [Namespace-level Geo-Replication] Reject a topic creation if there is a conflict topic on the remote side [pulsar]
poorbarcode commented on PR #22577: URL: https://github.com/apache/pulsar/pull/22577#issuecomment-2076500256 Rebase master -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [fix][broker] Reader stuck after call hasMessageAvailable when enable replicateSubscriptionState [pulsar]
dao-jun commented on code in PR #22572: URL: https://github.com/apache/pulsar/pull/22572#discussion_r1578970618 ## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java: ## @@ -3602,6 +3603,15 @@ public Position getLastPosition() { return ledger.getLastConfirmedEntry(); } +@Override +public CompletableFuture getLastCanDispatchPosition() { Review Comment: TransactionBuffer only handles TXN commit/abort/append requests and provides the READ_COMMITTED guarantee. Determining the position to dispatch is the Topic's responsibility; TransactionBuffer should not be aware of it. See also https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java#L4122-L4124
Re: [PR] [improve][pip] PIP-347: add role field in consumer's stat [pulsar]
thetumbled commented on code in PR #22564: URL: https://github.com/apache/pulsar/pull/22564#discussion_r1578947195 ## pip/pip-347.md: ## @@ -0,0 +1,68 @@
Review Comment: No, this PIP provides a way to find the owners of these uncontrolled subscriptions. The implementation for this PIP is https://github.com/apache/pulsar/pull/22562.
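With a `role` field in each consumer's stats, an administrator could cross-reference the consumers attached to each subscription against the namespace-level subscription permissions and list who is actually using the unregistered subscriptions. The sketch below works on hypothetical in-memory data (`ConsumerStat` and `ownersOfUnregistered` are illustrative names); it does not call the Pulsar admin client, and how the stats are fetched is left out.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

public class UnregisteredSubscriptionReport {

    // Hypothetical, simplified view of one consumer's stats once a `role` field exists.
    record ConsumerStat(String subscription, String role) {}

    /**
     * For every subscription with no roles granted at the namespace level,
     * collect the roles of the consumers currently attached to it.
     */
    static Map<String, Set<String>> ownersOfUnregistered(
            List<ConsumerStat> consumers, Map<String, Set<String>> subscriptionRoles) {
        Map<String, Set<String>> result = new TreeMap<>(); // sorted for stable output
        for (ConsumerStat c : consumers) {
            Set<String> granted = subscriptionRoles.get(c.subscription());
            if (granted == null || granted.isEmpty()) {
                result.computeIfAbsent(c.subscription(), k -> new TreeSet<>()).add(c.role());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<ConsumerStat> consumers = List.of(
                new ConsumerStat("sub", "jack"),
                new ConsumerStat("sub1", "jack"),
                new ConsumerStat("sub2", "jack"));
        Map<String, Set<String>> granted = Map.of("sub", Set.of("jack"));
        System.out.println(ownersOfUnregistered(consumers, granted));
        // {sub1=[jack], sub2=[jack]}
    }
}
```

The report only identifies candidates; granting the permission to the discovered roles is still a separate administrative step.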
Re: [PR] [fix] [broker] [fix] [broker] Fix metrics pulsar_topic_load_failed_count is 0 when load non-persistent topic fails and fix the flaky test testBrokerStatsTopicLoadFailed [pulsar]
dao-jun commented on code in PR #22580: URL: https://github.com/apache/pulsar/pull/22580#discussion_r1578943731 ## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java: ## @@ -1254,7 +1254,8 @@ private CompletableFuture> createNonPersistentTopic(String topic nonPersistentTopic = newTopic(topic, null, this, NonPersistentTopic.class); } catch (Throwable e) { log.warn("Failed to create topic {}", topic, e); -return FutureUtil.failedFuture(e); +topicFuture.completeExceptionally(e); Review Comment: Makes sense to me. I also tried to fix the flaky test, and in the process I found that I hadn't handled this case. But the test kept failing even after that fix, so the issue never got fixed... I should create another PR to fix this first... https://github.com/apache/pulsar/pull/22256/files#diff-0210356c8a88e4efa89eb769a027fa6c166db479dbad8c704c6ed6e317f5R1254
Re: [PR] [fix][broker] Avoid being stuck when closing the broker with extensible load manager [pulsar]
BewareMyPower commented on PR #22573: URL: https://github.com/apache/pulsar/pull/22573#issuecomment-2076450786 > This was added to wait for some time after bundles are unloaded, but I don't think it is necessary. Agreed. We can remove it in another PR.
Re: [PR] [improve][pip] PIP-347: add role field in consumer's stat [pulsar]
michaeljmarshall commented on code in PR #22564: URL: https://github.com/apache/pulsar/pull/22564#discussion_r1578873732 ## pip/pip-347.md: ## @@ -0,0 +1,68 @@
Review Comment: Assuming that https://github.com/apache/pulsar/pull/22579 is the desired implementation, that will break any installations that do not already grant permission for each role to each subscription on the namespace level.
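The compatibility concern can be illustrated by comparing the current lenient rule with a stricter one. This is a sketch under a stated assumption: it assumes the stricter rule rejects any subscription on which the caller's role has not been granted explicitly, which is one reading of the concern above and not the verified logic of PR #22579. Both methods take the topic-level `consume` check as already passed, and all names are hypothetical.

```java
import java.util.Map;
import java.util.Set;

public class StrictSubscriptionCheckSketch {

    /** Current behaviour described in PIP-347: an unregistered subscription is open. */
    static boolean lenient(String role, Map<String, Set<String>> subscriptionRoles, String sub) {
        Set<String> roles = subscriptionRoles.get(sub);
        return roles == null || roles.isEmpty() || roles.contains(role);
    }

    /** Assumed stricter rule: the role must be granted on the subscription explicitly. */
    static boolean strict(String role, Map<String, Set<String>> subscriptionRoles, String sub) {
        Set<String> roles = subscriptionRoles.get(sub);
        return roles != null && roles.contains(role);
    }

    public static void main(String[] args) {
        // An installation that never granted per-subscription permissions.
        Map<String, Set<String>> noGrants = Map.of();
        System.out.println(lenient("jack", noGrants, "sub1")); // true: consumer keeps working
        System.out.println(strict("jack", noGrants, "sub1"));  // false: consumer is rejected
    }
}
```

Under this assumption, every consumer that relies on an unregistered subscription would be rejected after an upgrade, which is why finding the owners first (the goal of PIP-347) is a prerequisite for tightening the check.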
Re: [PR] [fix] [broker] fix permission hole for subscription. [pulsar]
Technoboy- closed pull request #22579: [fix] [broker] fix permission hole for subscription. URL: https://github.com/apache/pulsar/pull/22579
[PR] [fix] [broker] [fix] [broker] Fix metrics pulsar_topic_load_failed_count is 0 when load non-persistent topic fails and fix the flaky test testBrokerStatsTopicLoadFailed [pulsar]
poorbarcode opened a new pull request, #22580: URL: https://github.com/apache/pulsar/pull/22580

### Motivation

1. The test `BrokerServiceTest.testBrokerStatsTopicLoadFailed` always fails, so https://github.com/apache/pulsar/pull/20494 disabled it.
2. In one case, `pulsar_topic_load_failed_count` does not increase when loading a non-persistent topic fails[1].

**[1]**: https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java#L1239-L1258

```java
CompletableFuture<Optional<Topic>> topicFuture = new CompletableFuture<>();
topicFuture.exceptionally(t -> {
    pulsarStats.recordTopicLoadFailed();
    return null;
});
try {
    nonPersistentTopic = newTopic(topic, null, this, NonPersistentTopic.class);
} catch (Throwable e) {
    log.warn("Failed to create topic {}", topic, e);
    // Highlight: this returns a new future, so the `topicFuture.exceptionally` callback
    // never receives the error and the load failure is not recorded.
    return FutureUtil.failedFuture(e);
}
```

### Modifications

1. Fix the test.
2. Fix the issue.

### Documentation

- [ ] `doc`
- [ ] `doc-required`
- [x] `doc-not-needed`
- [ ] `doc-complete`

### Matching PR in forked repository

PR in forked repository: x
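The metric bug comes down to which future the `exceptionally` callback is attached to. The self-contained sketch below reproduces it with plain `java.util.concurrent`; `newTopic` and the `loadFailed` counter are hypothetical stand-ins for the broker's topic construction and `pulsarStats.recordTopicLoadFailed()`. Returning a brand-new failed future leaves the callback untriggered, while completing the original `topicFuture` exceptionally (as in the fix) runs it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

public class TopicLoadFailureSketch {

    // Stand-in for pulsarStats.recordTopicLoadFailed(): just counts failures.
    static final AtomicInteger loadFailed = new AtomicInteger();

    // Stand-in for newTopic(...): always fails, to exercise the error path.
    static String newTopic(String name) {
        throw new IllegalStateException("cannot create " + name);
    }

    static CompletableFuture<String> createTopic(String name, boolean completeOriginalFuture) {
        CompletableFuture<String> topicFuture = new CompletableFuture<>();
        // The metric is only recorded if *this* future completes exceptionally.
        topicFuture.exceptionally(t -> {
            loadFailed.incrementAndGet();
            return null;
        });
        try {
            topicFuture.complete(newTopic(name));
        } catch (Throwable e) {
            if (completeOriginalFuture) {
                // Fixed behaviour: fail the future the callback is attached to.
                topicFuture.completeExceptionally(e);
            } else {
                // Buggy behaviour: a brand-new failed future; the callback never runs.
                return CompletableFuture.failedFuture(e);
            }
        }
        return topicFuture;
    }

    public static void main(String[] args) {
        createTopic("non-persistent://public/default/t", false);
        System.out.println(loadFailed.get()); // 0
        createTopic("non-persistent://public/default/t", true);
        System.out.println(loadFailed.get()); // 1
    }
}
```

In both branches the caller still sees a failed future; only the fixed branch also notifies the dependents registered on `topicFuture`, which is what the metric relies on.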
Re: [PR] [fix][broker] Reader stuck after call hasMessageAvailable when enable replicateSubscriptionState [pulsar]
shibd commented on code in PR #22572: URL: https://github.com/apache/pulsar/pull/22572#discussion_r1578834981 ## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java: ## @@ -3602,6 +3603,15 @@ public Position getLastPosition() { return ledger.getLastConfirmedEntry(); } +@Override +public CompletableFuture getLastCanDispatchPosition() { Review Comment: About your comment on simplifying the methods: I understand your intention. After these changes there would be fewer methods, but this logic would then end up in `PersistentTopic`:

```java
PositionImpl tnxMaxReadPosition = transactionBuffer.getMaxReadPosition();
if (getLastPosition() == tnxMaxReadPosition) {
    return topic.getLastCanDispatchPosition();
} else {
    return CompletableFuture.completedFuture(tnxMaxReadPosition);
}
```

I still want this logic to stay cohesive in the `TransactionBuffer` class. I think it makes sense to expose the `getLastCanDispatchPosition` method in both `TransactionBuffer` and `Topic`, because it clearly reminds the caller: **The position returned by the `getLastPosition` method may not be dispatchable; use `getLastCanDispatchPosition` instead.**
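Why "last position" and "last dispatchable position" can differ: with `replicateSubscriptionState` enabled, the broker writes replicated-subscription marker entries into the topic, and those markers are never delivered to consumers or readers. If the newest entry is a marker, a reader that compares its position against `getLastPosition()` keeps concluding that a message is available. The sketch below models this with a hypothetical `Entry` record and an index standing in for the transaction buffer's max read position; it is an illustration of the idea, not the PR's implementation.

```java
import java.util.List;
import java.util.Optional;

public class LastDispatchablePositionSketch {

    // Hypothetical entry model: position in the ledger plus whether it is a
    // server-only marker (e.g. a replicated-subscription snapshot marker).
    record Entry(long ledgerId, long entryId, boolean marker) {}

    /**
     * Walks backwards from the max read index (entries after it are not yet readable,
     * e.g. because of open transactions) and returns the last entry a consumer could
     * actually receive, skipping marker entries.
     */
    static Optional<Entry> lastCanDispatch(List<Entry> entries, int maxReadIndex) {
        for (int i = Math.min(maxReadIndex, entries.size() - 1); i >= 0; i--) {
            Entry e = entries.get(i);
            if (!e.marker()) {
                return Optional.of(e);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        List<Entry> entries = List.of(
                new Entry(1, 0, false),
                new Entry(1, 1, false),
                new Entry(1, 2, true)); // trailing marker written by the broker
        // The last confirmed entry is 1:2, which a reader can never receive.
        System.out.println(lastCanDispatch(entries, entries.size() - 1).orElseThrow());
        // Entry[ledgerId=1, entryId=1, marker=false]
    }
}
```

A reader that has already received 1:1 and compares against 1:1 rather than 1:2 correctly concludes that nothing more is available.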
Re: [PR] [improve][pip] PIP-347: add role field in consumer's stat [pulsar]
michaeljmarshall commented on code in PR #22564: URL: https://github.com/apache/pulsar/pull/22564#discussion_r1578835168 ## pip/pip-347.md: ## @@ -0,0 +1,68 @@
Review Comment: If I understand correctly, this is not backwards compatible. It has been discussed previously. I'll follow up in the next 24 hours with references and additional information. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
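The `role` field proposed in PIP-347 is meant to let an administrator see who is consuming from subscriptions that nobody has been granted. Below is a minimal sketch of that lookup. It assumes a hypothetical `ConsumerStat` record that carries the proposed field, plus a plain map of the namespace's subscription grants (the `'sub' -> ['jack']` list above). Neither is a Pulsar type. In a real deployment the stats would come from the broker's topic stats.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

class UnregisteredSubscriptionReport {

    public static void main(String[] args) {
        Map<String, Set<String>> grants = Map.of("sub", Set.of("jack"));
        List<ConsumerStat> stats = List.of(
                new ConsumerStat("sub", "c0", "jack"),
                new ConsumerStat("sub1", "c1", "jack"),
                new ConsumerStat("sub2", "c2", "jack"));
        System.out.println(findOwners(stats, grants)); // {sub1=[jack], sub2=[jack]}
    }

    /**
     * For every subscription with no roles granted at the namespace level,
     * returns the roles whose consumers are currently attached to it.
     */
    static Map<String, Set<String>> findOwners(List<ConsumerStat> stats,
                                               Map<String, Set<String>> subscriptionGrants) {
        Map<String, Set<String>> owners = new TreeMap<>();
        for (ConsumerStat stat : stats) {
            Set<String> granted = subscriptionGrants.get(stat.subscription());
            // Same rule as the broker flow above: an empty or missing role list means "unregistered".
            if (granted == null || granted.isEmpty()) {
                owners.computeIfAbsent(stat.subscription(), k -> new TreeSet<>()).add(stat.role());
            }
        }
        return owners;
    }
}

// Hypothetical consumer stat entry carrying the proposed `role` field.
record ConsumerStat(String subscription, String consumerName, String role) {}
```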
Re: [PR] [fix][broker] Reader stuck after call hasMessageAvailable when enable replicateSubscriptionState [pulsar]
shibd commented on code in PR #22572: URL: https://github.com/apache/pulsar/pull/22572#discussion_r1578835194 ## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java: ## @@ -4082,6 +4092,10 @@ public PositionImpl getMaxReadPosition() { return this.transactionBuffer.getMaxReadPosition(); } +public CompletableFuture getLastCanDispatchPositionWithTxn() { Review Comment: Makes sense to me.
Re: [PR] [fix][broker] Reader stuck after call hasMessageAvailable when enable replicateSubscriptionState [pulsar]
shibd commented on code in PR #22572: URL: https://github.com/apache/pulsar/pull/22572#discussion_r1578834981 ## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java: ## @@ -3602,6 +3603,15 @@ public Position getLastPosition() { return ledger.getLastConfirmedEntry(); } +@Override +public CompletableFuture getLastCanDispatchPosition() { Review Comment: Regarding your comment about simplifying the methods, I understand the intention: after this change there would be fewer methods. But this logic would then end up in `PersistentTopic`:

```java
PositionImpl tnxMaxReadPosition = transactionBuffer.getMaxReadPosition();
if (getLastPosition() == tnxMaxReadPosition) {
    return topic.getLastCanDispatchPosition();
} else {
    return CompletableFuture.completedFuture(tnxMaxReadPosition);
}
```

I would still like to keep it cohesive within `TransactionBuffer`. I think it makes sense to expose the `getLastCanDispatchPosition` method in both `TransactionBuffer` and `Topic`, because it clearly reminds callers: **the position returned by `getLastPosition` may not be dispatchable; use `getLastCanDispatchPosition` instead**.
Re: [PR] [fix][broker] Avoid being stuck when closing the broker with extensible load manager [pulsar]
heesung-sn commented on PR #22573: URL: https://github.com/apache/pulsar/pull/22573#issuecomment-2076338059 Regarding `OWNERSHIP_CLEAN_UP_CONVERGENCE_DELAY_IN_MILLIS`: I think we can actually remove this. It was added to wait for some time after bundles are unloaded, but I don't think that is necessary.
Re: [PR] [fix] [broker] fix permission hole for subscription. [pulsar]
Technoboy- commented on PR #22579: URL: https://github.com/apache/pulsar/pull/22579#issuecomment-2076336747 Could you add a test for this?
Re: [PR] [fix][broker] Reader stuck after call hasMessageAvailable when enable replicateSubscriptionState [pulsar]
dao-jun commented on code in PR #22572: URL: https://github.com/apache/pulsar/pull/22572#discussion_r1578822283 ## pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java: ## @@ -509,6 +509,16 @@ public PositionImpl getMaxReadPosition() { } } +@Override +public CompletableFuture getLastCanDispatchPosition() { +PositionImpl tnxMaxReadPosition = getMaxReadPosition(); +if (topic.getLastPosition() == tnxMaxReadPosition) { Review Comment: I know `tnxMaxReadPosition` and `topic.getLastPosition()` are the same object when they are equal, but to compare two `PositionImpl` objects, `topic.getLastPosition().compareTo(tnxMaxReadPosition) == 0` is better.
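dao-jun's point is that `==` only tests object identity. It works only while both sides happen to be the same instance, whereas `compareTo` compares the values of the positions. The small demonstration below uses a hypothetical `Pos` record, ordered by ledger id and then entry id, as a stand-in for `PositionImpl`.

```java
class PositionComparisonDemo {

    public static void main(String[] args) {
        Pos lastConfirmed = new Pos(3, 42);
        Pos maxRead = new Pos(3, 42); // same value, different object
        System.out.println(sameByReference(lastConfirmed, maxRead)); // false
        System.out.println(sameByValue(lastConfirmed, maxRead));     // true
    }

    // Identity check: true only when both variables point at the very same object.
    static boolean sameByReference(Pos a, Pos b) {
        return a == b;
    }

    // Value check: true whenever both denote the same ledger/entry pair.
    static boolean sameByValue(Pos a, Pos b) {
        return a.compareTo(b) == 0;
    }
}

// Hypothetical stand-in for a managed-ledger position, ordered by ledger id, then entry id.
record Pos(long ledgerId, long entryId) implements Comparable<Pos> {
    @Override
    public int compareTo(Pos other) {
        int byLedger = Long.compare(ledgerId, other.ledgerId);
        return byLedger != 0 ? byLedger : Long.compare(entryId, other.entryId);
    }
}
```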
Re: [PR] [fix][broker] Reader stuck after call hasMessageAvailable when enable replicateSubscriptionState [pulsar]
shibd commented on code in PR #22572: URL: https://github.com/apache/pulsar/pull/22572#discussion_r1578818795 ## managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java: ## @@ -1852,6 +1852,51 @@ public void findEntryFailed(ManagedLedgerException exception, Optional return future; } +@Override +public CompletableFuture asyncReverseFindPositionOneByOne(Predicate predicate) { Review Comment: I want the method name to convey two key points: `Reverse` and `OneByOne`.
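The sketch below illustrates the two properties the name is meant to convey. It walks backwards from the newest entry (`Reverse`), and it reads and tests one entry at a time before deciding whether to step back again (`OneByOne`). It works over an in-memory list with a hypothetical `readEntryAsync` helper and is not the `ManagedLedgerImpl` implementation. Every future here is already completed, so the recursion runs synchronously and suits only small inputs.

```java
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;

class ReverseFindSketch {

    public static void main(String[] args) {
        List<String> entries = List.of("match-entry", "nomatch-entry", "match-entry", "nomatch-entry");
        System.out.println(reverseFindOneByOne(entries, "match-entry"::equals).join()); // Optional[2]
    }

    // Hypothetical async read of a single entry; a real ledger would fetch it from storage.
    static CompletableFuture<String> readEntryAsync(List<String> entries, int index) {
        return CompletableFuture.completedFuture(entries.get(index));
    }

    /** Completes with the index of the newest matching entry, or empty if nothing matches. */
    static CompletableFuture<Optional<Integer>> reverseFindOneByOne(List<String> entries,
                                                                    Predicate<String> predicate) {
        return searchFrom(entries, entries.size() - 1, predicate);
    }

    private static CompletableFuture<Optional<Integer>> searchFrom(List<String> entries, int index,
                                                                   Predicate<String> predicate) {
        if (index < 0) {
            return CompletableFuture.completedFuture(Optional.empty()); // walked past the oldest entry
        }
        // Read exactly one entry, test it, and only then decide whether to step back once more.
        return readEntryAsync(entries, index).thenCompose(entry -> {
            if (predicate.test(entry)) {
                return CompletableFuture.completedFuture(Optional.of(index));
            }
            return searchFrom(entries, index - 1, predicate);
        });
    }
}
```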
Re: [PR] [fix] [broker] [Namespace-level Geo-Replication] Reject a topic creation if there is a confilct topic on the remote side [pulsar]
nodece commented on PR #22577: URL: https://github.com/apache/pulsar/pull/22577#issuecomment-2076306671 I think we intentionally ignored the result of creating the topic on the remote cluster. This PR may break users' existing topic-creation behavior. According to your description, with the default broker configuration and geo-replication enabled at the namespace level, the geo-replication producer will cause the remote cluster to create two non-partitioned topics, `tenant/namespace/topic-partition-1` and `tenant/namespace/topic-partition-2`. Is that right? If so, I would suggest adding a topic check before starting the replicator to make sure the topic is the same on the local and remote clusters: if they are the same, start the replicator; otherwise, just log it. You could also add this check in the replicator task.
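The check suggested above can be sketched as a guard that runs before the replicator starts. The sketch assumes that "the same topic" means both clusters report the same partition count, with 0 standing for a non-partitioned topic. It uses a hypothetical `TopicShape` record and leaves out how the local and remote metadata would actually be fetched.

```java
import java.util.logging.Logger;

class ReplicatorPrecheck {
    private static final Logger LOG = Logger.getLogger(ReplicatorPrecheck.class.getName());

    public static void main(String[] args) {
        TopicShape local = new TopicShape("tenant/namespace/topic", 2);
        // The remote side only has non-partitioned topics created by the replication producer.
        TopicShape remote = new TopicShape("tenant/namespace/topic", 0);
        System.out.println(shouldStartReplicator(local, remote)); // false (and a warning is logged)
    }

    /** Allows replication only when both clusters agree on the topic's partitioning. */
    static boolean shouldStartReplicator(TopicShape local, TopicShape remote) {
        if (local.partitions() == remote.partitions()) {
            return true;
        }
        LOG.warning(String.format("Not starting replicator for %s: local partitions=%d, remote partitions=%d",
                local.topic(), local.partitions(), remote.partitions()));
        return false;
    }
}

// Hypothetical summary of a topic's metadata on one cluster; 0 partitions means non-partitioned.
record TopicShape(String topic, int partitions) {}
```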
Re: [PR] [improve] PIP-347 Extend LTS release process to client SDKs [pulsar]
thetumbled commented on PR #22578: URL: https://github.com/apache/pulsar/pull/22578#issuecomment-2076306613 Hi, this PIP number conflicts with https://github.com/apache/pulsar/pull/22564.
Re: [PR] [fix][broker] Reader stuck after call hasMessageAvailable when enable replicateSubscriptionState [pulsar]
dao-jun commented on code in PR #22572: URL: https://github.com/apache/pulsar/pull/22572#discussion_r1578809582 ## managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java: ## @@ -1852,6 +1852,51 @@ public void findEntryFailed(ManagedLedgerException exception, Optional return future; } +@Override +public CompletableFuture asyncReverseFindPositionOneByOne(Predicate predicate) { Review Comment: Maybe `asyncFindLastValidPosition` or something similar would be better. ## pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java: ## @@ -160,6 +160,12 @@ public interface TransactionBuffer { */ PositionImpl getMaxReadPosition(); +/** + * Get the can dispatch max position. + * @return the stable position. + */ +CompletableFuture getLastCanDispatchPosition(); Review Comment: Maybe we don't need this method in `TransactionBuffer`; moving `TopicTransactionBuffer#getLastCanDispatchPosition`'s logic to `PersistentTopic#getLastCanDispatchPositionWithTxn()` would be better. ## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java: ## @@ -3602,6 +3603,15 @@ public Position getLastPosition() { return ledger.getLastConfirmedEntry(); } +@Override +public CompletableFuture getLastCanDispatchPosition() { Review Comment: Maybe we can make this method private, or move its logic into `getLastCanDispatchPositionWithTxn()` and expose only `getLastCanDispatchPositionWithTxn()`. ## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java: ## @@ -4082,6 +4092,10 @@ public PositionImpl getMaxReadPosition() { return this.transactionBuffer.getMaxReadPosition(); } +public CompletableFuture getLastCanDispatchPositionWithTxn() { Review Comment: Maybe we can rename the method to `getLastDispatchablePositionWithTxn` or something similar.
Re: [PR] [fix] [broker] fix permission hole for subscription. [pulsar]
thetumbled commented on PR #22579: URL: https://github.com/apache/pulsar/pull/22579#issuecomment-2076301940 PTAL, thanks. @tisonkun @michaeljmarshall @codelipenghui @BewareMyPower @jiazhai @lhotari
[PR] [fix] [broker] fix permission hole for subscription. [pulsar]
thetumbled opened a new pull request, #22579: URL: https://github.com/apache/pulsar/pull/22579 ### Motivation Motivated by the permission hole described in https://github.com/apache/pulsar/pull/22564. - In a typical Pulsar deployment, we set `authenticationEnabled=true` and `authorizationEnabled=true` to enable authentication and authorization. - When users want to create a subscription in the management platform, we grant the `consume` permission of the topic and the permission of that subscription to the user's token, so that the user can create the subscription successfully. - We also set `allowAutoSubscriptionCreation` to `true` so that users can create subscriptions automatically if the user's token has the `consume` permission. But **auto-creation of a subscription does not grant the permission of that subscription to the user's token**. - The permission control flow of a subscription: - We can only grant and revoke the permission of a subscription through the `grant-permission` and `revoke-permission` REST APIs. - Subscription permissions are managed at the namespace level. When a user creates a subscription `sub` for topic `persistent://public/default/test`, the broker checks whether the user's token has the `consume` permission of `persistent://public/default/test`. If it does, the broker retrieves the permissions of all subscriptions under the namespace `public/default` and gets the role list of subscription `sub`. - **If the role list is empty or null, the broker allows the subscription to be created.** However, the permission of the subscription is still not granted to the user's token. - If the role list is not empty, the broker checks whether the role corresponding to the user's token is in the role list. If it is, the broker allows the subscription to be created. - If the role corresponding to the user's token is not in the role list, the broker rejects the request. There is a permission hole in the current design.
If a user obtains the `consume` permission of a topic in the management platform, they can create an unlimited number of subscriptions without notifying the administrator. For example, user `jack` obtains the `consume` permission of topic `persistent://public/default/test` and the permission of subscription `sub` in the management platform. At this point, the role list under the namespace `public/default` is ``` 'sub' -> ['jack'] ``` Then, user `jack` can create other subscriptions `sub1`, `sub2`, ... for topic `persistent://public/default/test` without notifying the administrator. The role list under the namespace `public/default` remains unchanged: ``` 'sub' -> ['jack'] ``` The administrator cannot tell who owns the subscriptions `sub1`, `sub2`, ... What is worse, if another user `tom` obtains the permission of subscription `sub1` under the same namespace `public/default` in the management platform, the role list under the namespace `public/default` becomes ``` 'sub' -> ['jack'] 'sub1' -> ['tom'] ``` `jack` immediately loses the permission of subscription `sub1`, because `jack` never actually held it. He was just using an unregistered subscription `sub1`, owned by nobody, to consume messages. As soon as anyone obtains the permission of subscription `sub1`, his task consuming `sub1` is shut down immediately. ### Modifications Do not allow a role to subscribe unless it has actually been granted the permission of the subscription. ### Verifying this change - [ ] Make sure that the change passes the CI checks. *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*.
*(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (10MB)* - *Extended integration test for recovery after broker failure* ### Does this pull request potentially affect one of the following parts: *If the box was checked, please highlight the changes* - [ ] Dependencies (add or upgrade a dependency) - [ ] The public API - [ ] The schema - [ ] The default values of configurations - [ ] The threading model - [ ] The binary protocol - [ ] The REST endpoints - [ ] The admin CLI options - [ ] The metrics - [ ] Anything that affects deployment ### Documentation - [ ] `doc` - [ ] `doc-required` - [ ] `doc-not-needed` - [ ] `doc-complete` ### Matching PR in forked repository PR in forked repository:
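The current flow described in the motivation and the stricter rule in the Modifications section can be contrasted as two predicates. This is a simplified model that assumes a plain map from subscription name to granted roles. It ignores superuser and tenant-admin roles and is not the broker's authorization provider code.

```java
import java.util.Map;
import java.util.Set;

class SubscriptionPermissionSketch {

    public static void main(String[] args) {
        Map<String, Set<String>> subRoles = Map.of("sub", Set.of("jack"));
        System.out.println(currentCheck(true, subRoles, "sub1", "jack")); // true
        System.out.println(strictCheck(true, subRoles, "sub1", "jack"));  // false
        System.out.println(strictCheck(true, subRoles, "sub", "jack"));   // true
    }

    /** Flow described above: an empty or missing role list lets anyone with `consume` subscribe. */
    static boolean currentCheck(boolean canConsumeTopic, Map<String, Set<String>> subRoles,
                                String subscription, String role) {
        if (!canConsumeTopic) {
            return false;
        }
        Set<String> roles = subRoles.get(subscription);
        if (roles == null || roles.isEmpty()) {
            return true; // the hole: an unregistered subscription that nobody owns
        }
        return roles.contains(role);
    }

    /** Stricter variant: the role must also have been explicitly granted this subscription. */
    static boolean strictCheck(boolean canConsumeTopic, Map<String, Set<String>> subRoles,
                               String subscription, String role) {
        Set<String> roles = subRoles.get(subscription);
        return canConsumeTopic && roles != null && roles.contains(role);
    }
}
```

Under the stricter rule, an auto-created subscription that nobody has been granted is rejected. That is the intent of the fix, but it may affect deployments that rely on `allowAutoSubscriptionCreation`.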
Re: [PR] [fix][broker] Reader stuck after call hasMessageAvailable when enable replicateSubscriptionState [pulsar]
dao-jun commented on code in PR #22572: URL: https://github.com/apache/pulsar/pull/22572#discussion_r1578809479 ## pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java: ## @@ -160,6 +160,12 @@ public interface TransactionBuffer { */ PositionImpl getMaxReadPosition(); +/** + * Get the can dispatch max position. + * @return the stable position. + */ +CompletableFuture getLastCanDispatchPosition(); Review Comment: Maybe we don't need this method; moving `TopicTransactionBuffer#getLastCanDispatchPosition`'s logic to `PersistentTopic#getLastCanDispatchPositionWithTxn()` would be better.
Re: [PR] [fix][broker] Reader stuck after call hasMessageAvailable when enable replicateSubscriptionState [pulsar]
shibd commented on code in PR #22572: URL: https://github.com/apache/pulsar/pull/22572#discussion_r1578778014 ## pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java: ## @@ -509,6 +509,11 @@ public PositionImpl getMaxReadPosition() { } } +@Override +public CompletableFuture getLastCanDispatchPosition() { +return CompletableFuture.completedFuture(getMaxReadPosition()); Review Comment: Makes sense to me. I added this logic and added `ReplicatorSubscriptionWithTransactionTest` to cover the transaction-enabled case. Thanks for your reviews.
Re: [PR] [fix][broker] Reader stuck after call hasMessageAvailable when enable replicateSubscriptionState [pulsar]
shibd commented on code in PR #22572: URL: https://github.com/apache/pulsar/pull/22572#discussion_r1578776447 ## managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java: ## @@ -4321,4 +4321,44 @@ public void testDeleteCurrentLedgerWhenItIsClosed(boolean closeLedgerByAddEntry) assertEquals(ml.currentLedgerEntries, 0); }); } + +@Test +public void testReverseFindPositionOneByOne() throws Exception { +final int maxEntriesPerLedger = 5; + +ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig(); +managedLedgerConfig.setMaxEntriesPerLedger(maxEntriesPerLedger); +ManagedLedger ledger = factory.open("testReverseFindPositionOneByOne", managedLedgerConfig); + +String matchEntry = "match-entry"; +String noMatchEntry = "nomatch-entry"; +Predicate predicate = entry -> { +try { +String entryValue = entry.getDataBuffer().toString(UTF_8); +if (matchEntry.equals(entryValue)) { +return true; +} +} finally { +entry.release(); +} Review Comment: Makes sense, moved.
Re: [I] [Bug] NumberFormatException when consuming messages from pulsar-admin CLI [pulsar]
visortelle commented on issue #22405: URL: https://github.com/apache/pulsar/issues/22405#issuecomment-2075814941 > Unfortunately, nobody ran extensive tests with the docker images during the release candidate phase

Would having nightly images help catch more issues before an actual release? https://github.com/apache/pulsar/issues/14755
Re: [PR] [fix][broker] Reader stuck after call hasMessageAvailable when enable replicateSubscriptionState [pulsar]
dao-jun commented on code in PR #22572: URL: https://github.com/apache/pulsar/pull/22572#discussion_r1578390902 ## pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java: ## @@ -509,6 +509,11 @@ public PositionImpl getMaxReadPosition() { } } +@Override +public CompletableFuture getLastCanDispatchPosition() { +return CompletableFuture.completedFuture(getMaxReadPosition()); Review Comment: I mean, if we want to fix #22571 by adding `getLastCanDispatchPosition` here, we also need to consider the case where `maxReadPosition` equals `lastConfirmedEntry`, since that entry could be a marker. Before

```java
return CompletableFuture.completedFuture(getMaxReadPosition());
```

verify that `maxReadPosition` is valid.
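Below is a sketch of the validation dao-jun describes. When `maxReadPosition` is behind the last confirmed entry, keep it. When the two are equal, step back past trailing marker entries. The sketch assumes a single ledger modeled by a hypothetical `LedgerEntry(entryId, marker)` record. Like the comment, it only re-checks the equal case.

```java
import java.util.List;
import java.util.Optional;

class LastDispatchableSketch {

    public static void main(String[] args) {
        // Entries 0-1 are user messages; entries 2-3 are markers.
        List<LedgerEntry> ledger = List.of(
                new LedgerEntry(0, false), new LedgerEntry(1, false),
                new LedgerEntry(2, true), new LedgerEntry(3, true));
        System.out.println(lastDispatchable(ledger, 3)); // Optional[1]
        System.out.println(lastDispatchable(ledger, 1)); // Optional[1]
    }

    static Optional<Long> lastDispatchable(List<LedgerEntry> ledger, long maxReadEntryId) {
        long lastConfirmed = ledger.isEmpty() ? -1 : ledger.get(ledger.size() - 1).entryId();
        // maxReadPosition is behind the last confirmed entry: return it unchanged.
        if (Long.compare(maxReadEntryId, lastConfirmed) != 0) {
            return Optional.of(maxReadEntryId);
        }
        // maxReadPosition caught up with the last confirmed entry, which may be a marker:
        // walk backwards to the newest entry that carries real data.
        for (int i = ledger.size() - 1; i >= 0; i--) {
            if (!ledger.get(i).marker()) {
                return Optional.of(ledger.get(i).entryId());
            }
        }
        return Optional.empty(); // nothing dispatchable has been written yet
    }
}

// Hypothetical ledger entry: its id within one ledger and whether it is a marker rather than user data.
record LedgerEntry(long entryId, boolean marker) {}
```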
Re: [PR] [feat][site] PIP-264: Document topic messaging metrics [pulsar-site]
dragosvictor commented on code in PR #880: URL: https://github.com/apache/pulsar-site/pull/880#discussion_r1578304337 ## docs/reference-metrics-opentelemetry.md: ## @@ -8,6 +8,153 @@ Pulsar exposes the following OpenTelemetry metrics. ## Broker +### Topic Messaging metrics + + pulsar.broker.topic.subscription.count +The number of Pulsar subscriptions of the topic served by this broker. +* Type: UpDownCounter +* Unit: `{subscription}` + + pulsar.broker.topic.producer.count +The number of active producers of the topic connected to this broker. +* Type: UpDownCounter +* Unit: `{producer}` + + pulsar.broker.topic.consumer.count +The number of active consumers of the topic connected to this broker. +* Type: UpDownCounter +* Unit: `{consumer}` + + pulsar.broker.topic.message.incoming.count +The total number of messages received for this topic. +* Type: Counter +* Unit: `{message}` + + pulsar.broker.topic.message.outgoing.count +The total number of messages read from this topic. +* Type: Counter +* Unit: `{message}` + + pulsar.broker.topic.message.incoming.size +The total number of message bytes received for this topic. +* Type: Counter +* Unit: `{byte}` + + pulsar.broker.topic.message.outgoing.size +The total number of message bytes read from this topic. +* Type: Counter +* Unit: `{byte}` + + pulsar.broker.topic.publish.rate.limit +The number of times the publish rate limit is triggered. +* Type: Counter +* Unit: `{event}` + + pulsar.broker.topic.consumer.msg.ack +The total number of message acknowledgments received for this topic. +* Type: Counter +* Unit: `{ack}` + + pulsar.broker.topic.storage.size +The total storage size of the messages in this topic. +* Type: UpDownCounter +* Unit: `{byte}` + + pulsar.broker.topic.storage.logical.size +The storage size of this topic owned by this broker, without replicas.
+* Type: UpDownCounter +* Unit: `{byte}` + + pulsar.broker.topic.storage.backlog.size +The total backlog size of this topic owned by this broker. +* Type: UpDownCounter +* Unit: `{byte}` + + pulsar.broker.topic.storage.offloaded.size +The total amount of data in this topic offloaded to tiered storage. +* Type: UpDownCounter +* Unit: `{byte}` + + pulsar.broker.topic.storage.backlog.quota.limit.size +The size-based backlog quota limit for this topic. +* Type: Gauge +* Unit: `{byte}` + + pulsar.broker.topic.storage.backlog.quota.limit.time +The time-based backlog quota limit for this topic. +* Type: Gauge +* Unit: `{second}` + + pulsar.broker.topic.storage.backlog.quota.eviction.count +The number of times a backlog was evicted because it exceeded its quota. +* Type: Counter +* Unit: `{eviction}` + + pulsar.broker.topic.storage.backlog.age +The age of the oldest unacknowledged message (backlog). +* Type: Gauge +* Unit: `{second}` + + pulsar.broker.topic.storage.outgoing +The total number of message batches (entries) written to storage for this topic. +* Type: UpDownCounter +* Unit: `{message batch}` + + pulsar.broker.topic.storage.incoming +The total number of message batches (entries) read from storage for this topic. +* Type: UpDownCounter +* Unit: `{message batch}` + + pulsar.broker.topic.compaction.removed.event.count +The total number of events removed by compaction. +* Type: UpDownCounter +* Unit: `{event}` + + pulsar.broker.topic.compaction.succeed.count Review Comment: Refactored, as described [below](https://github.com/apache/pulsar-site/pull/880/files/8986d386d0f76d6a14e3cb709c801a053219a764#r1578260459).
(pulsar) branch master updated: [fix] [broker] Part-2: Replicator can not created successfully due to an orphan replicator in the previous topic owner (#21948)
This is an automated email from the ASF dual-hosted git repository. yubiao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new b774666331d [fix] [broker] Part-2: Replicator can not created successfully due to an orphan replicator in the previous topic owner (#21948) b774666331d is described below commit b774666331db33ea6407174e0fe6e27a73160522 Author: fengyubiao AuthorDate: Thu Apr 25 01:45:41 2024 +0800 [fix] [broker] Part-2: Replicator can not created successfully due to an orphan replicator in the previous topic owner (#21948) --- .../pulsar/broker/service/AbstractReplicator.java | 10 +- .../apache/pulsar/broker/service/Replicator.java | 2 + .../service/persistent/PersistentReplicator.java | 9 +- .../broker/service/persistent/PersistentTopic.java | 58 +-- .../broker/service/OneWayReplicatorTest.java | 166 + .../broker/service/OneWayReplicatorTestBase.java | 14 +- .../pulsar/broker/service/ReplicatorTest.java | 2 +- 7 files changed, 239 insertions(+), 22 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java index f34144deb0a..394fad21ae6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java @@ -248,7 +248,7 @@ public abstract class AbstractReplicator implements Replicator { } startProducer(); }).exceptionally(ex -> { -log.warn("[{}] [{}] Stop retry to create producer due to unknown error(topic create failed), and" +log.error("[{}] [{}] Stop retry to create producer due to unknown error(topic create failed), and" + " trigger a terminate. 
Replicator state: {}", localTopicName, replicatorId, STATE_UPDATER.get(this), ex); terminate(); @@ -377,9 +377,13 @@ public abstract class AbstractReplicator implements Replicator { this.producer = null; // set the cursor as inactive. disableReplicatorRead(); +// release resources. +doReleaseResources(); }); } +protected void doReleaseResources() {} + protected boolean tryChangeStatusToTerminating() { if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Terminating)){ return true; @@ -468,4 +472,8 @@ public abstract class AbstractReplicator implements Replicator { } return compareSetAndGetState(expect, update); } + +public boolean isTerminated() { +return state == State.Terminating || state == State.Terminated; +} } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java index 8130b855b4e..5c314397da8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java @@ -51,4 +51,6 @@ public interface Replicator { boolean isConnected(); long getNumberOfEntriesInBacklog(); + +boolean isTerminated(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java index 5e1cc4a936a..367d1965207 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java @@ -450,7 +450,7 @@ public abstract class PersistentReplicator extends AbstractReplicator long waitTimeMillis = readFailureBackoff.next(); if (exception instanceof CursorAlreadyClosedException) { -log.error("[{}] Error reading entries because replicator is" +log.warn("[{}] Error reading entries 
because replicator is" + " already deleted and cursor is already closed {}, ({})", replicatorId, ctx, exception.getMessage(), exception); // replicator is already deleted and cursor is already closed so, producer should also be disconnected. @@ -570,7 +570,7 @@ public abstract class PersistentReplicator extends AbstractReplicator log.error("[{}] Failed to delete message at {}: {}", replicatorId, ctx, exception.getMessage(), exception); if (exception instanceof CursorAlreadyClosedException) { -log.error("[{}] Asynchronous ack failure because replicator is already
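The diff adds an empty `doReleaseResources()` hook that runs during termination. It also adds an `isTerminated()` check that treats both `Terminating` and `Terminated` as terminated. Below is a simplified model of that pattern. The `ReplicatorSketch` classes are hypothetical and keep their state in an `AtomicReference` rather than Pulsar's `STATE_UPDATER`. The model is not `AbstractReplicator` itself.

```java
import java.util.concurrent.atomic.AtomicReference;

class ReplicatorDemo {
    public static void main(String[] args) {
        PersistentReplicatorSketch replicator = new PersistentReplicatorSketch();
        replicator.terminate();
        replicator.terminate(); // second call fails the state check and does nothing
        System.out.println(replicator.isTerminated() + " " + replicator.releasedCount); // true 1
    }
}

// Hypothetical, simplified model of the hook pattern in the diff above.
abstract class ReplicatorSketch {
    enum State { Starting, Started, Terminating, Terminated }

    protected final AtomicReference<State> state = new AtomicReference<>(State.Started);

    void terminate() {
        State current = state.get();
        boolean canTerminate = current == State.Starting || current == State.Started;
        // Only one caller can move the state to Terminating; everyone else returns early.
        if (!canTerminate || !state.compareAndSet(current, State.Terminating)) {
            return;
        }
        disableRead();
        doReleaseResources(); // subclass hook, a no-op by default
        state.set(State.Terminated);
    }

    protected void disableRead() {}

    protected void doReleaseResources() {}

    boolean isTerminated() {
        State s = state.get();
        return s == State.Terminating || s == State.Terminated;
    }
}

class PersistentReplicatorSketch extends ReplicatorSketch {
    int releasedCount = 0;

    @Override
    protected void doReleaseResources() {
        releasedCount++; // stands in for releasing replicator-owned resources
    }
}
```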
Re: [PR] [fix] [broker] Part-2: Replicator can not created successfully due to an orphan replicator in the previous topic owner [pulsar]
poorbarcode merged PR #21948: URL: https://github.com/apache/pulsar/pull/21948
Re: [PR] [fix] [broker] Part-2: Replicator can not created successfully due to an orphan replicator in the previous topic owner [pulsar]
codecov-commenter commented on PR #21948: URL: https://github.com/apache/pulsar/pull/21948#issuecomment-2075491809 ## [Codecov](https://app.codecov.io/gh/apache/pulsar/pull/21948?dropdown=coverage&src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) Report Attention: Patch coverage is `67.56757%` with `12 lines` in your changes are missing coverage. Please review. > Project coverage is 73.95%. Comparing base [(`bbc6224`)](https://app.codecov.io/gh/apache/pulsar/commit/bbc62245c5ddba1de4b1e7cee4ab49334bc36277?dropdown=coverage&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) to head [(`3d389b1`)](https://app.codecov.io/gh/apache/pulsar/pull/21948?dropdown=coverage&src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache). > Report is 186 commits behind head on master. Additional details and impacted files [![Impacted file tree graph](https://app.codecov.io/gh/apache/pulsar/pull/21948/graphs/tree.svg?width=650&height=150&src=pr&token=acYqCpsK9J&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)](https://app.codecov.io/gh/apache/pulsar/pull/21948?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)

```diff
@@             Coverage Diff              @@
##             master   #21948      +/-   ##
+ Coverage     73.57%   73.95%     +0.38%
+ Complexity    32624     2744     -29880
  Files          1877     1885         +8
  Lines        139502   140512      +1010
  Branches      15299    15450       +151
+ Hits         102638   103916      +1278
+ Misses        28908    28563       -345
- Partials       7956     8033        +77
```

| [Flag](https://app.codecov.io/gh/apache/pulsar/pull/21948/flags?src=pr&el=flags&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Coverage Δ | | |---|---|---| |
[inttests](https://app.codecov.io/gh/apache/pulsar/pull/21948/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `26.98% <21.62%> (+2.39%)` | :arrow_up: | | [systests](https://app.codecov.io/gh/apache/pulsar/pull/21948/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `24.47% <10.81%> (+0.15%)` | :arrow_up: | | [unittests](https://app.codecov.io/gh/apache/pulsar/pull/21948/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `73.24% <67.56%> (+0.39%)` | :arrow_up: | Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#carryforward-flags-in-the-pull-request-comment) to find out more. | [Files](https://app.codecov.io/gh/apache/pulsar/pull/21948?dropdown=coverage&src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Coverage Δ | | |---|---|---| | [...a/org/apache/pulsar/broker/service/Replicator.java](https://app.codecov.io/gh/apache/pulsar/pull/21948?src=pr&el=tree&filepath=pulsar-broker%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fpulsar%2Fbroker%2Fservice%2FReplicator.java&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9zZXJ2aWNlL1JlcGxpY2F0b3IuamF2YQ==) | `0.00% <ø> (ø)` | | | 
[...roker/service/persistent/PersistentReplicator.java](https://app.codecov.io/gh/apache/pulsar/pull/21948?src=pr&el=tree&filepath=pulsar-broker%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fpulsar%2Fbroker%2Fservice%2Fpersistent%2FPersistentReplicator.java&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9zZXJ2aWNlL3BlcnNpc3RlbnQvUGVyc2lzdGVudFJlcGxpY2F0b3IuamF2YQ==) | `66.21% <75.00%> (-2.66%)` | :arrow_down: | | [...ache/pulsar/broker/service/AbstractReplicator.java](https://app.codecov.io/gh/apache/pulsar/pull/21948?src=pr&el=tree&filepath=pulsar-broker%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fpulsar%2Fbroker%2Fservice%2FAbstractReplicator.java&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9zZXJ2aWNlL0Fic3RyYWN0UmVwbGljYXRvci5qYXZh
(pulsar-dotpulsar) branch master updated: Update CHANGELOG.md
This is an automated email from the ASF dual-hosted git repository.

djensen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git

The following commit(s) were added to refs/heads/master by this push:
new 921479b Update CHANGELOG.md
921479b is described below

commit 921479b83e11fc2cb1a9d719b1ab2442195f7f1f
Author: entvex <1580435+ent...@users.noreply.github.com>
AuthorDate: Wed Apr 24 19:39:08 2024 +0200

    Update CHANGELOG.md
---
 CHANGELOG.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index d75ba78..aae57f0 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,7 +4,7 @@ All notable changes to this project will be documented in this file.
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/) and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
-## [3.2.1-rc.1] - 2024-04-19
+## [3.2.1] - 2024-04-24
 ### Fixed
[PR] [improve] PIP-347 Extend LTS release process to client SDKs.md [pulsar]
merlimat opened a new pull request, #22578: URL: https://github.com/apache/pulsar/pull/22578

### Motivation

### Modifications

### Verifying this change

- [ ] Make sure that the change passes the CI checks.

*(Please pick either of the following options)*

This change is a trivial rework / code cleanup without any test coverage.

*(or)*

This change is already covered by existing tests, such as *(please describe tests)*.

*(or)*

This change added tests and can be verified as follows:

*(example:)*
- *Added integration tests for end-to-end deployment with large payloads (10MB)*
- *Extended integration test for recovery after broker failure*

### Does this pull request potentially affect one of the following parts:

*If the box was checked, please highlight the changes*

- [ ] Dependencies (add or upgrade a dependency)
- [ ] The public API
- [ ] The schema
- [ ] The default values of configurations
- [ ] The threading model
- [ ] The binary protocol
- [ ] The REST endpoints
- [ ] The admin CLI options
- [ ] The metrics
- [ ] Anything that affects deployment

### Documentation

- [ ] `doc`
- [X] `doc-required`
- [ ] `doc-not-needed`
- [ ] `doc-complete`

### Matching PR in forked repository

PR in forked repository:

-- 
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
(pulsar-dotpulsar) tag 3.2.1 created (now ffa5665)
This is an automated email from the ASF dual-hosted git repository. djensen pushed a change to tag 3.2.1 in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git at ffa5665 (commit) No new revisions were added by this update.
Re: [PR] [feat][site] PIP-264: Document topic messaging metrics [pulsar-site]
dragosvictor commented on code in PR #880: URL: https://github.com/apache/pulsar-site/pull/880#discussion_r1578260459

## docs/reference-metrics-opentelemetry.md:
## @@ -8,6 +8,153 @@ Pulsar exposes the following OpenTelemetry metrics.
 ## Broker
+### Topic Messaging metrics
+ pulsar.broker.topic.subscription.count
+The number of Pulsar subscriptions of the topic served by this broker.
+* Type: UpDownCounter
+* Unit: `{subscription}`
+ pulsar.broker.topic.producer.count
+The number of active producers of the topic connected to this broker.
+* Type: UpDownCounter
+* Unit: `{producer}`
+ pulsar.broker.topic.consumer.count
+The number of active consumers of the topic connected to this broker.
+* Type: UpDownCounter
+* Unit: `{consumer}`
+ pulsar.broker.topic.message.incoming.count
+The total number of messages received for this topic.
+* Type: Counter
+* Unit: `{message}`
+ pulsar.broker.topic.message.outgoing.count
+The total number of messages read from this topic.
+* Type: Counter
+* Unit: `{message}`
+ pulsar.broker.topic.message.incoming.size
+The total number of messages bytes received for this topic.
+* Type: Counter
+* Unit: `{byte}`
+ pulsar.broker.topic.message.outgoing.size
+The total number of messages bytes read from this topic.
+* Type: Counter
+* Unit: `{byte}`
+ pulsar.broker.topic.publish.rate.limit
+The number of times the publish rate limit is triggered.
+* Type: Counter
+* Unit: `{event}`
+ pulsar.broker.topic.consumer.msg.ack
+The total number of message acknowledgments received for this topic.
+* Type: Counter
+* Unit: `{ack}`
+ pulsar.broker.topic.storage.size
+The total storage size of the messages in this topic.
+* Type: UpDownCounter
+* Unit: `{byte}`
+ pulsar.broker.topic.storage.logical.size
+The storage size of topics in the namespace owned by the broker without replicas.
+* Type: UpDownCounter
+* Unit: `{byte}`
+ pulsar.broker.topic.storage.backlog.size
+The total backlog size of the topics of this topic owned by this broker.
+* Type: UpDownCounter
+* Unit: `{byte}`
+ pulsar.broker.topic.storage.offloaded.size
+The total amount of the data in this topic offloaded to the tiered storage.
+* Type: UpDownCounter
+* Unit: `{byte}`
+ pulsar.broker.topic.storage.backlog.quota.limit.size
+The size based backlog quota limit for this topic.
+* Type: Gauge
+* Unit: `{byte}`
+ pulsar.broker.topic.storage.backlog.quota.limit.time
+The time based backlog quota limit for this topic.
+* Type: Gauge
+* Unit: `{second}`
+ pulsar.broker.topic.storage.backlog.quota.eviction.count
+The number of times a backlog was evicted since it has exceeded its quota.
+* Type: Counter
+* Unit: `{eviction}`
+ pulsar.broker.topic.storage.backlog.age
+The age of the oldest unacknowledged message (backlog).
+* Type: Gauge
+* Unit: `{second}`
+ pulsar.broker.topic.storage.outgoing
+The total message batches (entries) written to the storage for this topic.
+* Type: UpDownCounter
+* Unit: `{message batch}`
+ pulsar.broker.topic.storage.incoming
+The total message batches (entries) read from the storage for this topic.
+* Type: UpDownCounter
+* Unit: `{message batch}`
+ pulsar.broker.topic.compaction.removed.event.count
+The total number of removed events of the compaction.
+* Type: UpDownCounter
+* Unit: `{event}`
+ pulsar.broker.topic.compaction.succeed.count
+The total number of successes of the compaction.
+* Type: UpDownCounter
+* Unit: `{event}`
+ pulsar.broker.topic.compaction.failed.count

Review Comment: Refactored into just one metric named `pulsar.broker.topic.compaction.operation.count`, with an attribute to specify the result.
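The refactoring described in the review (one compaction-operation counter carrying a result attribute, instead of separate succeed and failed counters) can be modelled in plain Java as one counter family keyed by outcome. This is a minimal sketch that deliberately does not use the OpenTelemetry API; the `Result` values are hypothetical stand-ins for whatever attribute values the PR defines.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.atomic.LongAdder;

// Plain-Java stand-in (NOT the OpenTelemetry API): a single counter whose
// series are distinguished by an outcome attribute, rather than two
// separately named counters for successes and failures.
final class CompactionOperationCounter {
    // Hypothetical attribute values; the real attribute name and values are set by the PR.
    enum Result { SUCCESS, FAILURE }

    private final Map<Result, LongAdder> series = new EnumMap<>(Result.class);

    CompactionOperationCounter() {
        // Pre-create every series so the map is never structurally modified afterwards.
        for (Result result : Result.values()) {
            series.put(result, new LongAdder());
        }
    }

    // Equivalent of "add 1 with result=<value>" on the single counter.
    void record(Result result) {
        series.get(result).increment();
    }

    long count(Result result) {
        return series.get(result).sum();
    }

    // Summing over the attribute yields the total number of compaction operations.
    long total() {
        long sum = 0;
        for (LongAdder adder : series.values()) {
            sum += adder.sum();
        }
        return sum;
    }
}
```

Because the total is recoverable by summing over the attribute, a dashboard can still show overall compaction activity while filtering by result when needed.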
Re: [PR] [fix] [broker] [Namespace-level Geo-Replication] Reject a topic creation if there is a confilct topic on the remote side [pulsar]
poorbarcode closed pull request #22577: [fix] [broker] [Namespace-level Geo-Replication] Reject a topic creation if there is a confilct topic on the remote side URL: https://github.com/apache/pulsar/pull/22577
(pulsar) branch branch-3.2 updated: [fix] Include swagger annotations in shaded client lib (#22570)
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git

The following commit(s) were added to refs/heads/branch-3.2 by this push:
new 62546d977eb [fix] Include swagger annotations in shaded client lib (#22570)
62546d977eb is described below

commit 62546d977ebb291dffa4629c6c8ee5fbcd559777
Author: Matteo Merli
AuthorDate: Tue Apr 23 22:32:06 2024 -0700

    [fix] Include swagger annotations in shaded client lib (#22570)
---
 distribution/shell/src/assemble/LICENSE.bin.txt | 1 +
 pulsar-client/pom.xml | 1 -
 2 files changed, 1 insertion(+), 1 deletion(-)

diff --git a/distribution/shell/src/assemble/LICENSE.bin.txt b/distribution/shell/src/assemble/LICENSE.bin.txt
index 2bdcac5532c..4bf34fb1369 100644
--- a/distribution/shell/src/assemble/LICENSE.bin.txt
+++ b/distribution/shell/src/assemble/LICENSE.bin.txt
@@ -331,6 +331,7 @@ The Apache Software License, Version 2.0
 - listenablefuture-.0-empty-to-avoid-conflict-with-guava.jar
 * J2ObjC Annotations -- j2objc-annotations-1.3.jar
 * Netty Reactive Streams -- netty-reactive-streams-2.0.6.jar
+ * Swagger -- swagger-annotations-1.6.2.jar
 * DataSketches
 - memory-0.8.3.jar
 - sketches-core-0.8.3.jar
diff --git a/pulsar-client/pom.xml b/pulsar-client/pom.xml
index a8b98e7ab26..7b918533531 100644
--- a/pulsar-client/pom.xml
+++ b/pulsar-client/pom.xml
@@ -76,7 +76,6 @@
 io.swagger
 swagger-annotations
- provided
Re: [I] [Bug] [ARM] Partially acknowledged batches are not redelivered [pulsar-client-cpp]
bph-sag commented on issue #424: URL: https://github.com/apache/pulsar-client-cpp/issues/424#issuecomment-2075337713

> > Debian GNU/Linux 10 (buster), aarch64 (but the executable is arm32), pulsar-client-cpp 3.3.0
> >
> > We've used the same broker and test with an x86_64 Pulsar 3.5.0 Pulsar client, and we've had no issues (RHEL8).
>
> So you tested 3.5.0-x86_64 and 3.3.0-aarch64?
>
> Since a batch is stored as a BK entry, you can check the topic stats for the mark-delete position and compare it with the message id you received. The result you see might because the whole batch was somehow acknowledged.
>
> BTW, if you suspect the issue is related to the DEB packages, you can try building from source and then test it again. You can try [vcpkg](https://github.com/apache/pulsar-client-cpp/tree/main/vcpkg-example#vcpkg-example) to save your time. (Currently only 3.4.2 is supported, I will add the 3.5.0 to vcpkg soon)

Oops, I typo'd the x86_64 version; I meant to say 3.3.0-x86_64. I've corrected the parent comment.

And no, we're not using the Debian packages; we compile it ourselves for aarch32 (and for x86_64 as well). So, 3.3.0-aarch32.

---

I'll take a look at the topic stats and get back to you.
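The check suggested in the quoted reply (compare the mark-delete position from the topic stats with the received message ID) reduces to an ordered comparison of `(ledgerId, entryId)` pairs. The sketch below assumes the position is rendered as a `ledgerId:entryId` string; `MarkDeleteCheck` is a hypothetical helper, not part of any Pulsar client. Because a batch is stored as one entry, an entry at or before the mark-delete position means every message in that batch is treated as acknowledged.

```java
// Hypothetical helper: compares an entry's (ledgerId, entryId) with a
// mark-delete position string in the assumed "ledgerId:entryId" format.
final class MarkDeleteCheck {
    static long[] parsePosition(String position) {
        String[] parts = position.trim().split(":");
        if (parts.length != 2) {
            throw new IllegalArgumentException("Expected ledgerId:entryId but got " + position);
        }
        return new long[] {Long.parseLong(parts[0]), Long.parseLong(parts[1])};
    }

    // True if the entry is at or before the mark-delete position, i.e. the
    // whole entry (and therefore every message in its batch) counts as acknowledged.
    static boolean isEntryMarkDeleted(String markDeletePosition, long ledgerId, long entryId) {
        long[] markDelete = parsePosition(markDeletePosition);
        if (ledgerId != markDelete[0]) {
            return ledgerId < markDelete[0];
        }
        return entryId <= markDelete[1];
    }
}
```

If this returns true for the entry holding the partially acknowledged batch, the broker already considers the whole batch acknowledged, which would explain why none of it is redelivered.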
[PR] [fix] [broker] [namespace Geo-Replication] Reject a topic creation if there is a confilct topic on the remote side [pulsar]
poorbarcode opened a new pull request, #22577: URL: https://github.com/apache/pulsar/pull/22577

### Motivation

The steps to reproduce the issue:

- There is a partitioned topic with `1` partition on the remote cluster, and the source cluster does not have this topic.
- Enable namespace-level Geo-Replication.
- Create the topic with `3` partitions on the source cluster.
- The broker only prints an error log "[Error] This topic already exists", and the user still gets a successful response.

### Modifications

Reject a topic creation if there is a conflicting topic on the remote side.

### Documentation

- [ ] `doc`
- [ ] `doc-required`
- [x] `doc-not-needed`
- [ ] `doc-complete`

### Matching PR in forked repository

PR in forked repository: x
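The reported scenario (1 partition on the remote cluster, 3 requested on the source cluster) can be expressed as a pre-check whose result fails the creation request instead of only being logged. This is a hypothetical sketch: the class and method names are illustrative, and it assumes that a partition-count mismatch is what counts as a conflict, while the PR may define conflicts more broadly.

```java
import java.util.Optional;

// Hypothetical pre-check run before creating a topic under namespace-level geo-replication.
final class RemoteTopicConflictCheck {
    /**
     * @param remotePartitions partition count of the same topic on the remote cluster,
     *                         or empty if the remote cluster has no such topic
     * @return an error message if creation must be rejected, or empty if it may proceed
     */
    static Optional<String> validate(String topic, int requestedPartitions,
                                     Optional<Integer> remotePartitions) {
        if (!remotePartitions.isPresent()) {
            return Optional.empty(); // nothing on the remote side, so no conflict
        }
        int remote = remotePartitions.get();
        if (remote != requestedPartitions) {
            return Optional.of(String.format(
                    "Topic %s already exists on the remote cluster with %d partition(s), but %d were requested",
                    topic, remote, requestedPartitions));
        }
        return Optional.empty(); // same shape on both sides (assumed acceptable)
    }
}
```

Propagating the returned error to the caller, for example as a failed future in the REST handler, is what turns the silent success into a rejected request.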
[I] [Bug] /metrics gzip compression sporadically fails with error 500 caused by java.nio.BufferOverflowException [pulsar]
lhotari opened a new issue, #22575: URL: https://github.com/apache/pulsar/issues/22575

### Search before asking

- [X] I searched in the [issues](https://github.com/apache/pulsar/issues) and found nothing similar.

### Read release policy

- [X] I understand that unsupported versions don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.

### Version

master

### Minimal reproduce step

TBD

### What did you expect to see?

Getting /metrics with gzip compression shouldn't fail.

### What did you see instead?

It sometimes fails.

### Anything else?

The problem in the code is here: https://github.com/apache/pulsar/blob/94f6c7ccd2bf8bc261d45ab41f6c7f123359fa47/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java#L236-L241

The `putInt` calls fail with `java.nio.BufferOverflowException` if the compress buffer doesn't have enough remaining space.

### Are you willing to submit a PR?

- [X] I'm willing to submit a PR!
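`java.nio.ByteBuffer.putInt` throws `BufferOverflowException` when fewer than four bytes remain, and a `ByteBuffer` never grows on its own. A common remedy is to check `remaining()` before writing and copy into a larger buffer when needed. This is a generic sketch with hypothetical names; it is not the fix that was applied to `PrometheusMetricsGenerator`, and it assumes the buffer is in write mode with its limit equal to its capacity.

```java
import java.nio.ByteBuffer;

final class GrowableBuffer {
    // Returns a buffer with at least `needed` bytes remaining, copying the bytes
    // written so far into a larger buffer when the current one is too small.
    static ByteBuffer ensureRemaining(ByteBuffer buf, int needed) {
        if (buf.remaining() >= needed) {
            return buf;
        }
        int required = buf.position() + needed;
        int newCapacity = Math.max(buf.capacity() * 2, required);
        ByteBuffer bigger = ByteBuffer.allocate(newCapacity).order(buf.order());
        buf.flip();      // read mode: [0, old position)
        bigger.put(buf); // copy existing bytes; bigger.position() == old position
        return bigger;
    }

    // Writes an int without risking BufferOverflowException; callers must keep the returned buffer.
    static ByteBuffer putIntSafely(ByteBuffer buf, int value) {
        ByteBuffer target = ensureRemaining(buf, Integer.BYTES);
        target.putInt(value);
        return target;
    }
}
```

Doubling the capacity keeps the amortized cost of repeated growth linear in the total bytes written.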
[I] [Bug] [ARM] Partially acknowledged batches are not redelivered [pulsar-client-cpp]
bph-sag opened a new issue, #424: URL: https://github.com/apache/pulsar-client-cpp/issues/424

### Search before asking

- [X] I searched in the [issues](https://github.com/apache/pulsar-client-cpp/issues) and found nothing similar.

### Version

Debian GNU/Linux 10 (buster), aarch64 (but the executable is arm32), pulsar-client-cpp 3.3.0

### Minimal reproduce step

1. Send a batch of events to the Pulsar broker (in our reproduction, the batch size is 309).
2. Receive these messages (we're using `pulsar::Consumer::receive` in a loop) while continuously acknowledging them asynchronously (acknowledgements are done using `pulsar::Consumer::acknowledge(const MessageId &messageId)`, where `messageId` is `pulsar::Message::getMessageId`).
3. Terminate the program in the middle of receiving a batch (we terminate after having received and acknowledged 190 of the 309 events in a batch).
4. Resume the program.

### What did you expect to see?

All of the events in the batch re-sent.

### What did you see instead?

None of the events in the batch, _including the ones never received_, are re-sent.

### Anything else?

- We log the result of each acknowledgement in the callback method, which shows that the most we've acknowledged is the `190th` message in a batch.
- Exclusive subscription.
- Running on Graviton2/3 processors.
- We've used the same broker and test with an x86_64 Pulsar 3.5.0 client, and we've had no issues (RHEL8).
- We can actively see missing messages: we are just sending consecutive numbers and logging them. We're missing exactly 119, the number remaining in the batch that we've not fully acknowledged.
- We've tried not terminating immediately, i.e. we stop receiving messages but continue to process any callbacks, to verify that we're not simply missing 119 callbacks; none arrived.

### Are you willing to submit a PR?

- [ ] I'm willing to submit a PR!
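The numbers in the report are self-consistent: 309 messages in the batch minus 190 acknowledged leaves exactly the 119 that went missing. The expected behaviour can be modelled as a per-batch bitmap in which the entry counts as acknowledged only once every index is set, so anything short of that should lead to redelivery. This is a conceptual sketch with a hypothetical class name, not the C++ client's batch-acknowledgement tracker.

```java
import java.util.BitSet;

// Conceptual model: a batch stored as one entry may be acknowledged to the
// broker only after every message index inside it has been acknowledged.
final class BatchAckTracker {
    private final int batchSize;
    private final BitSet acked;

    BatchAckTracker(int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.acked = new BitSet(batchSize);
    }

    // Records the individual acknowledgement of one message in the batch.
    void ack(int batchIndex) {
        if (batchIndex < 0 || batchIndex >= batchSize) {
            throw new IndexOutOfBoundsException("batchIndex " + batchIndex);
        }
        acked.set(batchIndex);
    }

    // Only when this is true may the whole entry be acknowledged.
    boolean isEntryFullyAcked() {
        return acked.cardinality() == batchSize;
    }

    // Messages that should be redelivered if the consumer restarts now.
    int unackedCount() {
        return batchSize - acked.cardinality();
    }
}
```

Under this model, the observed behaviour (nothing redelivered after 190 of 309 acks) corresponds to the entry being acknowledged while `isEntryFullyAcked()` is still false.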
Re: [PR] [fix][broker] Reader stuck after call hasMessageAvailable when enable replicateSubscriptionState [pulsar]
shibd commented on code in PR #22572: URL: https://github.com/apache/pulsar/pull/22572#discussion_r1577951228

## pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java:
## @@ -509,6 +509,11 @@ public PositionImpl getMaxReadPosition() {
 }
 }
+@Override
+public CompletableFuture getLastCanDispatchPosition() {
+return CompletableFuture.completedFuture(getMaxReadPosition());

Review Comment: I agree with your point. I'm not familiar with the transaction implementation yet, and I'm not sure whether there's anything special to deal with. This PR keeps the logic the same as before when transactions are enabled, and I think I can write a test in the next PR to verify your guess and fix the transaction buffer issue.
Re: [PR] [fix][broker] Reader stuck after call hasMessageAvailable when enable replicateSubscriptionState [pulsar]
dao-jun commented on code in PR #22572: URL: https://github.com/apache/pulsar/pull/22572#discussion_r1577928912

## pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java:
## @@ -509,6 +509,11 @@ public PositionImpl getMaxReadPosition() {
 }
 }
+@Override
+public CompletableFuture getLastCanDispatchPosition() {
+return CompletableFuture.completedFuture(getMaxReadPosition());

Review Comment: And if all the txns are committed or aborted, the maxReadPosition will also be set to lastConfirmedEntry. Do we need to consider that case?
Re: [PR] [fix][broker] Reader stuck after call hasMessageAvailable when enable replicateSubscriptionState [pulsar]
dao-jun commented on code in PR #22572: URL: https://github.com/apache/pulsar/pull/22572#discussion_r1577908495

## pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java:
## @@ -509,6 +509,11 @@ public PositionImpl getMaxReadPosition() {
 }
 }
+@Override
+public CompletableFuture getLastCanDispatchPosition() {
+return CompletableFuture.completedFuture(getMaxReadPosition());

Review Comment: It looks a little strange: after recovery finishes and there are no ongoing txns, the maxReadPosition will be set to the ledger's lastConfirmedEntry, and the last entry could also be a Marker.
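The concern above is that `maxReadPosition` can land on a marker entry (for example, the markers written for replicated subscriptions or transactions) rather than on a user message. One way to reason about it is a backward scan from the max readable position to the last non-marker entry. The sketch is hypothetical: it works on an in-memory list of entries flagged as marker or not, whereas the broker would have to read entries from the managed ledger asynchronously.

```java
import java.util.List;
import java.util.OptionalLong;

final class LastDispatchablePosition {
    // Hypothetical in-memory stand-in for a ledger entry.
    static final class Entry {
        final long entryId;
        final boolean marker;

        Entry(long entryId, boolean marker) {
            this.entryId = entryId;
            this.marker = marker;
        }
    }

    // Scans backwards from maxReadIndex (inclusive) and returns the id of the
    // last entry that is not a marker, or empty if only markers remain.
    static OptionalLong find(List<Entry> entries, int maxReadIndex) {
        int start = Math.min(maxReadIndex, entries.size() - 1);
        for (int i = start; i >= 0; i--) {
            Entry entry = entries.get(i);
            if (!entry.marker) {
                return OptionalLong.of(entry.entryId);
            }
        }
        return OptionalLong.empty();
    }
}
```

An empty result means there is nothing a reader could actually receive, which is the case where reporting "messages available" would leave a reader waiting.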
Re: [D] /data/bookie1/ledgers/current log clean [pulsar]
GitHub user visortelle edited a comment on the discussion: /data/bookie1/ledgers/current log clean

I didn't configure it for BookKeeper, but I suppose you can configure it in the `log4j.properties` file:

> To enable logging for a bookie, create a log4j.properties file and point the BOOKIE_LOG_CONF environment variable to the configuration file.

https://bookkeeper.apache.org/docs/admin/bookies#logging

> RollingFileAppender does this. You just need to set maxBackupIndex to the highest value for the backup file.

https://stackoverflow.com/a/1050318/4182882

GitHub link: https://github.com/apache/pulsar/discussions/22574#discussioncomment-9213583

This is an automatically sent email for commits@pulsar.apache.org. To unsubscribe, please send an email to: commits-unsubscr...@pulsar.apache.org
Re: [D] /data/bookie1/ledgers/current log clean [pulsar]
GitHub user visortelle added a comment to the discussion: /data/bookie1/ledgers/current log clean

I suppose you can configure it in the `log4j.properties` file:

> To enable logging for a bookie, create a log4j.properties file and point the BOOKIE_LOG_CONF environment variable to the configuration file.

https://bookkeeper.apache.org/docs/admin/bookies#logging

> RollingFileAppender does this. You just need to set maxBackupIndex to the highest value for the backup file.

https://stackoverflow.com/a/1050318/4182882

GitHub link: https://github.com/apache/pulsar/discussions/22574#discussioncomment-9213583
[PR] [fix][broker] Avoid being stuck in 30+ seconds when closing the BrokerService [pulsar]
BewareMyPower opened a new pull request, #22573: URL: https://github.com/apache/pulsar/pull/22573

Fixes https://github.com/apache/pulsar/issues/22569

### Motivation

`BrokerService#closeAsync` calls `unloadNamespaceBundlesGracefully` to unload namespaces gracefully. With the extensible load manager, it eventually calls `TableViewLoadDataStoreImpl#validateProducer`:

```
BrokerService#unloadNamespaceBundlesGracefully
ExtensibleLoadManagerWrapper#disableBroker
ExtensibleLoadManagerImpl#disableBroker
ServiceUnitStateChannelImpl#cleanOwnerships
ServiceUnitStateChannelImpl#doCleanup
TableViewLoadDataStoreImpl#removeAsync
TableViewLoadDataStoreImpl#validateProducer
```

In `validateProducer`, if the producer is not connected, it recreates the producer synchronously. However, since the state of `PulsarService` has already been changed to `Closing`, all connect or lookup requests fail with `ServiceNotReady`, and the client keeps retrying until it times out.

Besides, the unload operation can also trigger a reconnection, because the extensible load manager sends the unload event to the `loadbalancer-service-unit-state` topic.

### Modifications

The major fix: before changing `PulsarService`'s state to `Closing`, call `BrokerService#unloadNamespaceBundlesGracefully` so that the load manager completes its unload operations first.

Minor fixes:
- Record the time when `LoadManager#disableBroker` is done.
- Don't check whether the producer is disconnected, because the producer can retry by itself if it's disconnected.

### Verifications

Add `ExtensibleLoadManagerCloseTest` to verify that closing `PulsarService` doesn't take too much time. Here are some local test results:

```
2024-04-24T19:43:38,851 - INFO - [main:ExtensibleLoadManagerCloseTest] - Brokers close time: [3342, 3276, 3310]
2024-04-24T19:44:26,711 - INFO - [main:ExtensibleLoadManagerCloseTest] - Brokers close time: [3357, 3258, 3298]
2024-04-24T19:46:16,791 - INFO - [main:ExtensibleLoadManagerCloseTest] - Brokers close time: [3313, 3257, 3263]
2024-04-24T20:13:05,763 - INFO - [main:ExtensibleLoadManagerCloseTest] - Brokers close time: [3304, 3279, 3299]
2024-04-24T20:13:43,979 - INFO - [main:ExtensibleLoadManagerCloseTest] - Brokers close time: [3343, 3308, 3310]
```

As you can see, each broker takes only about 3.3 seconds to close (3257 to 3357 ms in the runs above), due to the `OWNERSHIP_CLEAN_UP_CONVERGENCE_DELAY_IN_MILLIS` delay added in https://github.com/apache/pulsar/pull/20315

### Documentation

- [ ] `doc`
- [ ] `doc-required`
- [x] `doc-not-needed`
- [ ] `doc-complete`

### Matching PR in forked repository

PR in forked repository:
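The ordering change in this PR can be illustrated with a small `CompletableFuture` model: unloading has to run while the service still accepts lookups, and only afterwards does the state move to `Closing`. All names here are hypothetical, and the model fails fast with an exception where the real client keeps retrying until its timeout.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

final class CloseOrderingSketch {
    enum State { STARTED, CLOSING, CLOSED }

    private volatile State state = State.STARTED;
    // Records the order in which the close steps happened.
    final List<String> steps = new CopyOnWriteArrayList<>();

    // Stand-in for graceful bundle unloading: it needs lookups, which are
    // rejected ("ServiceNotReady") once the service is no longer STARTED.
    CompletableFuture<Void> unloadBundlesGracefully() {
        if (state != State.STARTED) {
            CompletableFuture<Void> failed = new CompletableFuture<>();
            failed.completeExceptionally(new IllegalStateException("ServiceNotReady"));
            return failed;
        }
        steps.add("unload");
        return CompletableFuture.completedFuture(null);
    }

    // Order used by the fix: unload first, then switch to CLOSING and close the rest.
    CompletableFuture<Void> closeAsync() {
        return unloadBundlesGracefully().thenRun(() -> {
            state = State.CLOSING;
            steps.add("closing");
            state = State.CLOSED;
            steps.add("closed");
        });
    }

    // Previous order: switching to CLOSING first makes the unload fail.
    CompletableFuture<Void> closeAsyncStateFirst() {
        state = State.CLOSING;
        steps.add("closing");
        return unloadBundlesGracefully();
    }
}
```

In the real broker the failure shows up as retries until timeout rather than an immediate exception, which is where the 30+ second close time came from.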
Re: [D] Apache Beam support for Pulsar [pulsar]
GitHub user hpvd edited a comment on the discussion: Apache Beam support for Pulsar

The progress of Pulsar toward becoming a first-class citizen in Beam seems to have stalled :-( There are not only some
- open issues, but also
- unmerged pull requests that were automatically closed as stale.

I have integrated them into the overview: https://github.com/apache/beam/issues/31078

GitHub link: https://github.com/apache/pulsar/discussions/18453#discussioncomment-9212078
Re: [D] Apache Beam support for Pulsar [pulsar]
GitHub user hpvd edited a comment on the discussion: Apache Beam support for Pulsar

The progress of Pulsar toward becoming a first-class citizen in Beam seems to have stalled :-( There are not only some
- open issues, but also
- unmerged pull requests that were automatically closed as stale.

I have integrated them into the overview: https://github.com/apache/beam/issues/31079

GitHub link: https://github.com/apache/pulsar/discussions/18453#discussioncomment-9212078
Re: [D] Apache Beam support for Pulsar [pulsar]
GitHub user hpvd added a comment to the discussion: Apache Beam support for Pulsar

The progress of Pulsar toward becoming a first-class citizen in Beam seems to have stalled :-( There are not only some
- open issues, but also
- unmerged pull requests that were automatically closed as stale.

I have integrated them into the overview: https://github.com/apache/beam/issues/31079

GitHub link: https://github.com/apache/pulsar/discussions/18453#discussioncomment-9212078
Re: [PR] [improve][broker] perf: Reduce stickyHash calculations of non-persistent topics in SHARED subscriptions [pulsar]
Technoboy- commented on code in PR #22536: URL: https://github.com/apache/pulsar/pull/22536#discussion_r1577665699

## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java:
## @@ -139,28 +147,38 @@ public void sendMessages(List entries) {
 final Map> groupedEntries = localGroupedEntries.get();
 groupedEntries.clear();
+final Map> consumerStickyKeyHashesMap = localGroupedStickyKeyHashes.get();
+consumerStickyKeyHashesMap.clear();
 for (Entry entry : entries) {
-Consumer consumer = selector.select(peekStickyKey(entry.getDataBuffer()));
+byte[] stickyKey = peekStickyKey(entry.getDataBuffer());
+int stickyKeyHash = StickyKeyConsumerSelector.makeStickyKeyHash(stickyKey);
+
+Consumer consumer = selector.select(stickyKeyHash);
 if (consumer != null) {
-groupedEntries.computeIfAbsent(consumer, k -> new ArrayList<>()).add(entry);
+int startingSize = Math.max(10, entries.size() / (2 * consumerSet.size()));

Review Comment: What does this `startingSize` mean? Why do we need it?
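The diff does two things: it computes each sticky-key hash once and selects the consumer by that hash, and it pre-sizes each per-consumer list. `Math.max(10, entries.size() / (2 * consumerSet.size()))` reads as an initial `ArrayList` capacity of roughly half an even share per consumer, with a floor of 10, presumably to avoid repeated list growth without over-allocating when keys are skewed. That reading is an interpretation, not the author's stated rationale. The generic sketch below (hypothetical names, with the hash and selector passed in as functions) shows the same pattern, with `startingSize` hoisted out of the loop because it does not depend on the entry.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.IntFunction;
import java.util.function.ToIntFunction;

final class StickyGroupingSketch {
    static <E, C> Map<C, List<E>> group(List<E> entries,
                                        ToIntFunction<E> stickyKeyHash,
                                        IntFunction<C> selectConsumer,
                                        int consumerCount) {
        if (consumerCount <= 0) {
            throw new IllegalArgumentException("consumerCount must be positive");
        }
        // Initial capacity per consumer list: half of an even share, but at least 10.
        final int startingSize = Math.max(10, entries.size() / (2 * consumerCount));
        Map<C, List<E>> grouped = new HashMap<>();
        for (E entry : entries) {
            int hash = stickyKeyHash.applyAsInt(entry); // hash computed once per entry
            C consumer = selectConsumer.apply(hash);    // select by hash, not by raw key
            if (consumer != null) {
                grouped.computeIfAbsent(consumer, k -> new ArrayList<>(startingSize)).add(entry);
            }
        }
        return grouped;
    }
}
```

The capacity hint only affects allocation; the grouping result is the same with or without it.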
Re: [PR] [fix] [broker] Part-2: Replicator can not created successfully due to an orphan replicator in the previous topic owner [pulsar]
poorbarcode commented on PR #21948: URL: https://github.com/apache/pulsar/pull/21948#issuecomment-2074593502

@codelipenghui

> It looks like we can also fix the issue by involving a read lock to the addReplicationCluster() method in PersistentTopic.java without changing the close topic process. The replicator.disconnect() method will also call the managed cursor API to [change the cursor state](https://github.com/apache/pulsar/blob/8a18043585c49624e02b875b921336a81a9ec577/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java#L201) which doesn't make sense to close the managed ledger before closing the replicator

Good suggestion; I've already improved the code.
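The quoted suggestion relies on a read/write lock: `addReplicationCluster()` takes the read lock and the close path takes the write lock, so no replicator can be added while the topic is closing, and replicators are disconnected before the managed ledger is closed. The sketch below models that with `java.util.concurrent.locks.ReentrantReadWriteLock`; the class, fields, and step names are hypothetical, and whether `PersistentTopic` structures its close path exactly this way is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

final class TopicLockSketch {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    // Written only under the write lock, read under the read lock.
    private boolean closing = false;
    // Several readers may add concurrently, so use a concurrent set.
    private final Set<String> replicators = ConcurrentHashMap.newKeySet();

    // Read lock: additions may run in parallel with each other, never during close.
    boolean addReplicationCluster(String cluster) {
        lock.readLock().lock();
        try {
            if (closing) {
                return false; // topic is closing: refuse to create an orphan replicator
            }
            return replicators.add(cluster);
        } finally {
            lock.readLock().unlock();
        }
    }

    // Write lock: waits for in-flight additions, then blocks new ones.
    List<String> close() {
        lock.writeLock().lock();
        try {
            closing = true;
            List<String> steps = new ArrayList<>();
            // Disconnect replicators first, since disconnecting still touches the cursor...
            for (String cluster : replicators) {
                steps.add("disconnect replicator " + cluster);
            }
            replicators.clear();
            // ...and only then close the managed ledger.
            steps.add("close managed ledger");
            return steps;
        } finally {
            lock.writeLock().unlock();
        }
    }
}
```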
Re: [PR] [fix][broker] Reader stuck after call hasMessageAvailable when enable replicateSubscriptionState [pulsar]
lhotari commented on code in PR #22572: URL: https://github.com/apache/pulsar/pull/22572#discussion_r1577628530

## managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java:
## @@ -4321,4 +4321,44 @@ public void testDeleteCurrentLedgerWhenItIsClosed(boolean closeLedgerByAddEntry)
 assertEquals(ml.currentLedgerEntries, 0);
 });
 }
+
+@Test
+public void testReverseFindPositionOneByOne() throws Exception {
+final int maxEntriesPerLedger = 5;
+
+ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
+managedLedgerConfig.setMaxEntriesPerLedger(maxEntriesPerLedger);
+ManagedLedger ledger = factory.open("testReverseFindPositionOneByOne", managedLedgerConfig);
+
+String matchEntry = "match-entry";
+String noMatchEntry = "nomatch-entry";
+Predicate predicate = entry -> {
+try {
+String entryValue = entry.getDataBuffer().toString(UTF_8);
+if (matchEntry.equals(entryValue)) {
+return true;
+}
+} finally {
+entry.release();
+}

Review Comment: Would it be better to move the releasing into `internalAsyncReverseFindPositionOneByOne`?
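The review question is about ownership: if the search routine, rather than each caller's predicate, releases every entry after testing it, predicates can no longer leak or double-release entries. A synchronous sketch of that contract follows; `RefCountedEntry` and `reverseFindOneByOne` are hypothetical stand-ins for the managed-ledger entry and for `internalAsyncReverseFindPositionOneByOne`, whose real version is asynchronous.

```java
import java.util.List;
import java.util.function.Predicate;

final class ReverseFindSketch {
    // Hypothetical reference-counted entry.
    static final class RefCountedEntry {
        final long entryId;
        final String data;
        private int refCnt = 1;

        RefCountedEntry(long entryId, String data) {
            this.entryId = entryId;
            this.data = data;
        }

        void release() {
            if (refCnt <= 0) {
                throw new IllegalStateException("double release of entry " + entryId);
            }
            refCnt--;
        }

        int refCnt() {
            return refCnt;
        }
    }

    // Walks from the newest entry to the oldest and returns the id of the first
    // match, or -1. The routine owns each entry it reads and always releases it,
    // so the predicate only inspects the data.
    static long reverseFindOneByOne(List<RefCountedEntry> entries, Predicate<RefCountedEntry> predicate) {
        for (int i = entries.size() - 1; i >= 0; i--) {
            RefCountedEntry entry = entries.get(i);
            try {
                if (predicate.test(entry)) {
                    return entry.entryId;
                }
            } finally {
                entry.release();
            }
        }
        return -1;
    }
}
```

With this contract, the test's predicate would shrink to a plain comparison such as `matchEntry.equals(...)` without its own `try`/`finally`.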
(pulsar) branch branch-3.2 updated: [fix][io] CompressionEnabled didn't work on elasticsearch sink (#22565)
This is an automated email from the ASF dual-hosted git repository. baodi pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new 5a13f04aa1c [fix][io] CompressionEnabled didn't work on elasticsearch sink (#22565) 5a13f04aa1c is described below commit 5a13f04aa1c1848c2919b3383cbc50b3262316d9 Author: Baodi Shi AuthorDate: Wed Apr 24 15:01:19 2024 +0800 [fix][io] CompressionEnabled didn't work on elasticsearch sink (#22565) (cherry picked from commit a3cd1f8dd2c9f3fbc128f4ba6fb00f865b3a2316) --- .../elastic/ElasticSearchJavaRestClient.java | 1 + .../opensearch/OpenSearchHighLevelRestClient.java | 1 + .../io/elasticsearch/ElasticSearchClientTests.java | 34 ++ 3 files changed, 36 insertions(+) diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java index 4749ea2e2d3..afda5ba0e74 100644 --- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java +++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java @@ -84,6 +84,7 @@ public class ElasticSearchJavaRestClient extends RestClient { .setConnectionRequestTimeout(config.getConnectionRequestTimeoutInMs()) .setConnectTimeout(config.getConnectTimeoutInMs()) .setSocketTimeout(config.getSocketTimeoutInMs())) +.setCompressionEnabled(config.isCompressionEnabled()) .setHttpClientConfigCallback(this.configCallback) .setFailureListener(new org.elasticsearch.client.RestClient.FailureListener() { public void onFailure(Node node) { diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/opensearch/OpenSearchHighLevelRestClient.java 
b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/opensearch/OpenSearchHighLevelRestClient.java index 7b704196702..bb92047f17a 100644 --- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/opensearch/OpenSearchHighLevelRestClient.java +++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/opensearch/OpenSearchHighLevelRestClient.java @@ -112,6 +112,7 @@ public class OpenSearchHighLevelRestClient extends RestClient implements BulkPro .setConnectionRequestTimeout(config.getConnectionRequestTimeoutInMs()) .setConnectTimeout(config.getConnectTimeoutInMs()) .setSocketTimeout(config.getSocketTimeoutInMs())) +.setCompressionEnabled(config.isCompressionEnabled()) .setHttpClientConfigCallback(this.configCallback) .setFailureListener(new org.opensearch.client.RestClient.FailureListener() { @Override diff --git a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java index c1e0eafe03a..468d78d989c 100644 --- a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java +++ b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java @@ -30,8 +30,10 @@ import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertThrows; import static org.testng.Assert.assertTrue; +import co.elastic.clients.transport.rest_client.RestClientTransport; import eu.rekawek.toxiproxy.model.ToxicDirection; import java.io.IOException; +import java.lang.reflect.Field; import java.util.Map; import java.util.Optional; import java.util.UUID; @@ -46,6 +48,8 @@ import org.apache.pulsar.io.elasticsearch.client.elastic.ElasticSearchJavaRestCl import 
org.apache.pulsar.io.elasticsearch.client.opensearch.OpenSearchHighLevelRestClient; import org.apache.pulsar.io.elasticsearch.testcontainers.ElasticToxiproxiContainer; import org.awaitility.Awaitility; +import org.opensearch.client.RestClient; +import org.opensearch.client.RestHighLevelClient; import org.testcontainers.containers.Network; import org.testcontainers.elasticsearch.ElasticsearchContainer; import org.testng.annotations.AfterClass; @@ -110,11 +114,41 @@ public abstract class ElasticSearchClientTests extends ElasticSearchTestBase { public void testClientInstance() throws Exception { try (ElasticSearchClient client = new ElasticSearchClient(new ElasticSear
(pulsar) branch branch-3.0 updated: [fix][io] CompressionEnabled didn't work on elasticsearch sink (#22565)
This is an automated email from the ASF dual-hosted git repository. baodi pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.0 by this push: new e3bd58c48f6 [fix][io] CompressionEnabled didn't work on elasticsearch sink (#22565) e3bd58c48f6 is described below commit e3bd58c48f6cac0eb6305dce26354585de345cf0 Author: Baodi Shi AuthorDate: Wed Apr 24 15:01:19 2024 +0800 [fix][io] CompressionEnabled didn't work on elasticsearch sink (#22565) (cherry picked from commit a3cd1f8dd2c9f3fbc128f4ba6fb00f865b3a2316) --- .../elastic/ElasticSearchJavaRestClient.java | 1 + .../opensearch/OpenSearchHighLevelRestClient.java | 1 + .../io/elasticsearch/ElasticSearchClientTests.java | 38 -- 3 files changed, 38 insertions(+), 2 deletions(-) diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java index 4749ea2e2d3..afda5ba0e74 100644 --- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java +++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java @@ -84,6 +84,7 @@ public class ElasticSearchJavaRestClient extends RestClient { .setConnectionRequestTimeout(config.getConnectionRequestTimeoutInMs()) .setConnectTimeout(config.getConnectTimeoutInMs()) .setSocketTimeout(config.getSocketTimeoutInMs())) +.setCompressionEnabled(config.isCompressionEnabled()) .setHttpClientConfigCallback(this.configCallback) .setFailureListener(new org.elasticsearch.client.RestClient.FailureListener() { public void onFailure(Node node) { diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/opensearch/OpenSearchHighLevelRestClient.java 
b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/opensearch/OpenSearchHighLevelRestClient.java index 7b704196702..bb92047f17a 100644 --- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/opensearch/OpenSearchHighLevelRestClient.java +++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/opensearch/OpenSearchHighLevelRestClient.java @@ -112,6 +112,7 @@ public class OpenSearchHighLevelRestClient extends RestClient implements BulkPro .setConnectionRequestTimeout(config.getConnectionRequestTimeoutInMs()) .setConnectTimeout(config.getConnectTimeoutInMs()) .setSocketTimeout(config.getSocketTimeoutInMs())) +.setCompressionEnabled(config.isCompressionEnabled()) .setHttpClientConfigCallback(this.configCallback) .setFailureListener(new org.opensearch.client.RestClient.FailureListener() { @Override diff --git a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java index 6d9928c0426..5e8347b708d 100644 --- a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java +++ b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java @@ -20,6 +20,7 @@ package org.apache.pulsar.io.elasticsearch; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; @@ -27,8 +28,10 @@ import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertThrows; import static org.testng.Assert.assertTrue; +import co.elastic.clients.transport.rest_client.RestClientTransport; 
import eu.rekawek.toxiproxy.model.ToxicDirection; import java.io.IOException; +import java.lang.reflect.Field; import java.util.Map; import java.util.Optional; import java.util.UUID; @@ -41,7 +44,8 @@ import org.apache.pulsar.io.elasticsearch.client.elastic.ElasticSearchJavaRestCl import org.apache.pulsar.io.elasticsearch.client.opensearch.OpenSearchHighLevelRestClient; import org.apache.pulsar.io.elasticsearch.testcontainers.ElasticToxiproxiContainer; import org.awaitility.Awaitility; -import org.mockito.Mockito; +import org.opensearch.client.RestClient; +import org.opensearch.client.RestHighLevelClient;
[PR] [fix][broker] Reader stuck after call hasMessageAvailable when enable replicateSubscriptionState [pulsar]
shibd opened a new pull request, #22572: URL: https://github.com/apache/pulsar/pull/22572

### Motivation

#22571

### Analysis

When `replicateSubscriptionState` is enabled, the broker uses the topic itself to sync subscription state and flags the metadata of these messages as [Marker](https://github.com/apache/pulsar/blob/ac94296d2488b2b13d76fa2b1bfa71e9e3cfbb45/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Markers.java#L107-L120).

The topic does not dispatch these marker messages to consumers; it acknowledges them automatically:
https://github.com/apache/pulsar/blob/fc393f69043be6eb1b2572a27f131656a2cbc7f6/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java#L202-L217

However, `getLastMessageId` always returns the position of the last message, regardless of whether that message is a marker. This causes the reader to get stuck:

```java
while (reader.hasMessageAvailable()) { // returns true
    Message message = reader.readNext(); // never receives a message
}
```

Refer to this diagram to understand the bug: https://github.com/apache/pulsar/assets/33416836/62c5fdb2-9273-4b50-a527-0f649ffd8228

### Modifications

- Add an `asyncReverseFindPositionOneByOne` method to `ManagedLedger`.
- Add a `getLastCanDispatchPosition` method to `Topic`. It calls `asyncReverseFindPositionOneByOne` to find the position of the last entry that is not a `replicateSubscriptionState` marker.
- Change the `getLastMessageId` implementation in `ServerCnx` to use `getLastCanDispatchPosition` instead of `getMaxReadPosition`.

### Verifying this change

- Add `ManagedLedgerTest.testReverseFindPositionOneByOne` to cover the `asyncReverseFindPositionOneByOne` method.
- Add `testReplicatedSubscriptionAcrossTwoRegionsGetLastMessage` to cover this bug.

### Documentation

- [ ] `doc`
- [ ] `doc-required`
- [x] `doc-not-needed`
- [ ] `doc-complete`

### Matching PR in forked repository
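The mismatch described in the Analysis section can be reproduced with a small model. This is a simplified sketch, not broker code: `Entry`, `lastPosition`, `lastDispatchablePosition`, and the `hasMessageAvailable` check are hypothetical stand-ins, and positions are plain `long` values rather than ledger/entry IDs. It shows why reporting the last position including markers keeps `hasMessageAvailable` true after every user message has been read, while a reverse scan for the last non-marker entry (the idea behind `getLastCanDispatchPosition`) lets the read loop end.

```java
import java.util.List;

public class LastDispatchablePositionSketch {
    // Hypothetical model: an entry is either a user message or a replicated-subscription marker.
    record Entry(long position, boolean marker) {}

    // Models the reported behaviour: the last position, whether or not it is a marker.
    static long lastPosition(List<Entry> log) {
        return log.isEmpty() ? -1 : log.get(log.size() - 1).position();
    }

    // Models the proposed fix: scan backwards for the last entry a consumer can actually receive.
    static long lastDispatchablePosition(List<Entry> log) {
        for (int i = log.size() - 1; i >= 0; i--) {
            if (!log.get(i).marker()) {
                return log.get(i).position();
            }
        }
        return -1;
    }

    // Simplified reader check: more data exists if the broker's last id is past what was read.
    static boolean hasMessageAvailable(long lastMessageId, long lastReadPosition) {
        return lastMessageId > lastReadPosition;
    }

    public static void main(String[] args) {
        // Positions 0-2 are user messages; 3 and 4 are markers written by subscription replication.
        List<Entry> log = List.of(new Entry(0, false), new Entry(1, false), new Entry(2, false),
                new Entry(3, true), new Entry(4, true));
        long lastRead = 2; // the reader has received every user message
        System.out.println(hasMessageAvailable(lastPosition(log), lastRead));             // true: reader keeps waiting
        System.out.println(hasMessageAvailable(lastDispatchablePosition(log), lastRead)); // false: loop exits
    }
}
```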
[I] [Bug] Reader stuck after call hasMessageAvailable when enable replicateSubscriptionState [pulsar]
shibd opened a new issue, #22571: URL: https://github.com/apache/pulsar/issues/22571

### Search before asking

- [X] I searched in the [issues](https://github.com/apache/pulsar/issues) and found nothing similar.

### Read release policy

- [X] I understand that unsupported versions don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.

### Version

- master
- 3.2.x
- 3.1.x
- 3.0.x
- 2.11.x

### Minimal reproduce step

In a geo-replication setup, suppose there are two clusters, `r1` and `r2`, and the topic `my-topic` is replicated between them. Consumer1 subscribes to `my-topic` on `r1` with `replicateSubscriptionState` enabled. After the subscription state has been synced to `my-topic` on `r2`, a `reader` created on `r2` to read `my-topic` gets stuck on `readNext`.

Copy this test into `ReplicatorSubscriptionTest` to run it.

```java
@Test
public void testReplicatedSubscriptionAcrossTwoRegionsGetLastMessage() throws Exception {
    String namespace = BrokerTestUtil.newUniqueName("pulsar/replicatedsubscriptionlastmessage");
    String topicName = "persistent://" + namespace + "/mytopic";
    String subscriptionName = "cluster-subscription";
    // this setting can be used to manually run the test with subscription replication disabled
    // it shows that subscription replication has no impact in behavior for this test case
    boolean replicateSubscriptionState = true;

    admin1.namespaces().createNamespace(namespace);
    admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2"));

    @Cleanup
    PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString())
            .statsInterval(0, TimeUnit.SECONDS)
            .build();

    // create subscription in r1
    createReplicatedSubscription(client1, topicName, subscriptionName, replicateSubscriptionState);

    @Cleanup
    PulsarClient client2 = PulsarClient.builder().serviceUrl(url2.toString())
            .statsInterval(0, TimeUnit.SECONDS)
            .build();

    // create subscription in r2
    createReplicatedSubscription(client2, topicName, subscriptionName, replicateSubscriptionState);

    Set<String> sentMessages = new LinkedHashSet<>();

    // send messages in r1
    @Cleanup
    Producer<byte[]> producer = client1.newProducer().topic(topicName)
            .enableBatching(false)
            .messageRoutingMode(MessageRoutingMode.SinglePartition)
            .create();
    int numMessages = 6;
    for (int i = 0; i < numMessages; i++) {
        String body = "message" + i;
        producer.send(body.getBytes(StandardCharsets.UTF_8));
        sentMessages.add(body);
    }
    producer.close();

    // consume 3 messages in r1
    Set<String> receivedMessages = new LinkedHashSet<>();
    try (Consumer<byte[]> consumer1 = client1.newConsumer()
            .topic(topicName)
            .subscriptionName(subscriptionName)
            .replicateSubscriptionState(replicateSubscriptionState)
            .subscribe()) {
        readMessages(consumer1, receivedMessages, 3, false);
    }

    // wait for subscription to be replicated
    Thread.sleep(2 * config1.getReplicatedSubscriptionsSnapshotFrequencyMillis());

    // create a reader in r2
    Reader<byte[]> reader = client2.newReader().topic(topicName)
            .subscriptionName("new-sub")
            .startMessageId(MessageId.earliest)
            .create();
    int readNum = 0;
    while (reader.hasMessageAvailable()) {
        Message<byte[]> message = reader.readNext(10, TimeUnit.SECONDS);
        // check for a timeout before dereferencing the message
        assertNotNull(message);
        System.out.println("Receive message: " + new String(message.getValue())
                + " msgId: " + message.getMessageId());
        readNum++;
    }
    assertEquals(readNum, numMessages);
}
```

### What did you expect to see?

The test passes.

### What did you see instead?

The test gets stuck.

### Anything else?

_No response_

### Are you willing to submit a PR?

- [X] I'm willing to submit a PR!
(pulsar-dotpulsar) branch master updated: Update release_validation_linux_macos.md
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git The following commit(s) were added to refs/heads/master by this push: new 56828c7 Update release_validation_linux_macos.md 56828c7 is described below commit 56828c72180dffaa9ec0f91907abdba44a4fbe81 Author: Lari Hotari AuthorDate: Wed Apr 24 02:00:43 2024 -0700 Update release_validation_linux_macos.md --- docs/release_validation_linux_macos.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/release_validation_linux_macos.md b/docs/release_validation_linux_macos.md index 20269c9..4c0295e 100644 --- a/docs/release_validation_linux_macos.md +++ b/docs/release_validation_linux_macos.md @@ -48,7 +48,7 @@ gpg --verify-files *.asc ```shell tar zxvf pulsar-dotpulsar-${DOTPULSAR_VERSION}-src.tar.gz -cd pulsar-dotpulsar-${DOTPULSAR_VERSION_RC}-src +cd pulsar-dotpulsar-${DOTPULSAR_VERSION}-src dotnet build ```
Re: [D] Is there any way to delete a global topics without auto GC. [pulsar]
GitHub user shulaoh added a comment to the discussion: Is there any way to delete a global topics without auto GC. Finally found a workaround: for topic-level geo-replication, you need to remove the replication cluster from the topic (remove-replication-cluster) before deleting it. GitHub link: https://github.com/apache/pulsar/discussions/22481#discussioncomment-9210320 This is an automatically sent email for commits@pulsar.apache.org. To unsubscribe, please send an email to: commits-unsubscr...@pulsar.apache.org
Re: [PR] [fix] [broker] Part-2: Replicator can not created successfully due to an orphan replicator in the previous topic owner [pulsar]
poorbarcode commented on PR #21948: URL: https://github.com/apache/pulsar/pull/21948#issuecomment-2074263037 Rebase master
Re: [PR] [fix][broker] Create partition topics in the remote clusters before set replication policies at the topic level [pulsar]
poorbarcode closed pull request #22203: [fix][broker] Create partition topics in the remote clusters before set replication policies at the topic level URL: https://github.com/apache/pulsar/pull/22203
Re: [PR] [fix][broker] Create partition topics in the remote clusters before set replication policies at the topic level [pulsar]
poorbarcode commented on PR #22203: URL: https://github.com/apache/pulsar/pull/22203#issuecomment-2074225216 Since https://github.com/apache/pulsar/pull/22537 has been merged, I am closing this PR.
(pulsar) branch master updated: [improve] [broker] Create partitioned topics automatically when enable topic level replication (#22537)
This is an automated email from the ASF dual-hosted git repository. yubiao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new d4756557bf4 [improve] [broker] Create partitioned topics automatically when enable topic level replication (#22537) d4756557bf4 is described below commit d4756557bf4328019dd938a56c3135aecc3147e4 Author: fengyubiao AuthorDate: Wed Apr 24 15:06:00 2024 +0800 [improve] [broker] Create partitioned topics automatically when enable topic level replication (#22537) --- .../apache/pulsar/broker/admin/AdminResource.java | 104 +++-- .../broker/admin/impl/PersistentTopicsBase.java| 23 - .../broker/service/OneWayReplicatorTest.java | 87 - .../broker/service/OneWayReplicatorTestBase.java | 31 +++--- 4 files changed, 196 insertions(+), 49 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java index a1bfeb2142f..45455f16d4d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectWriter; import com.google.errorprone.annotations.CanIgnoreReturnValue; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -43,9 +44,11 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.authorization.AuthorizationService; +import org.apache.pulsar.broker.resources.ClusterResources; import org.apache.pulsar.broker.service.plugin.InvalidEntryFilterException; import org.apache.pulsar.broker.web.PulsarWebResource; import 
org.apache.pulsar.broker.web.RestException; +import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.admin.internal.TopicsImpl; import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace; import org.apache.pulsar.common.naming.Constants; @@ -621,35 +624,82 @@ public abstract class AdminResource extends PulsarWebResource { private void internalCreatePartitionedTopicToReplicatedClustersInBackground(int numPartitions) { getNamespaceReplicatedClustersAsync(namespaceName) -.thenAccept(clusters -> { -for (String cluster : clusters) { -if (!cluster.equals(pulsar().getConfiguration().getClusterName())) { -// this call happens in the background without async composition. completion is logged. -pulsar().getPulsarResources().getClusterResources() -.getClusterAsync(cluster) -.thenCompose(clusterDataOp -> -((TopicsImpl) pulsar().getBrokerService() - .getClusterPulsarAdmin(cluster, - clusterDataOp).topics()) - .createPartitionedTopicAsync( - topicName.getPartitionedTopicName(), -numPartitions, -true, null)) -.whenComplete((__, ex) -> { -if (ex != null) { -log.error( -"[{}] Failed to create partitioned topic {} in cluster {}.", -clientAppId(), topicName, cluster, ex); -} else { -log.info( -"[{}] Successfully created partitioned topic {} in " -+ "cluster {}", -clientAppId(), topicName, cluster); -} -}); -} +.thenAccept(clusters -> { +// this call happens in the background without async composition. completion is logged. + internalCreatePartitionedTopicToReplicatedClustersInBackground(clusters, numPartitions); +}); +} + +protected Map> internalCreatePartitionedTopicToReplicatedClustersInBackground( +
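The refactoring above replaces a fire-and-forget loop with a method that returns a map of per-cluster results. The diff is truncated, so the exact return type is not shown. The sketch below hypothetically assumes a `Map<String, CompletableFuture<Void>>` keyed by cluster name, and a hypothetical `createInCluster` function standing in for the admin client's `createPartitionedTopicAsync` call. It keeps the original behaviour (skip the local cluster, log success or failure) while letting a caller, such as a test, wait on each cluster's outcome.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

public class ReplicatedClusterFanOutSketch {
    // Starts one creation per remote cluster and returns the futures keyed by cluster name,
    // so callers can wait on results instead of relying only on log output.
    static Map<String, CompletableFuture<Void>> createInRemoteClusters(
            List<String> clusters, String localCluster,
            Function<String, CompletableFuture<Void>> createInCluster) {
        Map<String, CompletableFuture<Void>> results = new LinkedHashMap<>();
        for (String cluster : clusters) {
            if (cluster.equals(localCluster)) {
                continue; // the local cluster already has the topic
            }
            CompletableFuture<Void> future = createInCluster.apply(cluster)
                    .whenComplete((ignored, ex) -> {
                        if (ex != null) {
                            System.err.println("Failed to create partitioned topic in cluster " + cluster + ": " + ex);
                        } else {
                            System.out.println("Created partitioned topic in cluster " + cluster);
                        }
                    });
            results.put(cluster, future);
        }
        return results;
    }

    public static void main(String[] args) {
        Map<String, CompletableFuture<Void>> futures = createInRemoteClusters(
                List.of("r1", "r2", "r3"), "r1",
                cluster -> cluster.equals("r3")
                        ? CompletableFuture.<Void>failedFuture(new IllegalStateException("unreachable"))
                        : CompletableFuture.<Void>completedFuture(null));
        System.out.println(futures.keySet()); // prints [r2, r3]
    }
}
```

Returning the futures rather than only logging is what makes the background work observable; a background caller can still ignore the map.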
Re: [PR] [improve] [broker] Create partitioned topics automatically when enable topic level replication [pulsar]
poorbarcode merged PR #22537: URL: https://github.com/apache/pulsar/pull/22537
Re: [PR] [fix][io] CompressionEnabled didn't work on elasticsearch sink [pulsar]
nicoloboschi merged PR #22565: URL: https://github.com/apache/pulsar/pull/22565
(pulsar) branch master updated: [fix][io] CompressionEnabled didn't work on elasticsearch sink (#22565)
This is an automated email from the ASF dual-hosted git repository. nicoloboschi pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new a3cd1f8dd2c [fix][io] CompressionEnabled didn't work on elasticsearch sink (#22565) a3cd1f8dd2c is described below commit a3cd1f8dd2c9f3fbc128f4ba6fb00f865b3a2316 Author: Baodi Shi AuthorDate: Wed Apr 24 15:01:19 2024 +0800 [fix][io] CompressionEnabled didn't work on elasticsearch sink (#22565) --- .../elastic/ElasticSearchJavaRestClient.java | 1 + .../opensearch/OpenSearchHighLevelRestClient.java | 1 + .../io/elasticsearch/ElasticSearchClientTests.java | 34 ++ 3 files changed, 36 insertions(+) diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java index 4749ea2e2d3..afda5ba0e74 100644 --- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java +++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java @@ -84,6 +84,7 @@ public class ElasticSearchJavaRestClient extends RestClient { .setConnectionRequestTimeout(config.getConnectionRequestTimeoutInMs()) .setConnectTimeout(config.getConnectTimeoutInMs()) .setSocketTimeout(config.getSocketTimeoutInMs())) +.setCompressionEnabled(config.isCompressionEnabled()) .setHttpClientConfigCallback(this.configCallback) .setFailureListener(new org.elasticsearch.client.RestClient.FailureListener() { public void onFailure(Node node) { diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/opensearch/OpenSearchHighLevelRestClient.java 
b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/opensearch/OpenSearchHighLevelRestClient.java index 7b704196702..bb92047f17a 100644 --- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/opensearch/OpenSearchHighLevelRestClient.java +++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/opensearch/OpenSearchHighLevelRestClient.java @@ -112,6 +112,7 @@ public class OpenSearchHighLevelRestClient extends RestClient implements BulkPro .setConnectionRequestTimeout(config.getConnectionRequestTimeoutInMs()) .setConnectTimeout(config.getConnectTimeoutInMs()) .setSocketTimeout(config.getSocketTimeoutInMs())) +.setCompressionEnabled(config.isCompressionEnabled()) .setHttpClientConfigCallback(this.configCallback) .setFailureListener(new org.opensearch.client.RestClient.FailureListener() { @Override diff --git a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java index c1e0eafe03a..468d78d989c 100644 --- a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java +++ b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java @@ -30,8 +30,10 @@ import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertThrows; import static org.testng.Assert.assertTrue; +import co.elastic.clients.transport.rest_client.RestClientTransport; import eu.rekawek.toxiproxy.model.ToxicDirection; import java.io.IOException; +import java.lang.reflect.Field; import java.util.Map; import java.util.Optional; import java.util.UUID; @@ -46,6 +48,8 @@ import org.apache.pulsar.io.elasticsearch.client.elastic.ElasticSearchJavaRestCl import 
org.apache.pulsar.io.elasticsearch.client.opensearch.OpenSearchHighLevelRestClient; import org.apache.pulsar.io.elasticsearch.testcontainers.ElasticToxiproxiContainer; import org.awaitility.Awaitility; +import org.opensearch.client.RestClient; +import org.opensearch.client.RestHighLevelClient; import org.testcontainers.containers.Network; import org.testcontainers.elasticsearch.ElasticsearchContainer; import org.testng.annotations.AfterClass; @@ -110,11 +114,41 @@ public abstract class ElasticSearchClientTests extends ElasticSearchTestBase { public void testClientInstance() throws Exception { try (ElasticSearchClient client = new ElasticSearchClient(new ElasticSearchConfig() .setElasticSearchUrl("http://" + container.getHtt