Re: [PR] [improve] PIP-307: Use assigned broker URL hints during broker reconnection [pulsar-client-go]

2024-04-22 Thread via GitHub


dragosvictor commented on code in PR #1208:
URL: https://github.com/apache/pulsar-client-go/pull/1208#discussion_r1575673797


##
pulsar/producer_partition.go:
##
@@ -429,12 +453,22 @@ func (p *partitionProducer) reconnectToBroker() {
return
}
 
-   if p.options.BackoffPolicy == nil {
+   var assignedBrokerURL string
+
+   if connectionClosed != nil && connectionClosed.HasURL() {
+   delayReconnectTime = 0
+   assignedBrokerURL = connectionClosed.assignedBrokerURL
+   connectionClosed = nil // Only attempt once

Review Comment:
   Setting `connectionClosed` to nil makes the loop fall back to the 
established reconnection logic if the initial connection attempt against the 
assigned URL fails. I tried to capture this idea in the comment. 
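
The "only attempt once" behaviour described above can be sketched as follows. This is a minimal illustration written in Java rather than Go, and it is not the code from the PR: `ReconnectSketch`, `reconnect`, and the `connect` predicate are hypothetical names, and the sketch assumes (as the reply implies) that the hint variable outlives a single iteration of the retry loop. An empty target stands for the regular lookup-and-backoff path.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical model of a reconnect loop that tries a broker URL hint once. */
final class ReconnectSketch {

    /**
     * Returns the targets tried, in order. An empty string means
     * "no hint: fall back to the regular lookup and backoff path".
     */
    static List<String> reconnect(String assignedBrokerUrl, int maxRetries,
                                  Predicate<String> connect) {
        List<String> attempts = new ArrayList<>();
        // Local copy of the hint; it lives across loop iterations.
        String hint = assignedBrokerUrl;
        for (int i = 0; i < maxRetries; i++) {
            String target = "";
            if (hint != null && !hint.isEmpty()) {
                target = hint; // first pass: go straight to the hinted broker
                hint = null;   // only attempt once; later passes use the fallback
            }
            attempts.add(target);
            if (connect.test(target)) {
                break; // connected, stop retrying
            }
        }
        return attempts;
    }
}
```

Because the hint is cleared inside the loop, a failed first attempt does not keep steering every retry to the same broker.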



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [improve][broker] Make BrokerSelectionStrategy pluggable [pulsar]

2024-04-22 Thread via GitHub


BewareMyPower commented on PR #22553:
URL: https://github.com/apache/pulsar/pull/22553#issuecomment-2071455028

   ```
 Error:  Failures: 
 Error:  
org.apache.pulsar.broker.loadbalance.extensions.strategy.CustomBrokerSelectionStrategyTest.testSingleBrokerSelected
 [INFO]   Run 1: PASS
 Error:Run 2: 
CustomBrokerSelectionStrategyTest.testSingleBrokerSelected:71 expected [34517] 
but found [38481]
   ```
   
   It seems this test is flaky. I will check the failure soon.





(pulsar) branch master updated: [improve][broker] Apply loadBalancerDebugModeEnabled in LeastResourceUsageWithWeight (#22549)

2024-04-22 Thread xyz
This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 882ce415c94 [improve][broker] Apply loadBalancerDebugModeEnabled in 
LeastResourceUsageWithWeight (#22549)
882ce415c94 is described below

commit 882ce415c94db215c6f06aa5212b6d321231e35e
Author: Yunze Xu 
AuthorDate: Tue Apr 23 13:42:22 2024 +0800

[improve][broker] Apply loadBalancerDebugModeEnabled in 
LeastResourceUsageWithWeight (#22549)
---
 .../loadbalance/extensions/strategy/LeastResourceUsageWithWeight.java  | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeight.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeight.java
index 98986d84b98..9bf16ac1795 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeight.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeight.java
@@ -96,8 +96,7 @@ public class LeastResourceUsageWithWeight implements 
BrokerSelectionStrategy {
 // select one of them at the end.
 double totalUsage = 0.0d;
 
-// TODO: use loadBalancerDebugModeEnabled too.
-boolean debugMode = log.isDebugEnabled();
+boolean debugMode = log.isDebugEnabled() || 
conf.isLoadBalancerDebugModeEnabled();
 for (String broker : candidates) {
 var brokerLoadDataOptional = 
context.brokerLoadDataStore().get(broker);
 if (brokerLoadDataOptional.isEmpty()) {



Re: [PR] [improve][broker] Apply loadBalancerDebugModeEnabled in LeastResourceUsageWithWeight [pulsar]

2024-04-22 Thread via GitHub


BewareMyPower merged PR #22549:
URL: https://github.com/apache/pulsar/pull/22549





Re: [PR] [improve] [broker] add role for consumer stat [pulsar]

2024-04-22 Thread via GitHub


thetumbled commented on PR #22562:
URL: https://github.com/apache/pulsar/pull/22562#issuecomment-2071376117

   PTAL, thanks. @lhotari @codelipenghui @BewareMyPower @Technoboy- 





Re: [PR] [improve] PIP-307: Use assigned broker URL hints during broker reconnection [pulsar-client-go]

2024-04-22 Thread via GitHub


BewareMyPower commented on code in PR #1208:
URL: https://github.com/apache/pulsar-client-go/pull/1208#discussion_r1575606991


##
pulsar/producer_partition.go:
##
@@ -429,12 +453,22 @@ func (p *partitionProducer) reconnectToBroker() {
return
}
 
-   if p.options.BackoffPolicy == nil {
+   var assignedBrokerURL string
+
+   if connectionClosed != nil && connectionClosed.HasURL() {
+   delayReconnectTime = 0
+   assignedBrokerURL = connectionClosed.assignedBrokerURL
+   connectionClosed = nil // Only attempt once

Review Comment:
   `connectionClosed` comes from the `ConnectionClosed` method through the 
`p.connectClosedCh` channel. Assigning `nil` to this local variable seems 
meaningless.






[PR] [improve] [broker] add role for consumer stat [pulsar]

2024-04-22 Thread via GitHub


thetumbled opened a new pull request, #22562:
URL: https://github.com/apache/pulsar/pull/22562

   ### Motivation
   
   With `allowAutoSubscriptionCreation` enabled, users who have the `consume` 
permission can create subscriptions automatically. We have a strong need to 
trace who owns a specific subscription, but there is currently no way to do 
that.
   
   ### Modifications
   
   Add field `role` in the stats response.
   
   ### Verifying this change
   
   - [x] Make sure that the change passes the CI checks.
   
   *(Please pick either of the following options)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(10MB)*
 - *Extended integration test for recovery after broker failure*
   
   ### Does this pull request potentially affect one of the following parts:
   
   
   
   *If the box was checked, please highlight the changes*
   
   - [ ] Dependencies (add or upgrade a dependency)
   - [ ] The public API
   - [ ] The schema
   - [ ] The default values of configurations
   - [ ] The threading model
   - [ ] The binary protocol
   - [ ] The REST endpoints
   - [ ] The admin CLI options
   - [ ] The metrics
   - [ ] Anything that affects deployment
   
   ### Documentation
   
   
   
   - [ ] `doc` 
   - [x] `doc-required` 
   - [ ] `doc-not-needed` 
   - [ ] `doc-complete` 
   
   ### Matching PR in forked repository
   
   PR in forked repository: 
   





Re: [PR] [improve] [broker] add role for consumer stat [pulsar]

2024-04-22 Thread via GitHub


thetumbled closed pull request #22561: [improve] [broker] add role for consumer 
stat
URL: https://github.com/apache/pulsar/pull/22561





[PR] [improve] [broker] add role for consumer stat [pulsar]

2024-04-22 Thread via GitHub


thetumbled opened a new pull request, #22561:
URL: https://github.com/apache/pulsar/pull/22561

   
   ### Motivation
   
   With `allowAutoSubscriptionCreation` enabled, users who have the `consume` 
permission can create subscriptions automatically. We have a strong need to 
trace who owns a specific subscription, but there is currently no way to do 
that.
   
   ### Modifications
   
   Add field `role` in the stats response.
   
   ### Verifying this change
   
   - [ ] Make sure that the change passes the CI checks.
   
   *(Please pick either of the following options)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(10MB)*
 - *Extended integration test for recovery after broker failure*
   
   ### Does this pull request potentially affect one of the following parts:
   
   
   
   *If the box was checked, please highlight the changes*
   
   - [ ] Dependencies (add or upgrade a dependency)
   - [ ] The public API
   - [ ] The schema
   - [ ] The default values of configurations
   - [ ] The threading model
   - [ ] The binary protocol
   - [ ] The REST endpoints
   - [ ] The admin CLI options
   - [ ] The metrics
   - [ ] Anything that affects deployment
   
   ### Documentation
   
   
   
   - [ ] `doc` 
   - [ ] `doc-required` 
   - [ ] `doc-not-needed` 
   - [ ] `doc-complete` 
   
   ### Matching PR in forked repository
   
   PR in forked repository: 
   





Re: [PR] [fix][fn]make sure the classloader for ContextImpl is `functionClassLoader` in different runtimes [pulsar]

2024-04-22 Thread via GitHub


Awsmsniper commented on PR #22501:
URL: https://github.com/apache/pulsar/pull/22501#issuecomment-2071364851

   > @Awsmsniper it should be another issue, could you please create a new 
issue first?
   > 
   > Also, 
https://hub.docker.com/layers/freeznet/pulsar-all/3.0.5-SNAPSHOT-1c46877/images/sha256-d881a766d57b77ec4b279688b96b714f34eb270496b052eb69362db5c62d8bd3?context=repo
 is the docker image I built from this PR based on branch-3.0, you may check if 
this fix solves your issue as well.
   
   I did not use the version you provided. After using version 3.0.4, I found 
that it can be used normally. Thank you.





Re: [PR] [improve][broker] Exclude producers for geo-replication from publishers field of topic stats [pulsar]

2024-04-22 Thread via GitHub


massakam commented on PR #22556:
URL: https://github.com/apache/pulsar/pull/22556#issuecomment-2071325099

   Resolved the conflicts.





Re: [PR] [improve] PIP-307: Use assigned broker URL hints during broker reconnection [pulsar-client-go]

2024-04-22 Thread via GitHub


BewareMyPower commented on code in PR #1208:
URL: https://github.com/apache/pulsar-client-go/pull/1208#discussion_r1575573046


##
pulsar/internal/pulsar_proto/PulsarApi.pb.go:
##
@@ -18,8 +18,8 @@
 
 // Code generated by protoc-gen-go. DO NOT EDIT.
 // versions:
-// protoc-gen-go v1.28.1
-// protoc v3.21.9
+// protoc-gen-go v1.31.0
+// protoc v4.24.3

Review Comment:
   Yeah, when I regenerated it locally, I also found only the version comments 
were changed.






[PR] [improve][ml] Make ManagedLedger read entries parallel [pulsar]

2024-04-22 Thread via GitHub


dao-jun opened a new pull request, #22560:
URL: https://github.com/apache/pulsar/pull/22560

   ### Motivation
   
   In https://github.com/apache/pulsar/pull/19035 we introduced `skipCondition` 
to filter out delayed-delivery messages before reading entries from BookKeeper, 
and in https://github.com/apache/pulsar/pull/21739 we also filter out deleted 
(individually acknowledged) messages before reading entries.
   
   However, this leads to one situation: a single read range can be split into 
several segments. For example:
   entries to filter out: [3, 5, 7]
   entries to read: [1, 10]
   the read is then split into: [[1,2], [4], [6], [8,10]].
   
   In the current implementation, the segments are read one after another: [4] 
is read only after [1,2] finishes, [6] only after [4] finishes, and so on.
   This increases latency, retains the memory allocated for entries for a 
longer period of time, and reduces the throughput of the system. 
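
The splitting described above, and the idea of issuing the segment reads concurrently, can be sketched as follows. This is a rough illustration only, not the implementation from this PR (whose Modifications section is empty): `ParallelReadSketch`, `Range`, `split`, `readAsync`, and `readAll` are hypothetical names, and `readAsync` is a stand-in that simply returns the entry ids instead of reading from BookKeeper.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

final class ParallelReadSketch {

    /** Inclusive range of entry ids (hypothetical helper type). */
    static final class Range {
        final long first;
        final long last;

        Range(long first, long last) {
            this.first = first;
            this.last = last;
        }

        @Override
        public String toString() {
            return "[" + first + "," + last + "]";
        }
    }

    /** Splits [first, last] into contiguous segments that avoid the skipped ids. */
    static List<Range> split(long first, long last, Set<Long> skipped) {
        List<Range> ranges = new ArrayList<>();
        boolean open = false; // true while a segment is being extended
        long start = first;
        for (long id = first; id <= last; id++) {
            if (skipped.contains(id)) {
                if (open) {
                    ranges.add(new Range(start, id - 1));
                    open = false;
                }
            } else if (!open) {
                start = id;
                open = true;
            }
        }
        if (open) {
            ranges.add(new Range(start, last));
        }
        return ranges;
    }

    /** Stand-in for an asynchronous read of one segment. */
    static CompletableFuture<List<Long>> readAsync(Range range) {
        return CompletableFuture.supplyAsync(() -> {
            List<Long> ids = new ArrayList<>();
            for (long id = range.first; id <= range.last; id++) {
                ids.add(id);
            }
            return ids;
        });
    }

    /** Starts every segment read up front, then joins them in submission order. */
    static List<Long> readAll(List<Range> ranges) {
        List<CompletableFuture<List<Long>>> futures = ranges.stream()
                .map(ParallelReadSketch::readAsync)
                .collect(Collectors.toList());
        return futures.stream()
                .flatMap(f -> f.join().stream())
                .collect(Collectors.toList());
    }
}
```

Joining the futures in submission order keeps the entries in their original order even though the reads themselves may complete out of order.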
   
   ### Modifications
   
   
   
   ### Verifying this change
   
   - [ ] Make sure that the change passes the CI checks.
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(10MB)*
 - *Extended integration test for recovery after broker failure*
   
   ### Does this pull request potentially affect one of the following parts:
   
   
   
   *If the box was checked, please highlight the changes*
   
   - [ ] Dependencies (add or upgrade a dependency)
   - [ ] The public API
   - [ ] The schema
   - [ ] The default values of configurations
   - [ ] The threading model
   - [ ] The binary protocol
   - [ ] The REST endpoints
   - [ ] The admin CLI options
   - [ ] The metrics
   - [ ] Anything that affects deployment
   
   ### Documentation
   
   
   
   - [ ] `doc` 
   - [ ] `doc-required` 
   - [x] `doc-not-needed` 
   - [ ] `doc-complete` 
   
   ### Matching PR in forked repository
   
   PR in forked repository: 
   





(pulsar-site) branch main updated: Docs sync done from apache/pulsar (#4924052)

2024-04-22 Thread urfree
This is an automated email from the ASF dual-hosted git repository.

urfree pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-site.git


The following commit(s) were added to refs/heads/main by this push:
 new eaf427144d02 Docs sync done from apache/pulsar (#4924052)
eaf427144d02 is described below

commit eaf427144d02706b666ec3ac822b090f4f1d1227
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Apr 23 01:34:38 2024 +

Docs sync done from apache/pulsar (#4924052)
---
 .../next/config/reference-configuration-broker.md  | 35 ++
 .../config/reference-configuration-pulsar-proxy.md | 35 ++
 .../config/reference-configuration-standalone.md   | 35 ++
 .../config/reference-configuration-websocket.md| 35 ++
 static/swagger/master/swagger.json |  9 ++
 static/swagger/master/v2/swagger.json  |  9 ++
 6 files changed, 158 insertions(+)

diff --git a/static/reference/next/config/reference-configuration-broker.md 
b/static/reference/next/config/reference-configuration-broker.md
index cab9e7f7e7ba..2545e65d0b81 100644
--- a/static/reference/next/config/reference-configuration-broker.md
+++ b/static/reference/next/config/reference-configuration-broker.md
@@ -3868,6 +3868,29 @@ If enabled the feature that transaction pending ack log 
batch, this attribute me
 
 **Category**: Server
 
+### webServiceHaProxyProtocolEnabled
+Enable or disable the use of HA proxy protocol for resolving the client IP for 
http/https requests. Default is false.
+
+**Type**: `boolean`
+
+**Default**: `false`
+
+**Dynamic**: `false`
+
+**Category**: Server
+
+### webServiceLogDetailedAddresses
+Add detailed client/remote and server/local addresses and ports to http/https 
request logging.
+Defaults to true when either webServiceHaProxyProtocolEnabled or 
webServiceTrustXForwardedFor is enabled.
+
+**Type**: `java.lang.Boolean`
+
+**Default**: `null`
+
+**Dynamic**: `false`
+
+**Category**: Server
+
 ### webServicePort
 The port for serving http requests
 
@@ -3901,6 +3924,18 @@ Specify the TLS provider for the web service: SunJSSE, 
Conscrypt and etc.
 
 **Category**: Server
 
+### webServiceTrustXForwardedFor
+Trust X-Forwarded-For header for resolving the client IP for http/https 
requests.
+Default is false.
+
+**Type**: `boolean`
+
+**Default**: `false`
+
+**Dynamic**: `false`
+
+**Category**: Server
+
 ### bookkeeperClientAuthenticationParameters
 Parameters for bookkeeper auth plugin
 
diff --git 
a/static/reference/next/config/reference-configuration-pulsar-proxy.md 
b/static/reference/next/config/reference-configuration-pulsar-proxy.md
index 655b98e5dc51..f0895b2f888a 100644
--- a/static/reference/next/config/reference-configuration-pulsar-proxy.md
+++ b/static/reference/next/config/reference-configuration-pulsar-proxy.md
@@ -955,6 +955,29 @@ Path for the file used to determine the rotation status 
for the proxy instance w
 
 **Category**: Server
 
+### webServiceHaProxyProtocolEnabled
+Enable or disable the use of HA proxy protocol for resolving the client IP for 
http/https requests. Default is false.
+
+**Type**: `boolean`
+
+**Default**: `false`
+
+**Dynamic**: `false`
+
+**Category**: Server
+
+### webServiceLogDetailedAddresses
+Add detailed client/remote and server/local addresses and ports to http/https 
request logging.
+Defaults to true when either webServiceHaProxyProtocolEnabled or 
webServiceTrustXForwardedFor is enabled.
+
+**Type**: `java.lang.Boolean`
+
+**Default**: `null`
+
+**Dynamic**: `false`
+
+**Category**: Server
+
 ### webServicePort
 The port for serving http requests
 
@@ -977,6 +1000,18 @@ The port for serving https requests
 
 **Category**: Server
 
+### webServiceTrustXForwardedFor
+Trust X-Forwarded-For header for resolving the client IP for http/https 
requests.
+Default is false.
+
+**Type**: `boolean`
+
+**Default**: `false`
+
+**Dynamic**: `false`
+
+**Category**: Server
+
 ### tlsAllowInsecureConnection
 Accept untrusted TLS certificate from client.
 
diff --git a/static/reference/next/config/reference-configuration-standalone.md 
b/static/reference/next/config/reference-configuration-standalone.md
index cab9e7f7e7ba..2545e65d0b81 100644
--- a/static/reference/next/config/reference-configuration-standalone.md
+++ b/static/reference/next/config/reference-configuration-standalone.md
@@ -3868,6 +3868,29 @@ If enabled the feature that transaction pending ack log 
batch, this attribute me
 
 **Category**: Server
 
+### webServiceHaProxyProtocolEnabled
+Enable or disable the use of HA proxy protocol for resolving the client IP for 
http/https requests. Default is false.
+
+**Type**: `boolean`
+
+**Default**: `false`
+
+**Dynamic**: `false`
+
+**Category**: Server
+
+### webServiceLogDetailedAddresses
+Add detailed client/remote and server/local addresses and ports to http/https 
request 
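
The defaulting rule documented above for `webServiceLogDetailedAddresses` (a nullable `java.lang.Boolean` whose default is `null`) can be illustrated with a short sketch. This is not the broker's actual code: `WebServiceLoggingSketch` and `logDetailedAddresses` are hypothetical names, and treating an unset value as `false` when neither related option is enabled is an assumption, since the documentation only states when it defaults to true.

```java
final class WebServiceLoggingSketch {

    /**
     * Resolves the effective setting. An explicit value always wins; an unset
     * (null) value follows the two related options.
     */
    static boolean logDetailedAddresses(Boolean configured,
                                        boolean haProxyProtocolEnabled,
                                        boolean trustXForwardedFor) {
        if (configured != null) {
            return configured;
        }
        // Assumption: with neither option enabled, an unset value means false.
        return haProxyProtocolEnabled || trustXForwardedFor;
    }
}
```

Using a nullable `Boolean` lets the configuration distinguish "not set" from an explicit `false`, which a primitive `boolean` cannot express.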

(pulsar) branch master updated: [fix] [broker] Part-1: Replicator can not created successfully due to an orphan replicator in the previous topic owner (#21946)

2024-04-22 Thread yubiao
This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 49240522f54 [fix] [broker] Part-1: Replicator can not created 
successfully due to an orphan replicator in the previous topic owner (#21946)
49240522f54 is described below

commit 49240522f543eea0e9307811c92b487eabe431d9
Author: fengyubiao 
AuthorDate: Tue Apr 23 09:23:08 2024 +0800

[fix] [broker] Part-1: Replicator can not created successfully due to an 
orphan replicator in the previous topic owner (#21946)
---
 .../pulsar/broker/service/AbstractReplicator.java  | 332 -
 .../pulsar/broker/service/BrokerService.java   |   2 +-
 .../apache/pulsar/broker/service/Replicator.java   |   4 +-
 .../nonpersistent/NonPersistentReplicator.java |   5 +-
 .../service/nonpersistent/NonPersistentTopic.java  |  10 +-
 .../service/persistent/PersistentReplicator.java   |  87 +++---
 .../broker/service/persistent/PersistentTopic.java |  31 +-
 .../broker/service/AbstractReplicatorTest.java |  22 +-
 .../broker/service/OneWayReplicatorTest.java   | 276 -
 .../broker/service/OneWayReplicatorTestBase.java   |  40 ++-
 .../pulsar/broker/service/PersistentTopicTest.java |   6 +-
 .../pulsar/broker/service/ReplicatorTest.java  |  11 +-
 12 files changed, 656 insertions(+), 170 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
index 1b5b2824257..f34144deb0a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
@@ -18,16 +18,22 @@
  */
 package org.apache.pulsar.broker.service;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import lombok.Getter;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.Backoff;
@@ -39,7 +45,7 @@ import org.apache.pulsar.common.util.StringInterner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public abstract class AbstractReplicator {
+public abstract class AbstractReplicator implements Replicator {
 
 protected final BrokerService brokerService;
 protected final String localTopicName;
@@ -64,10 +70,31 @@ public abstract class AbstractReplicator {
 
 protected static final AtomicReferenceFieldUpdater STATE_UPDATER =
 AtomicReferenceFieldUpdater.newUpdater(AbstractReplicator.class, 
State.class, "state");
-private volatile State state = State.Stopped;
-
-protected enum State {
-Stopped, Starting, Started, Stopping
+@VisibleForTesting
+@Getter
+protected volatile State state = State.Disconnected;
+
+public enum State {
+/**
+ * This enum has two mean meanings:
+ *   Init: replicator is just created, has not been started now.
+ *   Disconnected: the producer was closed after {@link 
PersistentTopic#checkGC} called {@link #disconnect}.
+ */
+// The internal producer is disconnected.
+Disconnected,
+// Trying to create a new internal producer.
+Starting,
+// The internal producer has started, and tries copy data.
+Started,
+/**
+ * The producer is closing after {@link PersistentTopic#checkGC} 
called {@link #disconnect}.
+ */
+// The internal producer is trying to disconnect.
+Disconnecting,
+// The replicator is in terminating.
+Terminating,
+// The replicator is never used again. Pulsar will create a new 
Replicator when enable replication again.
+Terminated;
 }
 
 public AbstractReplicator(String localCluster, Topic localTopic, String 
remoteCluster, String remoteTopicName,
@@ -96,16 +123,16 @@ public abstract class AbstractReplicator {
 .sendTimeout(0, TimeUnit.SECONDS) //
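
The state machine introduced in the diff above relies on atomic transitions through `AtomicReferenceFieldUpdater`. The following is a minimal, self-contained sketch of that pattern, not the actual `AbstractReplicator` code: the class `ReplicatorStateSketch` and the methods `transition`, `startProducer`, and `terminate` are hypothetical, and only the state names are taken from the commit.

```java
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

final class ReplicatorStateSketch {

    enum State { Disconnected, Starting, Started, Disconnecting, Terminating, Terminated }

    private static final AtomicReferenceFieldUpdater<ReplicatorStateSketch, State> STATE_UPDATER =
            AtomicReferenceFieldUpdater.newUpdater(ReplicatorStateSketch.class, State.class, "state");

    // Must be volatile for AtomicReferenceFieldUpdater to accept it.
    private volatile State state = State.Disconnected;

    State getState() {
        return state;
    }

    /** Atomically moves from expected to next; false if the state was not expected. */
    boolean transition(State expected, State next) {
        return STATE_UPDATER.compareAndSet(this, expected, next);
    }

    /** Only one caller can win the Disconnected -> Starting transition. */
    boolean startProducer() {
        return transition(State.Disconnected, State.Starting);
    }

    /** Hypothetical shutdown path: once Terminated, the instance is never restarted. */
    void terminate() {
        STATE_UPDATER.set(this, State.Terminating);
        // Resources would be released here in a real implementation.
        STATE_UPDATER.set(this, State.Terminated);
    }
}
```

Because `startProducer` only succeeds from `Disconnected`, a terminated instance can never be started again, which matches the comment that a new replicator is created when replication is re-enabled.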
 

Re: [PR] [fix] [broker] Part-1: Replicator can not created successfully due to an orphan replicator in the previous topic owner [pulsar]

2024-04-22 Thread via GitHub


poorbarcode merged PR #21946:
URL: https://github.com/apache/pulsar/pull/21946





Re: [PR] [improve][broker] Exclude producers for geo-replication from publishers field of topic stats [pulsar]

2024-04-22 Thread via GitHub


massakam commented on PR #22556:
URL: https://github.com/apache/pulsar/pull/22556#issuecomment-2071211652

   @lhotari PTAL





Re: [PR] [improve][broker] Exclude producers for geo-replication from publishers field of topic stats [pulsar]

2024-04-22 Thread via GitHub


codecov-commenter commented on PR #22556:
URL: https://github.com/apache/pulsar/pull/22556#issuecomment-2071211068

   ## 
[Codecov](https://app.codecov.io/gh/apache/pulsar/pull/22556?dropdown=coverage=pr=h1_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache)
 Report
   Attention: Patch coverage is `86.7%` with `2 lines` in your changes are 
missing coverage. Please review.
   > Project coverage is 73.91%. Comparing base 
[(`bbc6224`)](https://app.codecov.io/gh/apache/pulsar/commit/bbc62245c5ddba1de4b1e7cee4ab49334bc36277?dropdown=coverage=desc_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache)
 to head 
[(`442bb30`)](https://app.codecov.io/gh/apache/pulsar/pull/22556?dropdown=coverage=pr=desc_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache).
   > Report is 177 commits behind head on master.
   
   Additional details and impacted files
   
   
   [![Impacted file tree 
graph](https://app.codecov.io/gh/apache/pulsar/pull/22556/graphs/tree.svg?width=650=150=pr=acYqCpsK9J_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache)](https://app.codecov.io/gh/apache/pulsar/pull/22556?src=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache)
   
   ```diff
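   @@             Coverage Diff              @@
   ##             master   #22556      +/-   ##
   ============================================
   + Coverage     73.57%   73.91%   +0.34%     
   - Complexity    32624    33055     +431     
   ============================================
     Files          1877     1885       +8     
     Lines        139502   140365     +863     
     Branches      15299    15420     +121     
   ============================================
   + Hits         102638   103753    +1115     
   + Misses        28908    28569     -339     
   - Partials       7956     8043      +87     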
   ```
   
   | 
[Flag](https://app.codecov.io/gh/apache/pulsar/pull/22556/flags?src=pr=flags_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache)
 | Coverage Δ | |
   |---|---|---|
   | 
[inttests](https://app.codecov.io/gh/apache/pulsar/pull/22556/flags?src=pr=flag_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache)
 | `26.72% <80.00%> (+2.13%)` | :arrow_up: |
   | 
[systests](https://app.codecov.io/gh/apache/pulsar/pull/22556/flags?src=pr=flag_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache)
 | `24.56% <40.00%> (+0.24%)` | :arrow_up: |
   | 
[unittests](https://app.codecov.io/gh/apache/pulsar/pull/22556/flags?src=pr=flag_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache)
 | `73.20% <86.66%> (+0.36%)` | :arrow_up: |
   
   Flags with carried forward coverage won't be shown. [Click 
here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache#carryforward-flags-in-the-pull-request-comment)
 to find out more.
   
   | 
[Files](https://app.codecov.io/gh/apache/pulsar/pull/22556?dropdown=coverage=pr=tree_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache)
 | Coverage Δ | |
   |---|---|---|
   | 
[...oker/service/nonpersistent/NonPersistentTopic.java](https://app.codecov.io/gh/apache/pulsar/pull/22556?src=pr=tree=pulsar-broker%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fpulsar%2Fbroker%2Fservice%2Fnonpersistent%2FNonPersistentTopic.java_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9zZXJ2aWNlL25vbnBlcnNpc3RlbnQvTm9uUGVyc2lzdGVudFRvcGljLmphdmE=)
 | `71.21% <85.71%> (+1.75%)` | :arrow_up: |
   | 
[...sar/broker/service/persistent/PersistentTopic.java](https://app.codecov.io/gh/apache/pulsar/pull/22556?src=pr=tree=pulsar-broker%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fpulsar%2Fbroker%2Fservice%2Fpersistent%2FPersistentTopic.java_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9zZXJ2aWNlL3BlcnNpc3RlbnQvUGVyc2lzdGVudFRvcGljLmphdmE=)
 | `78.36% <87.50%> (-0.10%)` | :arrow_down: |
   
   ... and [257 files with indirect coverage 
changes](https://app.codecov.io/gh/apache/pulsar/pull/22556/indirect-changes?src=pr=tree-more_medium=referral_source=github_content=comment_campaign=pr+comments_term=apache)
   
   





Re: [PR] [feat][broker] PIP-264: Add topic messaging metrics [pulsar]

2024-04-22 Thread via GitHub


merlimat commented on PR #22467:
URL: https://github.com/apache/pulsar/pull/22467#issuecomment-2071140865

   @dragosvictor I think we shouldn't worry about v1 topic names. They were 
deprecated long ago, and we should actually start getting rid of them completely.
   
   > The topic name, as is currently filled in by the 
[client](https://github.com/apache/pulsar/blob/c72c135541e14043370836421cfef372b1d0a0ea/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/MetricsUtil.java#L47)
 also includes the persistence part: 
pulsar.topic="persistent://my-property/use/my-ns/testAllCompactedOut-07b9ad7f-89cb-4800-88e8-cb3417cf0406".
   
   Yes, it's better to include it, because it's part of the fully-qualified name.





Re: [PR] [feat][broker] PIP-264: Add topic messaging metrics [pulsar]

2024-04-22 Thread via GitHub


dragosvictor commented on PR #22467:
URL: https://github.com/apache/pulsar/pull/22467#issuecomment-2071044457

   > Changes look good.
   > 
   > My only comment is that we should be using qualified names in the 
attributes:
   > 
   > > `pulsar.namespace` | Includes the namespace portion only | `my-namespace`
   > > `pulsar.topic` | Includes the local topic name | `my-topic`
   > 
   > eg: `pulsar.namespace` -> `my-tenant/my-namespace` `pulsar.topic` -> 
`my-tenant/my-namespace/my-topic`
   > 
   > This is to keep consistency with many other places in APIs and CLI tools. 
Also, consistent with OTel metrics attributes in the client SDK.
   
   Note that such a transformation would lead the namespace to occasionally 
include the "cluster" portion in case of old topic names: 
`pulsar.namespace="my-property/use/my-ns"`
   
   The topic name, as is currently filled in by the 
[client](https://github.com/apache/pulsar/blob/c72c135541e14043370836421cfef372b1d0a0ea/pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/MetricsUtil.java#L47)
 also includes the persistence part: 
`pulsar.topic="persistent://my-property/use/my-ns/testAllCompactedOut-07b9ad7f-89cb-4800-88e8-cb3417cf0406"`.
   
   If we can confirm that this is the intent here, I can go ahead and proceed 
with this proposal. Alternatively, we can augment the existing implementation 
with one more attribute, say `pulsar.topic.complete.name`, even though it leads 
to repetition.
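
To make the two naming forms concrete, here is a small Python sketch. It is illustrative only: `topic_attributes` is a hypothetical helper, not Pulsar's own topic-name parser, and it assumes that a name with four path segments is a legacy (v1) name that carries a cluster segment.

```python
def topic_attributes(full_name):
    """Derive qualified attribute values from a full topic name.

    Assumption: three path segments mean tenant/namespace/topic, and four
    mean the legacy tenant/cluster/namespace/topic form.
    """
    scheme, sep, rest = full_name.partition("://")
    if not sep or scheme not in ("persistent", "non-persistent"):
        raise ValueError(f"not a fully-qualified topic name: {full_name!r}")

    # Split into at most four parts so a legacy local name may contain '/'.
    parts = rest.split("/", 3)
    if len(parts) == 3:
        tenant, namespace, _local = parts
        qualified_namespace = f"{tenant}/{namespace}"
    elif len(parts) == 4:
        tenant, cluster, namespace, _local = parts
        qualified_namespace = f"{tenant}/{cluster}/{namespace}"
    else:
        raise ValueError(f"unexpected topic name: {full_name!r}")

    return {
        "pulsar.namespace": qualified_namespace,
        # The full name keeps the persistence prefix, as the client does today.
        "pulsar.topic": full_name,
    }
```

With a legacy name such as `persistent://my-property/use/my-ns/my-topic`, the namespace attribute comes out as `my-property/use/my-ns`, which is the case described above.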





(pulsar) branch master updated: [improve] Update Oxia client to 0.1.6 (#22525)

2024-04-22 Thread mmerli
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new c72c135541e [improve] Update Oxia client to 0.1.6 (#22525)
c72c135541e is described below

commit c72c135541e14043370836421cfef372b1d0a0ea
Author: Matteo Merli 
AuthorDate: Mon Apr 22 14:15:36 2024 -0700

[improve] Update Oxia client to 0.1.6 (#22525)
---
 distribution/licenses/LICENSE-Reactive-gRPC.txt  | 29 
 distribution/server/src/assemble/LICENSE.bin.txt | 10 +++-
 pom.xml  |  3 +--
 pulsar-metadata/pom.xml  |  1 -
 4 files changed, 39 insertions(+), 4 deletions(-)

diff --git a/distribution/licenses/LICENSE-Reactive-gRPC.txt 
b/distribution/licenses/LICENSE-Reactive-gRPC.txt
new file mode 100644
index 000..bc589401e7b
--- /dev/null
+++ b/distribution/licenses/LICENSE-Reactive-gRPC.txt
@@ -0,0 +1,29 @@
+BSD 3-Clause License
+
+Copyright (c) 2019, Salesforce.com, Inc.
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+
+* Redistributions of source code must retain the above copyright notice, this
+  list of conditions and the following disclaimer.
+
+* Redistributions in binary form must reproduce the above copyright notice,
+  this list of conditions and the following disclaimer in the documentation
+  and/or other materials provided with the distribution.
+
+* Neither the name of the copyright holder nor the names of its
+  contributors may be used to endorse or promote products derived from
+  this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
+FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
\ No newline at end of file
diff --git a/distribution/server/src/assemble/LICENSE.bin.txt 
b/distribution/server/src/assemble/LICENSE.bin.txt
index 93fd46d44b5..c5642503b25 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -481,7 +481,12 @@ The Apache Software License, Version 2.0
   * Prometheus
 - io.prometheus-simpleclient_httpserver-0.16.0.jar
   * Oxia
-- io.streamnative.oxia-oxia-client-0.1.0-shaded.jar
+- io.streamnative.oxia-oxia-client-0.1.6.jar
+- io.streamnative.oxia-oxia-client-metrics-api-0.1.6.jar
+  * OpenHFT
+- net.openhft-zero-allocation-hashing-0.16.jar
+  * Project reactor
+- io.projectreactor-reactor-core-3.5.2.jar
   * Java JSON WebTokens
 - io.jsonwebtoken-jjwt-api-0.11.1.jar
 - io.jsonwebtoken-jjwt-impl-0.11.1.jar
@@ -548,6 +553,9 @@ BSD 3-clause "New" or "Revised" License
  * JSR305 -- com.google.code.findbugs-jsr305-3.0.2.jar -- 
../licenses/LICENSE-JSR305.txt
  * JLine -- jline-jline-2.14.6.jar -- ../licenses/LICENSE-JLine.txt
  * JLine3 -- org.jline-jline-3.21.0.jar -- ../licenses/LICENSE-JLine.txt
+ * Reactive gRPC
+- com.salesforce.servicelibs-reactive-grpc-common-1.2.4.jar -- 
../licenses/LICENSE-Reactive-gRPC.txt
+- com.salesforce.servicelibs-reactor-grpc-stub-1.2.4.jar -- 
../licenses/LICENSE-Reactive-gRPC.txt
 
 BSD 2-Clause License
  * HdrHistogram -- org.hdrhistogram-HdrHistogram-2.1.9.jar -- 
../licenses/LICENSE-HdrHistogram.txt
diff --git a/pom.xml b/pom.xml
index 168eddaf2fe..90b6c8cb8ed 100644
--- a/pom.xml
+++ b/pom.xml
@@ -248,7 +248,7 @@ flexible messaging model and an intuitive client 
API.
 4.5.13
 4.4.15
 0.7.5
-0.1.0
+0.1.6
 2.0
 1.10.12
 5.3.3
@@ -1193,7 +1193,6 @@ flexible messaging model and an intuitive client 
API.
 io.streamnative.oxia
 oxia-client
 ${oxia.version}
-shaded
   
   
 io.streamnative.oxia
diff --git a/pulsar-metadata/pom.xml b/pulsar-metadata/pom.xml
index 8600d0ea191..163a3058dc4 100644
--- a/pulsar-metadata/pom.xml
+++ b/pulsar-metadata/pom.xml
@@ -65,7 +65,6 @@
 
   io.streamnative.oxia
   oxia-client
-  shaded
 
 
 



Re: [PR] [improve] Update Oxia client to 0.1.6 [pulsar]

2024-04-22 Thread via GitHub


merlimat merged PR #22525:
URL: https://github.com/apache/pulsar/pull/22525





Re: [PR] [improve] Update Oxia client to 0.1.6 [pulsar]

2024-04-22 Thread via GitHub


merlimat commented on PR #22525:
URL: https://github.com/apache/pulsar/pull/22525#issuecomment-2070940582

   > ```
   > LockManagerTest.acquireLocks:66 » NoClassDefFound Could not initialize class io.streamnative.oxia.proto.ListResponse
   > ```
   
   This was due to protobuf version conflicts. It was solved by merging with master.





[I] [Bug] broker healthcheck ran into loop after decommissioned a cluster of bookies [pulsar]

2024-04-22 Thread via GitHub


wallacepeng opened a new issue, #22559:
URL: https://github.com/apache/pulsar/issues/22559

   ### Search before asking
   
   - [X] I searched in the [issues](https://github.com/apache/pulsar/issues) 
and found nothing similar.
   
   
   ### Read release policy
   
   - [X] I understand that unsupported versions don't get bug fixes. I will 
attempt to reproduce the issue on a supported version of Pulsar client and 
Pulsar broker.
   
   
   ### Version
   
   Pulsar 2.10.5
   
   ### Minimal reproduce step
   
   1. Set up two BookKeeper clusters, bookkeeper and bookkeeper1, using Helm 
charts.
   2. Make the bookkeeper cluster read-only.
   3. Decommission bookkeeper down to zero replicas (as we are using Kubernetes, 
scale down one node and let autorecovery replicate the ledgers).
   4. Restart the brokers.
   5. The broker runs into a loop on the health check.
   
   ### What did you expect to see?
   
   The broker health check should continue to work.
   
   ### What did you see instead?
   
   The broker health check ran into a loop.
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit a PR?
   
   - [X] I'm willing to submit a PR!





Re: [I] [Bug] broker healthcheck ran into loop after decommissioned a cluster of bookies [pulsar]

2024-04-22 Thread via GitHub


wallacepeng commented on issue #22559:
URL: https://github.com/apache/pulsar/issues/22559#issuecomment-2070891379

   https://github.com/apache/pulsar/assets/894641/f8879642-e3e2-40b1-b063-308e957e693a
   





Re: [I] Argo CD unable to generate template with pulsar chart 3.3.0 [pulsar-helm-chart]

2024-04-22 Thread via GitHub


hpvd commented on issue #471:
URL: 
https://github.com/apache/pulsar-helm-chart/issues/471#issuecomment-2070823328

   3.4.0 version of the Apache Pulsar Helm chart is now available:
   https://github.com/apache/pulsar-helm-chart/releases/tag/pulsar-3.4.0





Re: [D] Pulsar 3.2.2 correct Message send to InfluxDBv2 sink connector with Python Producer [pulsar]

2024-04-22 Thread via GitHub


GitHub user smuehlmann edited a discussion: Pulsar 3.2.2 correct Message send 
to InfluxDBv2 sink connector with Python Producer

Hi,

I am having trouble sending a message with a Python producer that should be 
processed by an InfluxDBv2 sink connector.

It works with Java test code.

Java class:

    public class Device {
        public String measurement;
        public long timestamp;
        public Map tags;
        public Map fields;
    }

Set values:

    Device device = new Device();
    device.measurement = "test5";
    device.timestamp = Instant.now().toEpochMilli();
    device.tags = Maps.newHashMap();
    device.tags.put("foo", "bar");
    device.fields = Maps.newHashMap();
    device.fields.put("temp", 14.8);

Create producer:

    Producer producer = client.newProducer(JSONSchema.of(Device.class))
            .topic("persistent://public/default/foobar")
            .producerName("test")
            .create();

Send message:

    MessageId msgID = producer.newMessage()
            .key("test")
            .value(device)
            .send();

Any suggestions on how to do this with Python?

Thanks

GitHub link: https://github.com/apache/pulsar/discussions/22558
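
One possible starting point for the Python side, offered as an untested sketch rather than a confirmed answer: it mirrors the Java `Device` class with a `pulsar.schema.Record` and sends it with `JsonSchema`. The service URL default, the helper names `build_device` and `send_device`, and the choice of `Map(Double())` for `fields` are assumptions. Whether the InfluxDBv2 sink accepts the resulting schema has not been verified here.

```python
import time


def build_device(measurement, tags, fields, timestamp_ms=None):
    """Return a dict with the same four fields as the Java Device class."""
    if timestamp_ms is None:
        # Same unit as Instant.now().toEpochMilli() in the Java snippet.
        timestamp_ms = int(time.time() * 1000)
    return {
        "measurement": measurement,
        "timestamp": timestamp_ms,
        "tags": dict(tags),
        "fields": dict(fields),
    }


def send_device(device,
                service_url="pulsar://localhost:6650",  # assumed local broker
                topic="persistent://public/default/foobar"):
    """Send one device record with a JSON schema (needs pulsar-client)."""
    # Imported lazily so build_device() is usable without the client installed.
    import pulsar
    from pulsar.schema import Record, String, Long, Double, Map, JsonSchema

    class Device(Record):
        measurement = String()
        timestamp = Long()
        tags = Map(String())
        # Assumption: every field value is numeric.
        fields = Map(Double())

    client = pulsar.Client(service_url)
    try:
        producer = client.create_producer(
            topic, producer_name="test", schema=JsonSchema(Device))
        producer.send(Device(**device), partition_key="test")
    finally:
        client.close()
```

A call such as `send_device(build_device("test5", {"foo": "bar"}, {"temp": 14.8}))` would correspond to the Java example above.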


This is an automatically sent email for commits@pulsar.apache.org.
To unsubscribe, please send an email to: commits-unsubscr...@pulsar.apache.org



Re: [D] Pulsar 3.2.2 correct Message Format send to InfluxDBv2 sink connector with Python Producer [pulsar]

2024-04-22 Thread via GitHub


GitHub user smuehlmann edited a discussion: Pulsar 3.2.2 correct Message Format 
send to InfluxDBv2 sink connector with Python Producer

Hi,

I am having trouble sending a message with a Python producer that should be 
processed by an InfluxDBv2 sink connector.

It works with Java test code.

Java class:

 public class Device  {
public String measurement;
public long timestamp;
public Map tags;
public Map fields;
   }

Set values:

Device device = new Device(); 
   device.measurement = "test5";
   device.timestamp = Instant.now().toEpochMilli();
   device.tags = Maps.newHashMap();
   device.tags.put("foo", "bar");
   device.fields = Maps.newHashMap();
   device.fields.put("temp", 14.8);


Create producer:

   Producer producer = client.newProducer(JSONSchema.of(Device.class))

.topic("persistent://public/default/foobar")
.producerName("test")
.create();

Send message:

   MessageId msgID = producer.newMessage()
.key("test")
.value(device)
.send();

Any suggestions on how to do this with Python?

Thanks

GitHub link: https://github.com/apache/pulsar/discussions/22558





Re: [D] Pulsar 3.2.2 correct Message Format send to InfluxDBv2 sink connector with Python Producer [pulsar]

2024-04-22 Thread via GitHub


GitHub user smuehlmann edited a discussion: Pulsar 3.2.2 correct Message Format 
send to InfluxDBv2 sink connector with Python Producer

Hi,

I am having trouble sending a message with a Python producer that should be 
processed by an InfluxDBv2 sink connector.

It works with Java test code.

Java class:

`
 public class Device  {
public String measurement;
public long timestamp;
public Map tags;
public Map fields;
   }`

Set values:
`Device device = new Device(); 
   device.measurement = "test5";
   device.timestamp = Instant.now().toEpochMilli();
   device.tags = Maps.newHashMap();
   device.tags.put("foo", "bar");
   device.fields = Maps.newHashMap();
   device.fields.put("temp", 14.8);`

Create producer:

   Producer producer = client.newProducer(JSONSchema.of(Device.class))

.topic("persistent://public/default/foobar")
.producerName("test")
.create();

Send message:

   MessageId msgID = producer.newMessage()
.key("test")
.value(device)
.send();

Any suggestions on how to do this with Python?

Thanks

GitHub link: https://github.com/apache/pulsar/discussions/22558





Re: [D] Pulsar 3.2.2 correct Message Format send to InfluxDBv2 sink connector with Python Producer [pulsar]

2024-04-22 Thread via GitHub


GitHub user smuehlmann edited a discussion: Pulsar 3.2.2 correct Message Format 
send to InfluxDBv2 sink connector with Python Producer

Hi,

I am having trouble sending a message with a Python producer that should be 
processed by an InfluxDBv2 sink connector.

It works with Java test code.

Java class:

`
 public class Device  {
public String measurement;
public long timestamp;
public Map tags;
public Map fields;
   }`

Set values:
`Device device = new Device();

   device.measurement = "test5";
   device.timestamp = Instant.now().toEpochMilli();
   device.tags = Maps.newHashMap();
   device.tags.put("foo", "bar");
   device.fields = Maps.newHashMap();
   device.fields.put("temp", 14.8);`

Create producer:

   Producer producer = client.newProducer(JSONSchema.of(Device.class))

.topic("persistent://public/default/foobar")
.producerName("test")
.create();

Send message:

   MessageId msgID = producer.newMessage()
.key("test")
.value(device)
.send();

Any suggestions on how to do this with Python?

Thanks

GitHub link: https://github.com/apache/pulsar/discussions/22558





Re: [D] Pulsar 3.2.2 correct Message Format send to InfluxDBv2 sink connector with Python Producer [pulsar]

2024-04-22 Thread via GitHub


GitHub user smuehlmann edited a discussion: Pulsar 3.2.2 correct Message Format 
send to InfluxDBv2 sink connector with Python Producer

Hi,

I am having trouble sending a message with a Python producer that should be 
processed by an InfluxDBv2 sink connector.

It works with Java test code.

Java class:

`
 public class Device  {
public String measurement;
public long timestamp;
public Map tags;
public Map fields;
   }`

Set values:
`Device device = new Device();

   device.measurement = "test5";
   device.timestamp = Instant.now().toEpochMilli();
   device.tags = Maps.newHashMap();
   device.tags.put("foo", "bar");
   device.fields = Maps.newHashMap();
   device.fields.put("temp", 14.8);`

Create producer:

   Producer producer = client.newProducer(JSONSchema.of(Device.class))

.topic("persistent://public/default/foobar")
.producerName("test")
.create();

Send message:

   MessageId msgID = producer.newMessage()
.key("test")
.value(device)
.send();

Any suggestions on how to do this with Python?

Thanks

GitHub link: https://github.com/apache/pulsar/discussions/22558





Re: [D] Pulsar 3.2.2 correct Message Format send to InfluxDBv2 sink connector with Python Producer [pulsar]

2024-04-22 Thread via GitHub


GitHub user smuehlmann edited a discussion: Pulsar 3.2.2 correct Message Format 
send to InfluxDBv2 sink connector with Python Producer

Hi,

I am having trouble sending a message with a Python producer that should be 
processed by an InfluxDBv2 sink connector.

It works with Java test code.

Java class:

`
   public class Device  {
public String measurement;
public long timestamp;
public Map tags;
public Map fields;
   }`

Set values:
`
   Device device = new Device();

   device.measurement = "test5";
   device.timestamp = Instant.now().toEpochMilli();
   device.tags = Maps.newHashMap();
   device.tags.put("foo", "bar");
   device.fields = Maps.newHashMap();
   device.fields.put("temp", 14.8);
`

Create producer:

   Producer producer = client.newProducer(JSONSchema.of(Device.class))

.topic("persistent://public/default/foobar")
.producerName("test")
.create();

Send message:

   MessageId msgID = producer.newMessage()
.key("test")
.value(device)
.send();

Any suggestions on how to do this with Python?

Thanks

GitHub link: https://github.com/apache/pulsar/discussions/22558





Re: [D] Pulsar 3.2.2 correct Message Format send to InfluxDBv2 sink connector with Python Producer [pulsar]

2024-04-22 Thread via GitHub


GitHub user smuehlmann edited a discussion: Pulsar 3.2.2 correct Message Format 
send to InfluxDBv2 sink connector with Python Producer

Hi,

I am having trouble sending a message with a Python producer that should be 
processed by an InfluxDBv2 sink connector.

It works with Java test code.

Java class:

`
   public class Device  {
public String measurement;
public long timestamp;
public Map tags;
public Map fields;
   }`

Set values:
`
   Device device = new Device();

   device.measurement = "test5";
   device.timestamp = Instant.now().toEpochMilli();
   device.tags = Maps.newHashMap();
   device.tags.put("foo", "bar");
   device.fields = Maps.newHashMap();
   device.fields.put("temp", 14.8);`

Create producer:

   Producer producer = client.newProducer(JSONSchema.of(Device.class))

.topic("persistent://public/default/foobar")
.producerName("test")
.create();

Send message:

   MessageId msgID = producer.newMessage()
.key("test")
.value(device)
.send();

Any suggestions on how to do this with Python?

Thanks

GitHub link: https://github.com/apache/pulsar/discussions/22558





Re: [D] Pulsar 3.2.2 correct Message Format send to InfluxDBv2 sink connector with Python Producer [pulsar]

2024-04-22 Thread via GitHub


GitHub user smuehlmann edited a discussion: Pulsar 3.2.2 correct Message Format 
send to InfluxDBv2 sink connector with Python Producer

Hi,

I am having trouble sending a message with a Python producer that should be 
processed by an InfluxDBv2 sink connector.

It works with Java test code.

Java class:

`
   public class Device  {
public String measurement;
public long timestamp;
public Map tags;
public Map fields;
   }`

Set values:

   Device device = new Device();

   device.measurement = "test5";
   device.timestamp = Instant.now().toEpochMilli();
   device.tags = Maps.newHashMap();
   device.tags.put("foo", "bar");
   device.fields = Maps.newHashMap();
   device.fields.put("temp", 14.8);



Create producer:

   Producer producer = client.newProducer(JSONSchema.of(Device.class))

.topic("persistent://public/default/foobar")
.producerName("test")
.create();

Send message:

   MessageId msgID = producer.newMessage()
.key("test")
.value(device)
.send();

Any suggestions on how to do this with Python?

Thanks

GitHub link: https://github.com/apache/pulsar/discussions/22558





Re: [D] Pulsar 3.2.2 correct Message Format send to InfluxDBv2 sink connector with Python Producer [pulsar]

2024-04-22 Thread via GitHub


GitHub user smuehlmann edited a discussion: Pulsar 3.2.2 correct Message Format 
send to InfluxDBv2 sink connector with Python Producer

Hi,

I am having trouble sending a message with a Python producer that should be 
processed by an InfluxDBv2 sink connector.

It works with Java test code.

Java class:

   public class Device  {
public String measurement;
public long timestamp;
public Map tags;
public Map fields;
   }

Set values:

   Device device = new Device();

   device.measurement = "test5";
   device.timestamp = Instant.now().toEpochMilli();
   device.tags = Maps.newHashMap();
   device.tags.put("foo", "bar");
   device.fields = Maps.newHashMap();
   device.fields.put("temp", 14.8);



Create producer:

   Producer producer = client.newProducer(JSONSchema.of(Device.class))

.topic("persistent://public/default/foobar")
.producerName("test")
.create();

Send message:

   MessageId msgID = producer.newMessage()
.key("test")
.value(device)
.send();

Any suggestions on how to do this with Python?

Thanks

GitHub link: https://github.com/apache/pulsar/discussions/22558





Re: [D] Pulsar 3.2.2 correct Message Format send to InfluxDBv2 sink connector with Python Producer [pulsar]

2024-04-22 Thread via GitHub


GitHub user smuehlmann edited a discussion: Pulsar 3.2.2 correct Message Format 
send to InfluxDBv2 sink connector with Python Producer

Hi,

I am having trouble sending a message with a Python producer that should be 
processed by an InfluxDBv2 sink connector.

It works with Java test code.

Java class:

   public class Device  {

  public String measurement;
  public long timestamp;
  public Map tags;
  public Map fields;
   }

Set values:

   Device device = new Device();

   device.measurement = "test5";
   device.timestamp = Instant.now().toEpochMilli();
   device.tags = Maps.newHashMap();
   device.tags.put("foo", "bar");
   device.fields = Maps.newHashMap();
   device.fields.put("temp", 14.8);



Create producer:

   Producer producer = client.newProducer(JSONSchema.of(Device.class))

.topic("persistent://public/default/foobar")
.producerName("test")
.create();

Send message:

   MessageId msgID = producer.newMessage()
.key("test")
.value(device)
.send();

Any suggestions on how to do this with Python?

Thanks

GitHub link: https://github.com/apache/pulsar/discussions/22558





Re: [D] Pulsar 3.2.2 correct Message Format send to InfluxDBv2 sink connector with Python Producer [pulsar]

2024-04-22 Thread via GitHub


GitHub user smuehlmann edited a discussion: Pulsar 3.2.2 correct Message Format 
send to InfluxDBv2 sink connector with Python Producer

Hi,

I am having trouble sending a message with a Python producer that should be 
processed by an InfluxDBv2 sink connector.

It works with Java test code.

Java class:

public class Device  {
public String measurement;
public long timestamp;
public Map tags;
public Map fields;
}


Set values:

Device device = new Device();

device.measurement = "test5";
device.timestamp = Instant.now().toEpochMilli();
device.tags = Maps.newHashMap();
device.tags.put("foo", "bar");
device.fields = Maps.newHashMap();
device.fields.put("temp", 14.8);

Send message:

MessageId msgID = producer.newMessage()
.key("test")
.value(device)
.send();

Any suggestions on how to do this with Python?

Thanks

GitHub link: https://github.com/apache/pulsar/discussions/22558





Re: [D] Pulsar 3.2.2 correct Message Format send to InfluxDBv2 sink connector with Python Producer [pulsar]

2024-04-22 Thread via GitHub


GitHub user smuehlmann edited a discussion: Pulsar 3.2.2 correct Message Format 
send to InfluxDBv2 sink connector with Python Producer

Hi,

I am having trouble sending a message with a Python producer that should be 
processed by an InfluxDBv2 sink connector.

It works with Java test code.

Java class:

` public String measurement;
 public long timestamp;
 public Map tags;
 public Map fields;`

Set values:

`Device device = new Device();

device.measurement = "test5";
device.timestamp = Instant.now().toEpochMilli();
device.tags = Maps.newHashMap();
device.tags.put("foo", "bar");
device.fields = Maps.newHashMap();
device.fields.put("temp", 14.8);`


Send message:

`
MessageId msgID = producer.newMessage()
.key("test")
.value(device)
.send();
`

Any suggestions on how to do this with Python?

Thanks

GitHub link: https://github.com/apache/pulsar/discussions/22558





Re: [D] Pulsar 3.2.2 correct Message Format send to InfluxDBv2 sink connector with Python Producer [pulsar]

2024-04-22 Thread via GitHub


GitHub user smuehlmann edited a discussion: Pulsar 3.2.2 correct Message Format 
send to InfluxDBv2 sink connector with Python Producer

Hi,

I am having trouble sending a message with a Python producer that should be 
processed by an InfluxDBv2 sink connector.

It works with Java test code.

Java class:

`public String measurement;
public long timestamp;
public Map tags;
public Map fields;`

Set values:

`Device device = new Device();

device.measurement = "test5";
device.timestamp = Instant.now().toEpochMilli();
device.tags = Maps.newHashMap();
device.tags.put("foo", "bar");
device.fields = Maps.newHashMap();
device.fields.put("temp", 14.8);`


Send message:

`
MessageId msgID = producer.newMessage()
.key("test")
.value(device)
.send();
`

Any suggestions on how to do this with Python?

Thanks

GitHub link: https://github.com/apache/pulsar/discussions/22558





Re: [D] Pulsar 3.2.2 correct Message Format send to InfluxDBv2 sink connector with Python Producer [pulsar]

2024-04-22 Thread via GitHub


GitHub user smuehlmann edited a discussion: Pulsar 3.2.2 correct Message Format 
send to InfluxDBv2 sink connector with Python Producer

Hi,

I am having trouble sending a message with a Python producer that should be 
processed by an InfluxDBv2 sink connector.

It works with Java test code.

Java class:

`public String measurement;
public long timestamp;
public Map tags;
public Map fields;`

Set values:

`Device device = new Device();

device.measurement = "test5";
device.timestamp = Instant.now().toEpochMilli();
device.tags = Maps.newHashMap();
device.tags.put("foo", "bar");
device.fields = Maps.newHashMap();
device.fields.put("temp", 14.8);`


Send message:

`
MessageId msgID = producer.newMessage()
.key("test")
.value(device)
.send();
`

Any suggestions on how to do this with Python?

Thanks

GitHub link: https://github.com/apache/pulsar/discussions/22558





Re: [D] Pulsar 3.2.2 correct Message Format send to InfluxDBv2 sink connector with Python Producer [pulsar]

2024-04-22 Thread via GitHub


GitHub user smuehlmann edited a discussion: Pulsar 3.2.2 correct Message Format 
send to InfluxDBv2 sink connector with Python Producer

Hi,

I am having trouble sending a message with a Python producer that should be 
processed by an InfluxDBv2 sink connector.

It works with Java test code.

Java class:

`public class Device  {

public String measurement;
public long timestamp;
public Map tags;
public Map fields;
 }`

Set values:

`Device device = new Device();

device.measurement = "test5";
device.timestamp = Instant.now().toEpochMilli();
device.tags = Maps.newHashMap();
device.tags.put("foo", "bar");
device.fields = Maps.newHashMap();
device.fields.put("temp", 14.8);`


Send message:

`
MessageId msgID = producer.newMessage()
.key("test")
.value(device)
.send();
`

Any suggestions on how to do this with Python?

Thanks

GitHub link: https://github.com/apache/pulsar/discussions/22558





Re: [D] Pulsar 3.2.2 correct Message Format send to InfluxDBv2 sink connector with Python Producer [pulsar]

2024-04-22 Thread via GitHub


GitHub user smuehlmann edited a discussion: Pulsar 3.2.2 correct Message Format 
send to InfluxDBv2 sink connector with Python Producer

Hi,

I am having trouble sending a message from a Python producer that should be 
processed by an InfluxDBv2 sink connector.

It works with Java test code.

Java class:

`
public class Device  {

public String measurement;
public long timestamp;
public Map tags;
public Map fields;
}
`

Set values:

`
 Device device = new Device();

device.measurement = "test5";
device.timestamp = Instant.now().toEpochMilli();
device.tags = Maps.newHashMap();
device.tags.put("foo", "bar");
device.fields = Maps.newHashMap();
device.fields.put("temp", 14.8);
`


Send message:

`
MessageId msgID = producer.newMessage()
.key("test")
.value(device)
.send();
`

Any suggestions on how to do this with Python?

Thanks

GitHub link: https://github.com/apache/pulsar/discussions/22558





Re: [D] Pulsar 3.2.2 correct Message Format send to InfluxDBv2 sink connector with Python Producer [pulsar]

2024-04-22 Thread via GitHub


GitHub user smuehlmann edited a discussion: Pulsar 3.2.2 correct Message Format 
send to InfluxDBv2 sink connector with Python Producer

Hi,

I am having trouble sending a message from a Python producer that should be 
processed by an InfluxDBv2 sink connector.

It works with Java test code.

Java class:

`public class Device  {
public String measurement;
public long timestamp;
public Map tags;
public Map fields;
}`

Set values:

`   Device device = new Device();
device.measurement = "test5";
device.timestamp = Instant.now().toEpochMilli();
device.tags = Maps.newHashMap();
device.tags.put("foo", "bar");
device.fields = Maps.newHashMap();
device.fields.put("temp", 14.8);
`


Send message:

`MessageId msgID = producer.newMessage()
.key("test")
.value(device)
.send();
`

Any suggestions on how to do this with Python?

Thanks

GitHub link: https://github.com/apache/pulsar/discussions/22558





Re: [PR] [improve] PIP-307: Use assigned broker URL hints during broker reconnection [pulsar-client-go]

2024-04-22 Thread via GitHub


dragosvictor commented on code in PR #1208:
URL: https://github.com/apache/pulsar-client-go/pull/1208#discussion_r1575129127


##
pulsar/internal/lookup_service.go:
##
@@ -30,8 +30,9 @@ import (
 
 // LookupResult encapsulates a struct for lookup a request, containing two 
parts: LogicalAddr, PhysicalAddr.
 type LookupResult struct {
-   LogicalAddr  *url.URL
-   PhysicalAddr *url.URL
+   IsProxyThroughServiceURL bool

Review Comment:
   You're right, it's never used outside of `lookupService.GetBrokerAddress`, 
where it is retrieved and consumed at the same time. Removed, thanks!






Re: [PR] [improve][client] Add maxConnectionsPerHost and connectionMaxIdleSeconds to PulsarAdminBuilder [pulsar]

2024-04-22 Thread via GitHub


lhotari commented on PR #22541:
URL: https://github.com/apache/pulsar/pull/22541#issuecomment-2070268832

   the `setMaxConnectionsPerHost` in async http client doesn't seem to behave 
as expected. Will check the errors
   ```
 Caused by: java.util.concurrent.CompletionException: 
org.apache.pulsar.client.admin.internal.http.AsyncHttpConnector$RetryException: 
Could not complete the operation. Number of retries has been exhausted. Failed 
reason: Too many connections: 16
at 
java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:332)
at 
java.base/java.util.concurrent.CompletableFuture.uniApplyNow(CompletableFuture.java:674)
at 
java.base/java.util.concurrent.CompletableFuture.orApplyStage(CompletableFuture.java:1601)
at 
java.base/java.util.concurrent.CompletableFuture.applyToEither(CompletableFuture.java:2261)
at 
org.apache.pulsar.client.admin.internal.http.AsyncHttpConnector.retryOrTimeOut(AsyncHttpConnector.java:275)
at 
org.apache.pulsar.client.admin.internal.http.AsyncHttpConnector.apply(AsyncHttpConnector.java:234)
   ```





Re: [PR] [improve][broker] Exclude producers for geo-replication from publishers field of topic stats [pulsar]

2024-04-22 Thread via GitHub


massakam commented on code in PR #22556:
URL: https://github.com/apache/pulsar/pull/22556#discussion_r1575095343


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java:
##
@@ -745,8 +746,7 @@ public void updateRates(NamespaceStats nsStats, 
NamespaceBundleStats bundleStats
 
 replicators.forEach((region, replicator) -> replicator.updateRates());
 
-nsStats.producerCount += producers.size();
-bundleStats.producerCount += producers.size();
+final AtomicInteger producerCount = new AtomicInteger();

Review Comment:
   Replaced `AtomicInteger` with `MutableInt`.



##
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##
@@ -2107,8 +2108,7 @@ public void updateRates(NamespaceStats nsStats, 
NamespaceBundleStats bundleStats
 
 replicators.forEach((region, replicator) -> replicator.updateRates());
 
-nsStats.producerCount += producers.size();
-bundleStats.producerCount += producers.size();
+final AtomicInteger producerCount = new AtomicInteger();

Review Comment:
   Replaced `AtomicInteger` with `MutableInt`.






Re: [PR] [improve] PIP-307: Use assigned broker URL hints during broker reconnection [pulsar-client-go]

2024-04-22 Thread via GitHub


dragosvictor commented on code in PR #1208:
URL: https://github.com/apache/pulsar-client-go/pull/1208#discussion_r1575070621


##
pulsar/internal/pulsar_proto/PulsarApi.pb.go:
##
@@ -18,8 +18,8 @@
 
 // Code generated by protoc-gen-go. DO NOT EDIT.
 // versions:
-// protoc-gen-go v1.28.1
-// protoc        v3.21.9
+// protoc-gen-go v1.31.0
+// protoc        v4.24.3

Review Comment:
   Done! Note that it really only changed the version comments, everything else 
stayed the same: 
https://github.com/apache/pulsar-client-go/pull/1208/commits/880d732ed54d774b539d09897f273e24e5b7608f.
 






Re: [PR] [improve] PIP-307: Use assigned broker URL hints during broker reconnection [pulsar-client-go]

2024-04-22 Thread via GitHub


dragosvictor commented on code in PR #1208:
URL: https://github.com/apache/pulsar-client-go/pull/1208#discussion_r1575072107


##
pulsar/producer_partition.go:
##
@@ -373,10 +390,14 @@ func (p *partitionProducer) GetBuffer() internal.Buffer {
return b
 }
 
-func (p *partitionProducer) ConnectionClosed() {
+func (p *partitionProducer) ConnectionClosed(closeProducer 
*pb.CommandCloseProducer) {

Review Comment:
   Good catch, fixed!






[PR] [improve][cli] Add generate-completion command to admin and client [pulsar]

2024-04-22 Thread via GitHub


nodece opened a new pull request, #22557:
URL: https://github.com/apache/pulsar/pull/22557

   ### Motivation
   
   Tab completion is an important feature. Now that the CLI parser has been 
migrated from jcommander to picocli, we can use picocli's autocomplete support 
to provide it.
   
   - pulsar-admin
   ```
   ❯ source <(bin/pulsar-admin generate-completion)
   ❯ bin/pulsar-admin topics clear-backlog --subscription
   --help  --subscription  --version  -h  -s  -v
   ```
   
   - pulsar-client
   ```
   ❯ source <(bin/pulsar-client generate-completion)
   ❯ bin/pulsar-client produce -h
   --chunking              --files                    --key-value-key       --rate          -db   -h     -kvkf  -s
   --disable-batching      --help                     --key-value-key-file  --separator     -dr   -k     -m     -v
   --disable-replication   --key                      --messages            --value-schema  -ekn  -ks    -n     -vs
   --encryption-key-name   --key-schema               --num-produce         --version       -ekv  -kvet  -p
   --encryption-key-value  --key-value-encoding-type  --properties          -c              -f    -kvk   -r
   ```
   
   ### Modifications
   
   - Add `generate-completion` command to the `pulsar-admin` and `pulsar-client`
   
   ### Documentation
   
   
   
   - [ ] `doc` 
   - [ ] `doc-required` 
   - [x] `doc-not-needed` 
   - [ ] `doc-complete` 





(pulsar) branch master updated: [improve][misc] Upgrade to Bookkeeper 4.17.0 (#22551)

2024-04-22 Thread mmerli
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new a037fa33eee [improve][misc] Upgrade to Bookkeeper 4.17.0 (#22551)
a037fa33eee is described below

commit a037fa33a6b0bc052c4aa960a55ca8bd0ca2
Author: Lari Hotari 
AuthorDate: Mon Apr 22 19:38:11 2024 +0300

[improve][misc] Upgrade to Bookkeeper 4.17.0 (#22551)
---
 distribution/server/src/assemble/LICENSE.bin.txt | 100 +++
 distribution/shell/src/assemble/LICENSE.bin.txt  |   8 +-
 pom.xml  |   6 +-
 3 files changed, 57 insertions(+), 57 deletions(-)

diff --git a/distribution/server/src/assemble/LICENSE.bin.txt 
b/distribution/server/src/assemble/LICENSE.bin.txt
index 4dc6e434167..93fd46d44b5 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -262,7 +262,7 @@ The Apache Software License, Version 2.0
  - com.fasterxml.jackson.module-jackson-module-parameter-names-2.14.2.jar
  * Caffeine -- com.github.ben-manes.caffeine-caffeine-2.9.1.jar
  * Conscrypt -- org.conscrypt-conscrypt-openjdk-uber-2.5.2.jar
- * Proto Google Common Protos -- 
com.google.api.grpc-proto-google-common-protos-2.9.0.jar
+ * Proto Google Common Protos -- 
com.google.api.grpc-proto-google-common-protos-2.17.0.jar
  * Bitbucket -- org.bitbucket.b_c-jose4j-0.9.4.jar
  * Gson
 - com.google.code.gson-gson-2.8.9.jar
@@ -356,34 +356,34 @@ The Apache Software License, Version 2.0
 - net.java.dev.jna-jna-jpms-5.12.1.jar
 - net.java.dev.jna-jna-platform-jpms-5.12.1.jar
  * BookKeeper
-- org.apache.bookkeeper-bookkeeper-common-4.16.5.jar
-- org.apache.bookkeeper-bookkeeper-common-allocator-4.16.5.jar
-- org.apache.bookkeeper-bookkeeper-proto-4.16.5.jar
-- org.apache.bookkeeper-bookkeeper-server-4.16.5.jar
-- org.apache.bookkeeper-bookkeeper-tools-framework-4.16.5.jar
-- org.apache.bookkeeper-circe-checksum-4.16.5.jar
-- org.apache.bookkeeper-cpu-affinity-4.16.5.jar
-- org.apache.bookkeeper-statelib-4.16.5.jar
-- org.apache.bookkeeper-stream-storage-api-4.16.5.jar
-- org.apache.bookkeeper-stream-storage-common-4.16.5.jar
-- org.apache.bookkeeper-stream-storage-java-client-4.16.5.jar
-- org.apache.bookkeeper-stream-storage-java-client-base-4.16.5.jar
-- org.apache.bookkeeper-stream-storage-proto-4.16.5.jar
-- org.apache.bookkeeper-stream-storage-server-4.16.5.jar
-- org.apache.bookkeeper-stream-storage-service-api-4.16.5.jar
-- org.apache.bookkeeper-stream-storage-service-impl-4.16.5.jar
-- org.apache.bookkeeper.http-http-server-4.16.5.jar
-- org.apache.bookkeeper.http-vertx-http-server-4.16.5.jar
-- org.apache.bookkeeper.stats-bookkeeper-stats-api-4.16.5.jar
-- org.apache.bookkeeper.stats-prometheus-metrics-provider-4.16.5.jar
-- org.apache.distributedlog-distributedlog-common-4.16.5.jar
-- org.apache.distributedlog-distributedlog-core-4.16.5-tests.jar
-- org.apache.distributedlog-distributedlog-core-4.16.5.jar
-- org.apache.distributedlog-distributedlog-protocol-4.16.5.jar
-- org.apache.bookkeeper.stats-codahale-metrics-provider-4.16.5.jar
-- org.apache.bookkeeper-bookkeeper-slogger-api-4.16.5.jar
-- org.apache.bookkeeper-bookkeeper-slogger-slf4j-4.16.5.jar
-- org.apache.bookkeeper-native-io-4.16.5.jar
+- org.apache.bookkeeper-bookkeeper-common-4.17.0.jar
+- org.apache.bookkeeper-bookkeeper-common-allocator-4.17.0.jar
+- org.apache.bookkeeper-bookkeeper-proto-4.17.0.jar
+- org.apache.bookkeeper-bookkeeper-server-4.17.0.jar
+- org.apache.bookkeeper-bookkeeper-tools-framework-4.17.0.jar
+- org.apache.bookkeeper-circe-checksum-4.17.0.jar
+- org.apache.bookkeeper-cpu-affinity-4.17.0.jar
+- org.apache.bookkeeper-statelib-4.17.0.jar
+- org.apache.bookkeeper-stream-storage-api-4.17.0.jar
+- org.apache.bookkeeper-stream-storage-common-4.17.0.jar
+- org.apache.bookkeeper-stream-storage-java-client-4.17.0.jar
+- org.apache.bookkeeper-stream-storage-java-client-base-4.17.0.jar
+- org.apache.bookkeeper-stream-storage-proto-4.17.0.jar
+- org.apache.bookkeeper-stream-storage-server-4.17.0.jar
+- org.apache.bookkeeper-stream-storage-service-api-4.17.0.jar
+- org.apache.bookkeeper-stream-storage-service-impl-4.17.0.jar
+- org.apache.bookkeeper.http-http-server-4.17.0.jar
+- org.apache.bookkeeper.http-vertx-http-server-4.17.0.jar
+- org.apache.bookkeeper.stats-bookkeeper-stats-api-4.17.0.jar
+- org.apache.bookkeeper.stats-prometheus-metrics-provider-4.17.0.jar
+- org.apache.distributedlog-distributedlog-common-4.17.0.jar
+- org.apache.distributedlog-distributedlog-core-4.17.0-tests.jar
+- org.apache.distributedlog-distributedlog-core-4.17.0.jar
+- 

Re: [PR] [improve][misc] Upgrade to Bookkeeper 4.17.0 [pulsar]

2024-04-22 Thread via GitHub


merlimat merged PR #22551:
URL: https://github.com/apache/pulsar/pull/22551





Re: [PR] [improve][broker] Exclude producers for geo-replication from publishers field of topic stats [pulsar]

2024-04-22 Thread via GitHub


lhotari commented on code in PR #22556:
URL: https://github.com/apache/pulsar/pull/22556#discussion_r1575054920


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##
@@ -2107,8 +2108,7 @@ public void updateRates(NamespaceStats nsStats, 
NamespaceBundleStats bundleStats
 
 replicators.forEach((region, replicator) -> replicator.updateRates());
 
-nsStats.producerCount += producers.size();
-bundleStats.producerCount += producers.size();
+final AtomicInteger producerCount = new AtomicInteger();

Review Comment:
   I guess this could be an `org.apache.commons.lang3.mutable.MutableInt` since 
this is used in single-threaded code.
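
To illustrate the suggestion, here is a minimal sketch of counting inside a `forEach` lambda with a plain mutable holder versus an `AtomicInteger`. It assumes the counter is incremented only by the thread running the loop. `IntHolder` is a hypothetical stand-in for commons-lang3 `MutableInt` so the snippet needs no dependency, and `Producer` with its `remote` flag is likewise a made-up model, not the broker's class.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class ProducerCountSketch {

    /** Hypothetical stand-in for commons-lang3 MutableInt: a plain int holder, no synchronization. */
    static final class IntHolder {
        private int value;
        void increment() { value++; }
        int intValue() { return value; }
    }

    /** Hypothetical producer model; only the geo-replication ("remote") flag matters here. */
    static final class Producer {
        final String name;
        final boolean remote;
        Producer(String name, boolean remote) {
            this.name = name;
            this.remote = remote;
        }
    }

    /** Sample data: two local producers and one replication producer. */
    static List<Producer> sample() {
        return Arrays.asList(
                new Producer("app-1", false),
                new Producer("replicator-from-remote-cluster", true),
                new Producer("app-2", false));
    }

    /** Counts local producers with the plain holder; sufficient when one thread runs the loop. */
    static int countLocalWithHolder(List<Producer> producers) {
        // Lambdas may only capture effectively final locals, so the counter lives in an object.
        IntHolder count = new IntHolder();
        producers.forEach(p -> {
            if (!p.remote) {
                count.increment();
            }
        });
        return count.intValue();
    }

    /** Same count with AtomicInteger; correct, but its atomic updates are not needed here. */
    static int countLocalWithAtomic(List<Producer> producers) {
        AtomicInteger count = new AtomicInteger();
        producers.forEach(p -> {
            if (!p.remote) {
                count.incrementAndGet();
            }
        });
        return count.get();
    }

    public static void main(String[] args) {
        System.out.println(countLocalWithHolder(sample())); // prints 2
        System.out.println(countLocalWithAtomic(sample())); // prints 2
    }
}
```

Both variants give the same result. The holder simply avoids atomic operations that buy nothing when no other thread touches the counter.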






Re: [PR] [improve][broker] Exclude producers for geo-replication from publishers field of topic stats [pulsar]

2024-04-22 Thread via GitHub


lhotari commented on code in PR #22556:
URL: https://github.com/apache/pulsar/pull/22556#discussion_r1575054611


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java:
##
@@ -745,8 +746,7 @@ public void updateRates(NamespaceStats nsStats, 
NamespaceBundleStats bundleStats
 
 replicators.forEach((region, replicator) -> replicator.updateRates());
 
-nsStats.producerCount += producers.size();
-bundleStats.producerCount += producers.size();
+final AtomicInteger producerCount = new AtomicInteger();

Review Comment:
   I guess this could be an `org.apache.commons.lang3.mutable.MutableInt` since 
this is used in single-threaded code.






(pulsar) branch branch-3.2 updated (a3122b106fc -> 01671b16417)

2024-04-22 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a change to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


from a3122b106fc [fix][ml] Fix NPE of getValidPositionAfterSkippedEntries 
when recovering a terminated managed ledger (#22552)
 new 83a243c4df4 [fix][broker] upgrade jclouds 2.5.0 -> 2.6.0 (#22220)
 new 01671b16417 [fix][offload] Increase file upload limit from 2048MiB to 
4096MiB for GCP/GCS offloading (#22554)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 conf/broker.conf   | 11 +--
 jclouds-shaded/pom.xml | 78 --
 pom.xml|  2 +-
 .../common/policies/data/OffloadPoliciesImpl.java  |  7 +-
 4 files changed, 70 insertions(+), 28 deletions(-)



(pulsar) 02/02: [fix][offload] Increase file upload limit from 2048MiB to 4096MiB for GCP/GCS offloading (#22554)

2024-04-22 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 01671b16417f553a6bf51257ed74202656339d83
Author: Lari Hotari 
AuthorDate: Mon Apr 22 18:13:45 2024 +0300

[fix][offload] Increase file upload limit from 2048MiB to 4096MiB for 
GCP/GCS offloading (#22554)

(cherry picked from commit e81f37000ec212676c5daffa17faad8fc604ff77)
---
 conf/broker.conf  | 11 ++-
 .../pulsar/common/policies/data/OffloadPoliciesImpl.java  |  7 ---
 2 files changed, 10 insertions(+), 8 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 2d5221c65c4..dd0f3e49e1f 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -1665,10 +1665,10 @@ s3ManagedLedgerOffloadBucket=
 # For Amazon S3 ledger offload, Alternative endpoint to connect to (useful for 
testing)
 s3ManagedLedgerOffloadServiceEndpoint=
 
-# For Amazon S3 ledger offload, Max block size in bytes. (64MB by default, 5MB 
minimum)
+# For Amazon S3 ledger offload, Max block size in bytes. (64MiB by default, 
5MiB minimum)
 s3ManagedLedgerOffloadMaxBlockSizeInBytes=67108864
 
-# For Amazon S3 ledger offload, Read buffer size in bytes (1MB by default)
+# For Amazon S3 ledger offload, Read buffer size in bytes (1MiB by default)
 s3ManagedLedgerOffloadReadBufferSizeInBytes=1048576
 
 # For Google Cloud Storage ledger offload, region where offload bucket is 
located.
@@ -1678,10 +1678,11 @@ gcsManagedLedgerOffloadRegion=
 # For Google Cloud Storage ledger offload, Bucket to place offloaded ledger 
into
 gcsManagedLedgerOffloadBucket=
 
-# For Google Cloud Storage ledger offload, Max block size in bytes. (64MB by 
default, 5MB minimum)
-gcsManagedLedgerOffloadMaxBlockSizeInBytes=67108864
+# For Google Cloud Storage ledger offload, Max block size in bytes. (128MiB by 
default, 5MiB minimum)
+# Since JClouds limits the maximum number of blocks to 32, the maximum size of 
a ledger is 32 times the block size.
+gcsManagedLedgerOffloadMaxBlockSizeInBytes=134217728
 
-# For Google Cloud Storage ledger offload, Read buffer size in bytes (1MB by 
default)
+# For Google Cloud Storage ledger offload, Read buffer size in bytes (1MiB by 
default)
 gcsManagedLedgerOffloadReadBufferSizeInBytes=1048576
 
 # For Google Cloud Storage, path to json file containing service account 
credentials.
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java
index 51e181811c2..6c40aa3f2ed 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java
@@ -79,8 +79,9 @@ public class OffloadPoliciesImpl implements Serializable, 
OffloadPolicies {
 }
 }
 
-public static final int DEFAULT_MAX_BLOCK_SIZE_IN_BYTES = 64 * 1024 * 
1024;   // 64MB
-public static final int DEFAULT_READ_BUFFER_SIZE_IN_BYTES = 1024 * 1024;   
   // 1MB
+public static final int DEFAULT_MAX_BLOCK_SIZE_IN_BYTES = 64 * 1024 * 
1024;   // 64MiB
+public static final int DEFAULT_GCS_MAX_BLOCK_SIZE_IN_BYTES = 128 * 1024 * 
1024;   // 128MiB
+public static final int DEFAULT_READ_BUFFER_SIZE_IN_BYTES = 1024 * 1024;   
   // 1MiB
 public static final int DEFAULT_OFFLOAD_MAX_THREADS = 2;
 public static final int DEFAULT_OFFLOAD_MAX_PREFETCH_ROUNDS = 1;
 public static final String DEFAULT_OFFLOADER_DIRECTORY = "./offloaders";
@@ -163,7 +164,7 @@ public class OffloadPoliciesImpl implements Serializable, 
OffloadPolicies {
 private String gcsManagedLedgerOffloadBucket = null;
 @Configuration
 @JsonProperty(access = JsonProperty.Access.READ_WRITE)
-private Integer gcsManagedLedgerOffloadMaxBlockSizeInBytes = 
DEFAULT_MAX_BLOCK_SIZE_IN_BYTES;
+private Integer gcsManagedLedgerOffloadMaxBlockSizeInBytes = 
DEFAULT_GCS_MAX_BLOCK_SIZE_IN_BYTES;
 @Configuration
 @JsonProperty(access = JsonProperty.Access.READ_WRITE)
 private Integer gcsManagedLedgerOffloadReadBufferSizeInBytes = 
DEFAULT_READ_BUFFER_SIZE_IN_BYTES;
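
The 2048MiB and 4096MiB figures in the commit title appear to follow from the broker.conf comment in this change: at most 32 blocks per ledger, multiplied by the block size. A small sketch of that arithmetic, where the class and method names are hypothetical and only the 32-block cap and the two default block sizes come from the diff:

```java
public class GcsOffloadLimitSketch {

    // Block-count cap quoted in the broker.conf comment of this commit.
    static final int MAX_BLOCKS = 32;
    static final long MIB = 1024L * 1024L;

    /** Largest ledger size in bytes for a given block size; long, since the result can exceed Integer.MAX_VALUE. */
    static long maxLedgerSizeBytes(long blockSizeBytes) {
        return MAX_BLOCKS * blockSizeBytes;
    }

    public static void main(String[] args) {
        long oldDefault = 64 * MIB;  // 67108864, the previous GCS default
        long newDefault = 128 * MIB; // 134217728, the new GCS default
        System.out.println(maxLedgerSizeBytes(oldDefault) / MIB + " MiB"); // prints 2048 MiB
        System.out.println(maxLedgerSizeBytes(newDefault) / MIB + " MiB"); // prints 4096 MiB
    }
}
```

Note that the per-block constants still fit in an `int`, but both totals (2147483648 and 4294967296 bytes) do not, which is why the sketch multiplies in `long`.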



(pulsar) 01/02: [fix][broker] upgrade jclouds 2.5.0 -> 2.6.0 (#22220)

2024-04-22 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 83a243c4df4f5a8492d0fa3850f14ab3db1051a5
Author: Paul Gier 
AuthorDate: Fri Mar 15 09:46:33 2024 -0500

[fix][broker] upgrade jclouds 2.5.0 -> 2.6.0 (#22220)

Co-authored-by: 道君 
Co-authored-by: Lari Hotari 
(cherry picked from commit 73dc213d4cec3513a1addbcb3518f441093c57ec)
---
 jclouds-shaded/pom.xml | 78 ++
 pom.xml|  2 +-
 2 files changed, 60 insertions(+), 20 deletions(-)

diff --git a/jclouds-shaded/pom.xml b/jclouds-shaded/pom.xml
index 8c71222092b..92a3c4fee81 100644
--- a/jclouds-shaded/pom.xml
+++ b/jclouds-shaded/pom.xml
@@ -33,8 +33,17 @@
   jclouds-shaded
   Apache Pulsar :: Jclouds shaded
 
+  
+
+2.10.1
+32.0.0-jre
+7.0.0
+2.0.1
+3.0.0
+2.0.0
+  
   
-
 
   org.apache.jclouds
   jclouds-allblobstore
@@ -61,12 +70,48 @@
   jclouds-slf4j
   ${jclouds.version}
 
-
-  javax.annotation
-  javax.annotation-api
-
   
 
+  
+
+  
+com.google.code.gson
+gson
+${gson.version}
+  
+  
+com.google.guava
+guava
+${guava.version}
+  
+  
+com.google.inject
+guice
+${guice.version}
+  
+  
+com.google.inject.extensions
+guice-assistedinject
+${guice.version}
+  
+  
+jakarta.inject
+jakarta.inject-api
+${jakarta.inject.api.version}
+  
+  
+jakarta.ws.rs
+jakarta.ws.rs-api
+${jakarta.ws.rs-api.version}
+  
+  
+jakarta.annotation
+jakarta.annotation-api
+${jakarta.annotation-api.version}
+  
+
+  
+
   
 
   
@@ -97,13 +142,13 @@
   
com.google.inject.extensions:guice-multibindings
   com.google.code.gson:gson
   org.apache.httpcomponents:*
-  javax.ws.rs:*
   com.jamesmurty.utils:*
   net.iharder:*
   aopalliance:*
-  javax.inject:*
-  javax.annotation:*
   com.google.errorprone:*
+  jakarta.inject:jakarta.inject-api
+  jakarta.annotation:jakarta.annotation-api
+  jakarta.ws.rs:jakarta.ws.rs-api
 
   
 
@@ -112,10 +157,6 @@
   com.google
   
org.apache.pulsar.jcloud.shade.com.google
 
-
-  javax.ws
-  
org.apache.pulsar.jcloud.shade.javax.ws
-
 
   com.jamesmurty.utils
   
org.apache.pulsar.jcloud.shade.com.jamesmurty.utils
@@ -129,18 +170,17 @@
   
org.apache.pulsar.jcloud.shade.net.iharder
 
 
-  javax.inject
-  
org.apache.pulsar.jcloud.shade.javax.inject
+  com.google.errorprone
+  
org.apache.pulsar.jcloud.shade.com.google.errorprone
 
 
-  javax.annotation
-  
org.apache.pulsar.jcloud.shade.javax.annotation
+  jakarta
+  
org.apache.pulsar.jcloud.shade.jakarta
 
 
-  com.google.errorprone
-  
org.apache.pulsar.jcloud.shade.com.google.errorprone
+  org.aopalliance
+  
org.apache.pulsar.jcloud.shade.org.aopalliance
 
-
   
   
 
diff --git a/pom.xml b/pom.xml
index bd41ebbbed9..8aa8bf36c98 100644
--- a/pom.xml
+++ b/pom.xml
@@ -185,7 +185,7 @@ flexible messaging model and an intuitive client 
API.
 1.12.262
 1.11.3
 2.10.10
-2.5.0
+2.6.0
 5.1.0
 3.42.0.0
 8.0.11



(pulsar) 02/02: [fix][offload] Increase file upload limit from 2048MiB to 4096MiB for GCP/GCS offloading (#22554)

2024-04-22 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 1c8fd73b9c2c72cd91297723d6681dcaff82d8e7
Author: Lari Hotari 
AuthorDate: Mon Apr 22 18:13:45 2024 +0300

[fix][offload] Increase file upload limit from 2048MiB to 4096MiB for 
GCP/GCS offloading (#22554)

(cherry picked from commit e81f37000ec212676c5daffa17faad8fc604ff77)
---
 conf/broker.conf  | 11 ++-
 .../pulsar/common/policies/data/OffloadPoliciesImpl.java  |  7 ---
 2 files changed, 10 insertions(+), 8 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 2dbcff12b16..74be6803fe2 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -1641,10 +1641,10 @@ s3ManagedLedgerOffloadBucket=
 # For Amazon S3 ledger offload, Alternative endpoint to connect to (useful for 
testing)
 s3ManagedLedgerOffloadServiceEndpoint=
 
-# For Amazon S3 ledger offload, Max block size in bytes. (64MB by default, 5MB 
minimum)
+# For Amazon S3 ledger offload, Max block size in bytes. (64MiB by default, 
5MiB minimum)
 s3ManagedLedgerOffloadMaxBlockSizeInBytes=67108864
 
-# For Amazon S3 ledger offload, Read buffer size in bytes (1MB by default)
+# For Amazon S3 ledger offload, Read buffer size in bytes (1MiB by default)
 s3ManagedLedgerOffloadReadBufferSizeInBytes=1048576
 
 # For Google Cloud Storage ledger offload, region where offload bucket is 
located.
@@ -1654,10 +1654,11 @@ gcsManagedLedgerOffloadRegion=
 # For Google Cloud Storage ledger offload, Bucket to place offloaded ledger 
into
 gcsManagedLedgerOffloadBucket=
 
-# For Google Cloud Storage ledger offload, Max block size in bytes. (64MB by 
default, 5MB minimum)
-gcsManagedLedgerOffloadMaxBlockSizeInBytes=67108864
+# For Google Cloud Storage ledger offload, Max block size in bytes. (128MiB by 
default, 5MiB minimum)
+# Since JClouds limits the maximum number of blocks to 32, the maximum size of 
a ledger is 32 times the block size.
+gcsManagedLedgerOffloadMaxBlockSizeInBytes=134217728
 
-# For Google Cloud Storage ledger offload, Read buffer size in bytes (1MB by 
default)
+# For Google Cloud Storage ledger offload, Read buffer size in bytes (1MiB by 
default)
 gcsManagedLedgerOffloadReadBufferSizeInBytes=1048576
 
 # For Google Cloud Storage, path to json file containing service account 
credentials.
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java
index 51e181811c2..6c40aa3f2ed 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java
@@ -79,8 +79,9 @@ public class OffloadPoliciesImpl implements Serializable, 
OffloadPolicies {
 }
 }
 
-public static final int DEFAULT_MAX_BLOCK_SIZE_IN_BYTES = 64 * 1024 * 
1024;   // 64MB
-public static final int DEFAULT_READ_BUFFER_SIZE_IN_BYTES = 1024 * 1024;   
   // 1MB
+public static final int DEFAULT_MAX_BLOCK_SIZE_IN_BYTES = 64 * 1024 * 
1024;   // 64MiB
+public static final int DEFAULT_GCS_MAX_BLOCK_SIZE_IN_BYTES = 128 * 1024 * 
1024;   // 128MiB
+public static final int DEFAULT_READ_BUFFER_SIZE_IN_BYTES = 1024 * 1024;   
   // 1MiB
 public static final int DEFAULT_OFFLOAD_MAX_THREADS = 2;
 public static final int DEFAULT_OFFLOAD_MAX_PREFETCH_ROUNDS = 1;
 public static final String DEFAULT_OFFLOADER_DIRECTORY = "./offloaders";
@@ -163,7 +164,7 @@ public class OffloadPoliciesImpl implements Serializable, 
OffloadPolicies {
 private String gcsManagedLedgerOffloadBucket = null;
 @Configuration
 @JsonProperty(access = JsonProperty.Access.READ_WRITE)
-private Integer gcsManagedLedgerOffloadMaxBlockSizeInBytes = 
DEFAULT_MAX_BLOCK_SIZE_IN_BYTES;
+private Integer gcsManagedLedgerOffloadMaxBlockSizeInBytes = 
DEFAULT_GCS_MAX_BLOCK_SIZE_IN_BYTES;
 @Configuration
 @JsonProperty(access = JsonProperty.Access.READ_WRITE)
 private Integer gcsManagedLedgerOffloadReadBufferSizeInBytes = 
DEFAULT_READ_BUFFER_SIZE_IN_BYTES;



(pulsar) 01/02: [fix][broker] upgrade jclouds 2.5.0 -> 2.6.0 (#22220)

2024-04-22 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 48ff241a45d795d9dbd64aa9377c42f67219ecaa
Author: Paul Gier 
AuthorDate: Fri Mar 15 09:46:33 2024 -0500

[fix][broker] upgrade jclouds 2.5.0 -> 2.6.0 (#22220)

Co-authored-by: 道君 
Co-authored-by: Lari Hotari 
(cherry picked from commit 73dc213d4cec3513a1addbcb3518f441093c57ec)
---
 jclouds-shaded/pom.xml | 78 ++
 pom.xml|  2 +-
 2 files changed, 60 insertions(+), 20 deletions(-)

diff --git a/jclouds-shaded/pom.xml b/jclouds-shaded/pom.xml
index 8ee11030b6f..d39071bb6a5 100644
--- a/jclouds-shaded/pom.xml
+++ b/jclouds-shaded/pom.xml
@@ -33,8 +33,17 @@
   jclouds-shaded
   Apache Pulsar :: Jclouds shaded
 
+  
+
+2.10.1
+32.0.0-jre
+7.0.0
+2.0.1
+3.0.0
+2.0.0
+  
   
-
 
   org.apache.jclouds
   jclouds-allblobstore
@@ -61,12 +70,48 @@
   jclouds-slf4j
   ${jclouds.version}
 
-
-  javax.annotation
-  javax.annotation-api
-
   
 
+  
+
+  
+com.google.code.gson
+gson
+${gson.version}
+  
+  
+com.google.guava
+guava
+${guava.version}
+  
+  
+com.google.inject
+guice
+${guice.version}
+  
+  
+com.google.inject.extensions
+guice-assistedinject
+${guice.version}
+  
+  
+jakarta.inject
+jakarta.inject-api
+${jakarta.inject.api.version}
+  
+  
+jakarta.ws.rs
+jakarta.ws.rs-api
+${jakarta.ws.rs-api.version}
+  
+  
+jakarta.annotation
+jakarta.annotation-api
+${jakarta.annotation-api.version}
+  
+
+  
+
   
 
   
@@ -97,13 +142,13 @@
   
com.google.inject.extensions:guice-multibindings
   com.google.code.gson:gson
   org.apache.httpcomponents:*
-  javax.ws.rs:*
   com.jamesmurty.utils:*
   net.iharder:*
   aopalliance:*
-  javax.inject:*
-  javax.annotation:*
   com.google.errorprone:*
+  jakarta.inject:jakarta.inject-api
+  jakarta.annotation:jakarta.annotation-api
+  jakarta.ws.rs:jakarta.ws.rs-api
 
   
 
@@ -112,10 +157,6 @@
   com.google
   
org.apache.pulsar.jcloud.shade.com.google
 
-
-  javax.ws
-  
org.apache.pulsar.jcloud.shade.javax.ws
-
 
   com.jamesmurty.utils
   
org.apache.pulsar.jcloud.shade.com.jamesmurty.utils
@@ -129,18 +170,17 @@
   
org.apache.pulsar.jcloud.shade.net.iharder
 
 
-  javax.inject
-  
org.apache.pulsar.jcloud.shade.javax.inject
+  com.google.errorprone
+  
org.apache.pulsar.jcloud.shade.com.google.errorprone
 
 
-  javax.annotation
-  
org.apache.pulsar.jcloud.shade.javax.annotation
+  jakarta
+  
org.apache.pulsar.jcloud.shade.jakarta
 
 
-  com.google.errorprone
-  
org.apache.pulsar.jcloud.shade.com.google.errorprone
+  org.aopalliance
+  
org.apache.pulsar.jcloud.shade.org.aopalliance
 
-
   
   
 
diff --git a/pom.xml b/pom.xml
index 8a0ed47fb8d..b4b68b99031 100644
--- a/pom.xml
+++ b/pom.xml
@@ -184,7 +184,7 @@ flexible messaging model and an intuitive client 
API.
 1.12.262
 1.11.3
 2.10.10
-2.5.0
+2.6.0
 5.1.0
 3.42.0.0
 8.0.11



(pulsar) branch branch-3.1 updated (6be74f1adaf -> 1c8fd73b9c2)

2024-04-22 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a change to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git


from 6be74f1adaf [fix][ml] Fix NPE of getValidPositionAfterSkippedEntries 
when recovering a terminated managed ledger (#22552)
 new 48ff241a45d [fix][broker] upgrade jclouds 2.5.0 -> 2.6.0 (#0)
 new 1c8fd73b9c2 [fix][offload] Increase file upload limit from 2048MiB to 
4096MiB for GCP/GCS offloading (#22554)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 conf/broker.conf   | 11 +--
 jclouds-shaded/pom.xml | 78 --
 pom.xml|  2 +-
 .../common/policies/data/OffloadPoliciesImpl.java  |  7 +-
 4 files changed, 70 insertions(+), 28 deletions(-)



(pulsar) branch branch-3.0 updated (def695b18b2 -> 5c09c20b33c)

2024-04-22 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a change to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


from def695b18b2 [fix][ml] Fix NPE of getValidPositionAfterSkippedEntries 
when recovering a terminated managed ledger (#22552)
 new fc136e8fe11 [fix][broker] upgrade jclouds 2.5.0 -> 2.6.0 (#0)
 new 5c09c20b33c [fix][offload] Increase file upload limit from 2048MiB to 
4096MiB for GCP/GCS offloading (#22554)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 conf/broker.conf   | 11 +--
 jclouds-shaded/pom.xml | 78 --
 pom.xml|  2 +-
 .../common/policies/data/OffloadPoliciesImpl.java  |  7 +-
 4 files changed, 70 insertions(+), 28 deletions(-)



(pulsar) 01/02: [fix][broker] upgrade jclouds 2.5.0 -> 2.6.0 (#22220)

2024-04-22 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit fc136e8fe11e3a16477ca92784debe592c1d8568
Author: Paul Gier 
AuthorDate: Fri Mar 15 09:46:33 2024 -0500

[fix][broker] upgrade jclouds 2.5.0 -> 2.6.0 (#0)

Co-authored-by: 道君 
Co-authored-by: Lari Hotari 
(cherry picked from commit 73dc213d4cec3513a1addbcb3518f441093c57ec)
---
 jclouds-shaded/pom.xml | 78 ++
 pom.xml|  2 +-
 2 files changed, 60 insertions(+), 20 deletions(-)

diff --git a/jclouds-shaded/pom.xml b/jclouds-shaded/pom.xml
index 2bf295e6261..58d3da73a1e 100644
--- a/jclouds-shaded/pom.xml
+++ b/jclouds-shaded/pom.xml
@@ -33,8 +33,17 @@
   jclouds-shaded
   Apache Pulsar :: Jclouds shaded
 
+  
+
+2.10.1
+32.0.0-jre
+7.0.0
+2.0.1
+3.0.0
+2.0.0
+  
   
-
 
   org.apache.jclouds
   jclouds-allblobstore
@@ -61,12 +70,48 @@
   jclouds-slf4j
   ${jclouds.version}
 
-
-  javax.annotation
-  javax.annotation-api
-
   
 
+  
+
+  
+com.google.code.gson
+gson
+${gson.version}
+  
+  
+com.google.guava
+guava
+${guava.version}
+  
+  
+com.google.inject
+guice
+${guice.version}
+  
+  
+com.google.inject.extensions
+guice-assistedinject
+${guice.version}
+  
+  
+jakarta.inject
+jakarta.inject-api
+${jakarta.inject.api.version}
+  
+  
+jakarta.ws.rs
+jakarta.ws.rs-api
+${jakarta.ws.rs-api.version}
+  
+  
+jakarta.annotation
+jakarta.annotation-api
+${jakarta.annotation-api.version}
+  
+
+  
+
   
 
   
@@ -97,13 +142,13 @@
   
com.google.inject.extensions:guice-multibindings
   com.google.code.gson:gson
   org.apache.httpcomponents:*
-  javax.ws.rs:*
   com.jamesmurty.utils:*
   net.iharder:*
   aopalliance:*
-  javax.inject:*
-  javax.annotation:*
   com.google.errorprone:*
+  jakarta.inject:jakarta.inject-api
+  jakarta.annotation:jakarta.annotation-api
+  jakarta.ws.rs:jakarta.ws.rs-api
 
   
 
@@ -112,10 +157,6 @@
   com.google
   
org.apache.pulsar.jcloud.shade.com.google
 
-
-  javax.ws
-  
org.apache.pulsar.jcloud.shade.javax.ws
-
 
   com.jamesmurty.utils
   
org.apache.pulsar.jcloud.shade.com.jamesmurty.utils
@@ -129,18 +170,17 @@
   
org.apache.pulsar.jcloud.shade.net.iharder
 
 
-  javax.inject
-  
org.apache.pulsar.jcloud.shade.javax.inject
+  com.google.errorprone
+  
org.apache.pulsar.jcloud.shade.com.google.errorprone
 
 
-  javax.annotation
-  
org.apache.pulsar.jcloud.shade.javax.annotation
+  jakarta
+  
org.apache.pulsar.jcloud.shade.jakarta
 
 
-  com.google.errorprone
-  
org.apache.pulsar.jcloud.shade.com.google.errorprone
+  org.aopalliance
+  
org.apache.pulsar.jcloud.shade.org.aopalliance
 
-
   
   
 
diff --git a/pom.xml b/pom.xml
index ab609f9feee..fbae234e855 100644
--- a/pom.xml
+++ b/pom.xml
@@ -184,7 +184,7 @@ flexible messaging model and an intuitive client 
API.
 1.12.262
 1.11.3
 2.10.10
-2.5.0
+2.6.0
 5.1.0
 3.42.0.0
 8.0.11



(pulsar) 02/02: [fix][offload] Increase file upload limit from 2048MiB to 4096MiB for GCP/GCS offloading (#22554)

2024-04-22 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 5c09c20b33c8e9563df82b4f510fe1d7b519f33c
Author: Lari Hotari 
AuthorDate: Mon Apr 22 18:13:45 2024 +0300

[fix][offload] Increase file upload limit from 2048MiB to 4096MiB for 
GCP/GCS offloading (#22554)

(cherry picked from commit e81f37000ec212676c5daffa17faad8fc604ff77)
---
 conf/broker.conf  | 11 ++-
 .../pulsar/common/policies/data/OffloadPoliciesImpl.java  |  7 ---
 2 files changed, 10 insertions(+), 8 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index ec0974dca20..34f7ab017e9 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -1657,10 +1657,10 @@ s3ManagedLedgerOffloadBucket=
 # For Amazon S3 ledger offload, Alternative endpoint to connect to (useful for 
testing)
 s3ManagedLedgerOffloadServiceEndpoint=
 
-# For Amazon S3 ledger offload, Max block size in bytes. (64MB by default, 5MB 
minimum)
+# For Amazon S3 ledger offload, Max block size in bytes. (64MiB by default, 
5MiB minimum)
 s3ManagedLedgerOffloadMaxBlockSizeInBytes=67108864
 
-# For Amazon S3 ledger offload, Read buffer size in bytes (1MB by default)
+# For Amazon S3 ledger offload, Read buffer size in bytes (1MiB by default)
 s3ManagedLedgerOffloadReadBufferSizeInBytes=1048576
 
 # For Google Cloud Storage ledger offload, region where offload bucket is 
located.
@@ -1670,10 +1670,11 @@ gcsManagedLedgerOffloadRegion=
 # For Google Cloud Storage ledger offload, Bucket to place offloaded ledger 
into
 gcsManagedLedgerOffloadBucket=
 
-# For Google Cloud Storage ledger offload, Max block size in bytes. (64MB by 
default, 5MB minimum)
-gcsManagedLedgerOffloadMaxBlockSizeInBytes=67108864
+# For Google Cloud Storage ledger offload, Max block size in bytes. (128MiB by 
default, 5MiB minimum)
+# Since JClouds limits the maximum number of blocks to 32, the maximum size of 
a ledger is 32 times the block size.
+gcsManagedLedgerOffloadMaxBlockSizeInBytes=134217728
 
-# For Google Cloud Storage ledger offload, Read buffer size in bytes (1MB by 
default)
+# For Google Cloud Storage ledger offload, Read buffer size in bytes (1MiB by 
default)
 gcsManagedLedgerOffloadReadBufferSizeInBytes=1048576
 
 # For Google Cloud Storage, path to json file containing service account 
credentials.
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java
index 51e181811c2..6c40aa3f2ed 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java
@@ -79,8 +79,9 @@ public class OffloadPoliciesImpl implements Serializable, 
OffloadPolicies {
 }
 }
 
-public static final int DEFAULT_MAX_BLOCK_SIZE_IN_BYTES = 64 * 1024 * 
1024;   // 64MB
-public static final int DEFAULT_READ_BUFFER_SIZE_IN_BYTES = 1024 * 1024;   
   // 1MB
+public static final int DEFAULT_MAX_BLOCK_SIZE_IN_BYTES = 64 * 1024 * 
1024;   // 64MiB
+public static final int DEFAULT_GCS_MAX_BLOCK_SIZE_IN_BYTES = 128 * 1024 * 
1024;   // 128MiB
+public static final int DEFAULT_READ_BUFFER_SIZE_IN_BYTES = 1024 * 1024;   
   // 1MiB
 public static final int DEFAULT_OFFLOAD_MAX_THREADS = 2;
 public static final int DEFAULT_OFFLOAD_MAX_PREFETCH_ROUNDS = 1;
 public static final String DEFAULT_OFFLOADER_DIRECTORY = "./offloaders";
@@ -163,7 +164,7 @@ public class OffloadPoliciesImpl implements Serializable, 
OffloadPolicies {
 private String gcsManagedLedgerOffloadBucket = null;
 @Configuration
 @JsonProperty(access = JsonProperty.Access.READ_WRITE)
-private Integer gcsManagedLedgerOffloadMaxBlockSizeInBytes = 
DEFAULT_MAX_BLOCK_SIZE_IN_BYTES;
+private Integer gcsManagedLedgerOffloadMaxBlockSizeInBytes = 
DEFAULT_GCS_MAX_BLOCK_SIZE_IN_BYTES;
 @Configuration
 @JsonProperty(access = JsonProperty.Access.READ_WRITE)
 private Integer gcsManagedLedgerOffloadReadBufferSizeInBytes = 
DEFAULT_READ_BUFFER_SIZE_IN_BYTES;



(pulsar) branch branch-3.2 updated: [fix][ml] Fix NPE of getValidPositionAfterSkippedEntries when recovering a terminated managed ledger (#22552)

2024-04-22 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new a3122b106fc [fix][ml] Fix NPE of getValidPositionAfterSkippedEntries 
when recovering a terminated managed ledger (#22552)
a3122b106fc is described below

commit a3122b106fc227be218af9b611626fcbfb01a5e4
Author: Cong Zhao 
AuthorDate: Tue Apr 23 00:05:41 2024 +0800

[fix][ml] Fix NPE of getValidPositionAfterSkippedEntries when recovering a 
terminated managed ledger (#22552)

(cherry picked from commit 35599b7325347838203a92ca63b78d134b7864c2)
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  2 +-
 .../bookkeeper/mledger/impl/ManagedCursorTest.java | 61 ++
 2 files changed, 62 insertions(+), 1 deletion(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 1274b347263..bd74629e605 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -3700,7 +3700,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
 Long nextLedgerId = 
ledgers.ceilingKey(skippedPosition.getLedgerId() + 1);
 // This means it has jumped to the last position
 if (nextLedgerId == null) {
-if (currentLedgerEntries == 0) {
+if (currentLedgerEntries == 0 && currentLedger != null) {
 return PositionImpl.get(currentLedger.getId(), 0);
 }
 return lastConfirmedEntry.getNext();
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index c9bd64171c1..4e3f8b79084 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -4695,5 +4695,66 @@ public class ManagedCursorTest extends 
MockedBookKeeperTestCase {
 && cursorReadPosition.getEntryId() == 
expectReadPosition.getEntryId());
 }
 
+@Test
+public void testRecoverCursorWithTerminateManagedLedger() throws Exception 
{
+String mlName = "my_test_ledger";
+String cursorName = "c1";
+
+ManagedLedgerConfig config = new ManagedLedgerConfig();
+ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open(mlName, 
config);
+ManagedCursorImpl c1 = (ManagedCursorImpl) 
ledger.openCursor(cursorName);
+
+// Write some data.
+Position p0 = ledger.addEntry("entry-0".getBytes());
+Position p1 = ledger.addEntry("entry-1".getBytes());
+
+// Read message.
+List<Entry> entries = c1.readEntries(2);
+assertEquals(entries.size(), 2);
+assertEquals(entries.get(0).getPosition(), p0);
+assertEquals(entries.get(1).getPosition(), p1);
+entries.forEach(Entry::release);
+
+// Mark delete the last message.
+c1.markDelete(p1);
+Position markDeletedPosition = c1.getMarkDeletedPosition();
+Assert.assertEquals(markDeletedPosition, p1);
+
+// Terminate the managed ledger.
+Position lastPosition = ledger.terminate();
+assertEquals(lastPosition, p1);
+
+// Close the ledger.
+ledger.close();
+
+// Reopen the ledger.
+ledger = (ManagedLedgerImpl) factory.open(mlName, config);
+BookKeeper mockBookKeeper = mock(BookKeeper.class);
+final ManagedCursorImpl cursor = new ManagedCursorImpl(mockBookKeeper, 
new ManagedLedgerConfig(), ledger,
+cursorName);
+
+CompletableFuture<Void> recoverFuture = new CompletableFuture<>();
+// Recover the cursor.
+cursor.recover(new VoidCallback() {
+@Override
+public void operationComplete() {
+recoverFuture.complete(null);
+}
+
+@Override
+public void operationFailed(ManagedLedgerException exception) {
+recoverFuture.completeExceptionally(exception);
+}
+});
+
+recoverFuture.join();
+assertTrue(recoverFuture.isDone());
+assertFalse(recoverFuture.isCompletedExceptionally());
+
+// Verify the cursor state.
+assertEquals(cursor.getMarkDeletedPosition(), markDeletedPosition);
+assertEquals(cursor.getReadPosition(), markDeletedPosition.getNext());
+}
+
 private static final Logger log = 
LoggerFactory.getLogger(ManagedCursorTest.class);
 }



(pulsar) branch branch-3.0 updated: [fix][ml] Fix NPE of getValidPositionAfterSkippedEntries when recovering a terminated managed ledger (#22552)

2024-04-22 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new def695b18b2 [fix][ml] Fix NPE of getValidPositionAfterSkippedEntries 
when recovering a terminated managed ledger (#22552)
def695b18b2 is described below

commit def695b18b270faf4e159b060ee75dbc4e699744
Author: Cong Zhao 
AuthorDate: Tue Apr 23 00:05:41 2024 +0800

[fix][ml] Fix NPE of getValidPositionAfterSkippedEntries when recovering a 
terminated managed ledger (#22552)

(cherry picked from commit 35599b7325347838203a92ca63b78d134b7864c2)
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  2 +-
 .../bookkeeper/mledger/impl/ManagedCursorTest.java | 61 ++
 2 files changed, 62 insertions(+), 1 deletion(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index d9891721383..8415fdcede1 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -3701,7 +3701,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
 Long nextLedgerId = 
ledgers.ceilingKey(skippedPosition.getLedgerId() + 1);
 // This means it has jumped to the last position
 if (nextLedgerId == null) {
-if (currentLedgerEntries == 0) {
+if (currentLedgerEntries == 0 && currentLedger != null) {
 return PositionImpl.get(currentLedger.getId(), 0);
 }
 return lastConfirmedEntry.getNext();
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index c9bd64171c1..4e3f8b79084 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -4695,5 +4695,66 @@ public class ManagedCursorTest extends 
MockedBookKeeperTestCase {
 && cursorReadPosition.getEntryId() == 
expectReadPosition.getEntryId());
 }
 
+@Test
+public void testRecoverCursorWithTerminateManagedLedger() throws Exception 
{
+String mlName = "my_test_ledger";
+String cursorName = "c1";
+
+ManagedLedgerConfig config = new ManagedLedgerConfig();
+ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open(mlName, 
config);
+ManagedCursorImpl c1 = (ManagedCursorImpl) 
ledger.openCursor(cursorName);
+
+// Write some data.
+Position p0 = ledger.addEntry("entry-0".getBytes());
+Position p1 = ledger.addEntry("entry-1".getBytes());
+
+// Read message.
+List<Entry> entries = c1.readEntries(2);
+assertEquals(entries.size(), 2);
+assertEquals(entries.get(0).getPosition(), p0);
+assertEquals(entries.get(1).getPosition(), p1);
+entries.forEach(Entry::release);
+
+// Mark delete the last message.
+c1.markDelete(p1);
+Position markDeletedPosition = c1.getMarkDeletedPosition();
+Assert.assertEquals(markDeletedPosition, p1);
+
+// Terminate the managed ledger.
+Position lastPosition = ledger.terminate();
+assertEquals(lastPosition, p1);
+
+// Close the ledger.
+ledger.close();
+
+// Reopen the ledger.
+ledger = (ManagedLedgerImpl) factory.open(mlName, config);
+BookKeeper mockBookKeeper = mock(BookKeeper.class);
+final ManagedCursorImpl cursor = new ManagedCursorImpl(mockBookKeeper, 
new ManagedLedgerConfig(), ledger,
+cursorName);
+
+CompletableFuture<Void> recoverFuture = new CompletableFuture<>();
+// Recover the cursor.
+cursor.recover(new VoidCallback() {
+@Override
+public void operationComplete() {
+recoverFuture.complete(null);
+}
+
+@Override
+public void operationFailed(ManagedLedgerException exception) {
+recoverFuture.completeExceptionally(exception);
+}
+});
+
+recoverFuture.join();
+assertTrue(recoverFuture.isDone());
+assertFalse(recoverFuture.isCompletedExceptionally());
+
+// Verify the cursor state.
+assertEquals(cursor.getMarkDeletedPosition(), markDeletedPosition);
+assertEquals(cursor.getReadPosition(), markDeletedPosition.getNext());
+}
+
 private static final Logger log = 
LoggerFactory.getLogger(ManagedCursorTest.class);
 }



(pulsar) branch branch-3.1 updated: [fix][ml] Fix NPE of getValidPositionAfterSkippedEntries when recovering a terminated managed ledger (#22552)

2024-04-22 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
 new 6be74f1adaf [fix][ml] Fix NPE of getValidPositionAfterSkippedEntries 
when recovering a terminated managed ledger (#22552)
6be74f1adaf is described below

commit 6be74f1adaf1b6d75a5072ea764cf376fdb02694
Author: Cong Zhao 
AuthorDate: Tue Apr 23 00:05:41 2024 +0800

[fix][ml] Fix NPE of getValidPositionAfterSkippedEntries when recovering a 
terminated managed ledger (#22552)

(cherry picked from commit 35599b7325347838203a92ca63b78d134b7864c2)
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  2 +-
 .../bookkeeper/mledger/impl/ManagedCursorTest.java | 61 ++
 2 files changed, 62 insertions(+), 1 deletion(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 7a9605830de..d36b85aa10a 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -3646,7 +3646,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
 Long nextLedgerId = 
ledgers.ceilingKey(skippedPosition.getLedgerId() + 1);
 // This means it has jumped to the last position
 if (nextLedgerId == null) {
-if (currentLedgerEntries == 0) {
+if (currentLedgerEntries == 0 && currentLedger != null) {
 return PositionImpl.get(currentLedger.getId(), 0);
 }
 return lastConfirmedEntry.getNext();
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index c9bd64171c1..4e3f8b79084 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -4695,5 +4695,66 @@ public class ManagedCursorTest extends 
MockedBookKeeperTestCase {
 && cursorReadPosition.getEntryId() == 
expectReadPosition.getEntryId());
 }
 
+@Test
+public void testRecoverCursorWithTerminateManagedLedger() throws Exception 
{
+String mlName = "my_test_ledger";
+String cursorName = "c1";
+
+ManagedLedgerConfig config = new ManagedLedgerConfig();
+ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open(mlName, 
config);
+ManagedCursorImpl c1 = (ManagedCursorImpl) 
ledger.openCursor(cursorName);
+
+// Write some data.
+Position p0 = ledger.addEntry("entry-0".getBytes());
+Position p1 = ledger.addEntry("entry-1".getBytes());
+
+// Read message.
+List<Entry> entries = c1.readEntries(2);
+assertEquals(entries.size(), 2);
+assertEquals(entries.get(0).getPosition(), p0);
+assertEquals(entries.get(1).getPosition(), p1);
+entries.forEach(Entry::release);
+
+// Mark delete the last message.
+c1.markDelete(p1);
+Position markDeletedPosition = c1.getMarkDeletedPosition();
+Assert.assertEquals(markDeletedPosition, p1);
+
+// Terminate the managed ledger.
+Position lastPosition = ledger.terminate();
+assertEquals(lastPosition, p1);
+
+// Close the ledger.
+ledger.close();
+
+// Reopen the ledger.
+ledger = (ManagedLedgerImpl) factory.open(mlName, config);
+BookKeeper mockBookKeeper = mock(BookKeeper.class);
+final ManagedCursorImpl cursor = new ManagedCursorImpl(mockBookKeeper, 
new ManagedLedgerConfig(), ledger,
+cursorName);
+
+CompletableFuture<Void> recoverFuture = new CompletableFuture<>();
+// Recover the cursor.
+cursor.recover(new VoidCallback() {
+@Override
+public void operationComplete() {
+recoverFuture.complete(null);
+}
+
+@Override
+public void operationFailed(ManagedLedgerException exception) {
+recoverFuture.completeExceptionally(exception);
+}
+});
+
+recoverFuture.join();
+assertTrue(recoverFuture.isDone());
+assertFalse(recoverFuture.isCompletedExceptionally());
+
+// Verify the cursor state.
+assertEquals(cursor.getMarkDeletedPosition(), markDeletedPosition);
+assertEquals(cursor.getReadPosition(), markDeletedPosition.getNext());
+}
+
 private static final Logger log = 
LoggerFactory.getLogger(ManagedCursorTest.class);
 }



Re: [PR] [fix][broker] Create new ledger after the current ledger is closed [pulsar]

2024-04-22 Thread via GitHub


lhotari commented on PR #22034:
URL: https://github.com/apache/pulsar/pull/22034#issuecomment-2070058523

   When cherry-picking, it's important to also pick #22552
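
For context on why #22552 matters here, the following is a minimal sketch of the guard that its diff adds (`currentLedgerEntries == 0 && currentLedger != null`). It is not the real `ManagedLedgerImpl` code: the `Position` class, the `positionAfterLastEntry` helper, and the use of a nullable `Long` in place of the ledger handle are hypothetical simplifications, and it assumes that a terminated managed ledger can be recovered with no current ledger open.

```java
public class TerminatedLedgerGuard {
    // Hypothetical, simplified stand-in for the managed-ledger position type.
    static final class Position {
        final long ledgerId;
        final long entryId;

        Position(long ledgerId, long entryId) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }

        Position next() {
            return new Position(ledgerId, entryId + 1);
        }

        @Override
        public String toString() {
            return ledgerId + ":" + entryId;
        }
    }

    // currentLedgerId is null when no ledger is open (assumed here to model
    // a terminated managed ledger). Without the null check, the first branch
    // would dereference null and throw a NullPointerException.
    static Position positionAfterLastEntry(Long currentLedgerId,
                                           long currentLedgerEntries,
                                           Position lastConfirmedEntry) {
        if (currentLedgerEntries == 0 && currentLedgerId != null) {
            // An empty ledger is open: the next valid position is its first entry.
            return new Position(currentLedgerId, 0);
        }
        // Otherwise fall back to the position after the last confirmed entry.
        return lastConfirmedEntry.next();
    }

    public static void main(String[] args) {
        Position last = new Position(3, 1);
        System.out.println(positionAfterLastEntry(4L, 0, last));   // prints 4:0
        System.out.println(positionAfterLastEntry(null, 0, last)); // prints 3:2
    }
}
```

In this sketch the second call is the terminated-ledger case: with the guard in place it falls through to the position after the last confirmed entry instead of failing.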


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



(pulsar-client-go) branch dependabot/go_modules/golang.org/x/net-0.23.0 deleted (was 43818365)

2024-04-22 Thread github-bot
This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a change to branch 
dependabot/go_modules/golang.org/x/net-0.23.0
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


 was 43818365 chore(deps): bump golang.org/x/net from 0.17.0 to 0.23.0

The revisions that were on this branch are still contained in
other references; therefore, this change does not discard any commits
from the repository.



(pulsar-client-go) branch master updated (b4d45cd3 -> 458defe3)

2024-04-22 Thread mmerli
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


from b4d45cd3 [Improve] Add admin api GetLeaderBroker (#1203)
 add 458defe3 chore(deps): bump golang.org/x/net from 0.17.0 to 0.23.0 
(#1209)

No new revisions were added by this update.

Summary of changes:
 go.mod |  8 
 go.sum | 16 
 2 files changed, 12 insertions(+), 12 deletions(-)



Re: [PR] chore(deps): bump golang.org/x/net from 0.17.0 to 0.23.0 [pulsar-client-go]

2024-04-22 Thread via GitHub


merlimat merged PR #1209:
URL: https://github.com/apache/pulsar-client-go/pull/1209





(pulsar) branch master updated (e81f37000ec -> 35599b73253)

2024-04-22 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


from e81f37000ec [fix][offload] Increase file upload limit from 2048MiB to 
4096MiB for GCP/GCS offloading (#22554)
 add 35599b73253 [fix][ml] Fix NPE of getValidPositionAfterSkippedEntries 
when recovering a terminated managed ledger (#22552)

No new revisions were added by this update.

Summary of changes:
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  2 +-
 .../bookkeeper/mledger/impl/ManagedCursorTest.java | 61 ++
 2 files changed, 62 insertions(+), 1 deletion(-)



Re: [PR] [fix][ml] Fix NPE of getValidPositionAfterSkippedEntries when recovering a terminated managed ledger [pulsar]

2024-04-22 Thread via GitHub


lhotari merged PR #22552:
URL: https://github.com/apache/pulsar/pull/22552





(pulsar) branch master updated: [fix][offload] Increase file upload limit from 2048MiB to 4096MiB for GCP/GCS offloading (#22554)

2024-04-22 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new e81f37000ec [fix][offload] Increase file upload limit from 2048MiB to 
4096MiB for GCP/GCS offloading (#22554)
e81f37000ec is described below

commit e81f37000ec212676c5daffa17faad8fc604ff77
Author: Lari Hotari 
AuthorDate: Mon Apr 22 18:13:45 2024 +0300

[fix][offload] Increase file upload limit from 2048MiB to 4096MiB for 
GCP/GCS offloading (#22554)
---
 conf/broker.conf  | 11 ++-
 .../pulsar/common/policies/data/OffloadPoliciesImpl.java  |  7 ---
 2 files changed, 10 insertions(+), 8 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index d482f77da7c..d97e3a5ef89 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -1670,10 +1670,10 @@ s3ManagedLedgerOffloadBucket=
 # For Amazon S3 ledger offload, Alternative endpoint to connect to (useful for 
testing)
 s3ManagedLedgerOffloadServiceEndpoint=
 
-# For Amazon S3 ledger offload, Max block size in bytes. (64MB by default, 5MB 
minimum)
+# For Amazon S3 ledger offload, Max block size in bytes. (64MiB by default, 
5MiB minimum)
 s3ManagedLedgerOffloadMaxBlockSizeInBytes=67108864
 
-# For Amazon S3 ledger offload, Read buffer size in bytes (1MB by default)
+# For Amazon S3 ledger offload, Read buffer size in bytes (1MiB by default)
 s3ManagedLedgerOffloadReadBufferSizeInBytes=1048576
 
 # For Google Cloud Storage ledger offload, region where offload bucket is 
located.
@@ -1683,10 +1683,11 @@ gcsManagedLedgerOffloadRegion=
 # For Google Cloud Storage ledger offload, Bucket to place offloaded ledger 
into
 gcsManagedLedgerOffloadBucket=
 
-# For Google Cloud Storage ledger offload, Max block size in bytes. (64MB by 
default, 5MB minimum)
-gcsManagedLedgerOffloadMaxBlockSizeInBytes=67108864
+# For Google Cloud Storage ledger offload, Max block size in bytes. (128MiB by 
default, 5MiB minimum)
+# Since JClouds limits the maximum number of blocks to 32, the maximum size of 
a ledger is 32 times the block size.
+gcsManagedLedgerOffloadMaxBlockSizeInBytes=134217728
 
-# For Google Cloud Storage ledger offload, Read buffer size in bytes (1MB by 
default)
+# For Google Cloud Storage ledger offload, Read buffer size in bytes (1MiB by 
default)
 gcsManagedLedgerOffloadReadBufferSizeInBytes=1048576
 
 # For Google Cloud Storage, path to json file containing service account 
credentials.
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java
index 51e181811c2..6c40aa3f2ed 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java
@@ -79,8 +79,9 @@ public class OffloadPoliciesImpl implements Serializable, 
OffloadPolicies {
 }
 }
 
-public static final int DEFAULT_MAX_BLOCK_SIZE_IN_BYTES = 64 * 1024 * 
1024;   // 64MB
-public static final int DEFAULT_READ_BUFFER_SIZE_IN_BYTES = 1024 * 1024;   
   // 1MB
+public static final int DEFAULT_MAX_BLOCK_SIZE_IN_BYTES = 64 * 1024 * 
1024;   // 64MiB
+public static final int DEFAULT_GCS_MAX_BLOCK_SIZE_IN_BYTES = 128 * 1024 * 
1024;   // 128MiB
+public static final int DEFAULT_READ_BUFFER_SIZE_IN_BYTES = 1024 * 1024;   
   // 1MiB
 public static final int DEFAULT_OFFLOAD_MAX_THREADS = 2;
 public static final int DEFAULT_OFFLOAD_MAX_PREFETCH_ROUNDS = 1;
 public static final String DEFAULT_OFFLOADER_DIRECTORY = "./offloaders";
@@ -163,7 +164,7 @@ public class OffloadPoliciesImpl implements Serializable, 
OffloadPolicies {
 private String gcsManagedLedgerOffloadBucket = null;
 @Configuration
 @JsonProperty(access = JsonProperty.Access.READ_WRITE)
-private Integer gcsManagedLedgerOffloadMaxBlockSizeInBytes = 
DEFAULT_MAX_BLOCK_SIZE_IN_BYTES;
+private Integer gcsManagedLedgerOffloadMaxBlockSizeInBytes = 
DEFAULT_GCS_MAX_BLOCK_SIZE_IN_BYTES;
 @Configuration
 @JsonProperty(access = JsonProperty.Access.READ_WRITE)
 private Integer gcsManagedLedgerOffloadReadBufferSizeInBytes = 
DEFAULT_READ_BUFFER_SIZE_IN_BYTES;



Re: [I] Offloading large ledgers (>2GB) fail with Google Cloud Storage [pulsar]

2024-04-22 Thread via GitHub


lhotari closed issue #15159: Offloading large ledgers (>2GB) fail with Google 
Cloud Storage
URL: https://github.com/apache/pulsar/issues/15159


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [fix][offload] Increase file upload limit from 2048MiB to 4096MiB for GCP/GCS offloading [pulsar]

2024-04-22 Thread via GitHub


lhotari merged PR #22554:
URL: https://github.com/apache/pulsar/pull/22554





Re: [PR] [fix][broker] upgrade jclouds 2.5.0 -> 2.6.0 [pulsar]

2024-04-22 Thread via GitHub


nikhil-ctds commented on PR #0:
URL: https://github.com/apache/pulsar/pull/0#issuecomment-2069791969

   @lhotari I have tested with 
`gcsManagedLedgerOffloadMaxBlockSizeInBytes=134217728` and was able to offload 
a 3.5 GB ledger to a GCS bucket.
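
   The 2048MiB and 4096MiB figures in this thread follow from multiplying the 32-part limit reported in JCLOUDS-1606 by the configured block size. The Go sketch below only reproduces that arithmetic; the function names are illustrative and not part of Pulsar, and it assumes every part is exactly one full block, ignoring any per-block header overhead in the real offloader.

   ```go
   package main

   import "fmt"

   // maxComposeParts is the 32-part limit described in JCLOUDS-1606.
   const maxComposeParts = 32

   // maxOffloadBytes returns the largest object that fits into
   // maxComposeParts blocks of blockSizeBytes each (hypothetical helper).
   func maxOffloadBytes(blockSizeBytes int64) int64 {
   	return maxComposeParts * blockSizeBytes
   }

   // partsNeeded returns how many blocks a ledger of ledgerBytes would
   // need, using ceiling division (hypothetical helper).
   func partsNeeded(ledgerBytes, blockSizeBytes int64) int64 {
   	return (ledgerBytes + blockSizeBytes - 1) / blockSizeBytes
   }

   func main() {
   	const mib = int64(1) << 20
   	for _, block := range []int64{64 * mib, 128 * mib} {
   		fmt.Printf("block=%dMiB limit=%dMiB\n", block/mib, maxOffloadBytes(block)/mib)
   	}
   	// A 3584MiB (3.5GiB) ledger under both block sizes.
   	fmt.Println(partsNeeded(3584*mib, 64*mib), partsNeeded(3584*mib, 128*mib))
   }
   ```

   Under these assumptions a 3.5GiB ledger needs 56 parts at 64MiB blocks, which is over the limit, and 28 parts at 128MiB blocks, which is consistent with the successful test reported above.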





[I] PKIX path building failed: unable to find valid certification path to request to requested target [pulsar-helm-chart]

2024-04-22 Thread via GitHub


bharatbhushan1705 opened a new issue, #492:
URL: https://github.com/apache/pulsar-helm-chart/issues/492

   **Describe the bug**
   A clear and concise description of what the bug is.
   Connections fail with the error javax.net.ssl.SSLHandshakeException: PKIX path building 
failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to 
find valid certification path to requested target
   **To Reproduce**
   Steps to reproduce the behavior:
   1. Use a self-signed certificate keystore
   2. Enable TLS for the proxy and broker
   3. Run Apache Pulsar in an OpenShift cluster
   4. Use Apache Pulsar version 3.2.0
   
   **Expected behavior**
   Connecting directly to the broker URL works; we expect the same when we 
connect via the proxy.
   
   **Screenshots**
   If applicable, add screenshots to help explain your problem.
   
   **Desktop (please complete the following information):**
- OS: Ubuntu
   
   **Additional context**
   Add any other context about the problem here.
   Error logs: 
   at sun.security.validator.Validator.validate(Validator.java:264) ~[?:?]
   at sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:285) ~[?:?]
   at sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:144) ~[?:?]
   at sun.security.ssl.CertificateMessage$T13CertificateConsumer.checkServerCerts(CertificateMessage.java:1329) ~[?:?]
   ... 17 more
   Caused by: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
   at sun.security.provider.certpath.SunCertPathBuilder.build(SunCertPathBuilder.java:148) ~[?:?]
   at sun.security.provider.certpath.SunCertPathBuilder.engineBuild(SunCertPathBuilder.java:129) ~[?:?]
   at java.security.cert.CertPathBuilder.build(CertPathBuilder.java:297) ~[?:?]
   at sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:434) ~[?:?]
   at sun.security.validator.PKIXValidator.engineValidate(PKIXValidator.java:306) ~[?:?]
   at sun.security.validator.Validator.validate(Validator.java:264) ~[?:?]
   at sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:285) ~[?:?]
   at sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:144) ~[?:?]
   at sun.security.ssl.CertificateMessage$T13CertificateConsumer.checkServerCerts(CertificateMessage.java:1329) ~[?:?]
   ... 17 more
   





Re: [PR] [improve] PIP-307: Use assigned broker URL hints during broker reconnection [pulsar-client-go]

2024-04-22 Thread via GitHub


BewareMyPower commented on code in PR #1208:
URL: https://github.com/apache/pulsar-client-go/pull/1208#discussion_r1574823910


##
pulsar/internal/lookup_service.go:
##
@@ -30,8 +30,9 @@ import (
 
 // LookupResult encapsulates a struct for lookup a request, containing two 
parts: LogicalAddr, PhysicalAddr.
 type LookupResult struct {
-   LogicalAddr  *url.URL
-   PhysicalAddr *url.URL
+   IsProxyThroughServiceURL bool

Review Comment:
   It seems `IsProxyThroughServiceURL` is never used?



##
pulsar/producer_partition.go:
##
@@ -373,10 +390,14 @@ func (p *partitionProducer) GetBuffer() internal.Buffer {
return b
 }
 
-func (p *partitionProducer) ConnectionClosed() {
+func (p *partitionProducer) ConnectionClosed(closeProducer 
*pb.CommandCloseProducer) {

Review Comment:
   I see `ConnectionClosed(nil)` could be called in `connection.go`, should you 
perform the null check here?
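
   One way to make a `ConnectionClosed(nil)` call safe is to put the nil check inside the accessor, since Go allows methods to be called on nil pointer receivers. The sketch below is not the PR's implementation: `closeHint`, `HasURL`, and `reconnectTarget` are hypothetical names used only to illustrate the pattern, and the URLs are placeholders.

   ```go
   package main

   import "fmt"

   // closeHint is a hypothetical stand-in for the close command that may
   // carry an assigned broker URL.
   type closeHint struct {
   	assignedBrokerURL string
   }

   // HasURL is safe to call on a nil *closeHint because the receiver is
   // checked before any field access.
   func (c *closeHint) HasURL() bool {
   	return c != nil && c.assignedBrokerURL != ""
   }

   // reconnectTarget returns the hinted URL when one is present and falls
   // back to the regular lookup URL otherwise.
   func reconnectTarget(hint *closeHint, lookupURL string) string {
   	if hint.HasURL() {
   		return hint.assignedBrokerURL
   	}
   	return lookupURL
   }

   func main() {
   	// A nil hint falls back to the lookup URL without panicking.
   	fmt.Println(reconnectTarget(nil, "pulsar://lookup:6650"))
   	// A hint with a URL takes precedence.
   	hint := &closeHint{assignedBrokerURL: "pulsar://broker-1:6650"}
   	fmt.Println(reconnectTarget(hint, "pulsar://lookup:6650"))
   }
   ```

   With the check inside the method, callers that pass nil and callers that pass an empty hint take the same fallback path.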






Re: [D] broker healthcheck endpoint reports No such ledger exists on Metadata Server - ledger endlessly [pulsar]

2024-04-22 Thread via GitHub


GitHub user wallacepeng edited a discussion: broker healthcheck endpoint 
reports No such ledger exists on Metadata Server -  ledger endlessly


2024-04-21T03:14:20,474+ [pulsar-io-4-7] ERROR 
org.apache.pulsar.client.impl.ProducerImpl - 
[persistent://pulsar/pulsar/pulsar-broker-0.pulsar-broker.pulsar.svc.cluster.local:8080/healthcheck]
 [null] Failed to create producer: 
{"errorMsg":"org.apache.pulsar.broker.service.schema.exceptions.SchemaException:
 No such ledger exists on Metadata Server -  ledger=3596046 - operation=Failed 
to open ledger caused by 
org.apache.pulsar.broker.service.schema.exceptions.SchemaException: No such 
ledger exists on Metadata Server -  ledger=3596046 - operation=Failed to open 
ledger","reqId":3493733038286036253, 
"remote":"pulsar-broker-0.pulsar-broker.pulsar.svc.cluster.local/172.20.203.179:6650",
 "local":"/172.20.203.179:58124"}
2024-04-21 03:14:20.474 
2024-04-21T03:14:20,474+ [pulsar-io-4-7] WARN  
org.apache.pulsar.client.impl.ClientCnx - [id: 0xdce5010a, 
L:/172.20.203.179:58124 - 
R:pulsar-broker-0.pulsar-broker.pulsar.svc.cluster.local/172.20.203.179:6650] 
Received error from server: 
org.apache.pulsar.broker.service.schema.exceptions.SchemaException: No such 
ledger exists on Metadata Server -  ledger=3596046 - operation=Failed to open 
ledger caused by 
org.apache.pulsar.broker.service.schema.exceptions.SchemaException: No such 
ledger exists on Metadata Server -  ledger=3596046 - operation=Failed to open 
ledger


This happens because the internal healthcheck cannot create a producer for the 
topic (the producer name appears to be null):
persistent://pulsar/pulsar/pulsar-broker-0.pulsar-broker.pulsar.svc.cluster.local:8080/healthcheck
It reports "No such ledger exists on Metadata Server". Everything else works 
except the healthcheck.

I have BookKeeper clusters. After I decommissioned the default one, this error 
appeared. Does the broker still try to create the ledger on the decommissioned bookie?

```

bin/pulsar-admin topics stats-internal 
persistent://pulsar/pulsar/pulsar-broker-0.pulsar-broker.pulsar.svc.cluster.local:8080/healthcheck
No such ledger exists on Metadata Server

Reason: No such ledger exists on Metadata Server
```



https://github.com/apache/pulsar/assets/894641/9eb33a0f-48f4-4ff4-8f80-88ea5c7cc1d7

The correct log from another Pulsar cluster looks like this:
https://github.com/apache/pulsar/assets/894641/90c104d9-b351-4212-b26b-41dc170a18e4


GitHub link: https://github.com/apache/pulsar/discussions/22545


This is an automatically sent email for commits@pulsar.apache.org.
To unsubscribe, please send an email to: commits-unsubscr...@pulsar.apache.org



Re: [PR] 3.0 test [pulsar]

2024-04-22 Thread via GitHub


coderzc closed pull request #22555: 3.0 test
URL: https://github.com/apache/pulsar/pull/22555





[PR] 3.0 test [pulsar]

2024-04-22 Thread via GitHub


coderzc opened a new pull request, #22555:
URL: https://github.com/apache/pulsar/pull/22555

   
   
   
   
   Fixes #xyz
   
   
   
   Main Issue: #xyz
   
   
   
   PIP: #xyz 
   
   
   
   ### Motivation
   
   
   
   ### Modifications
   
   
   
   ### Verifying this change
   
   - [ ] Make sure that the change passes the CI checks.
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(10MB)*
 - *Extended integration test for recovery after broker failure*
   
   ### Does this pull request potentially affect one of the following parts:
   
   
   
   *If the box was checked, please highlight the changes*
   
   - [ ] Dependencies (add or upgrade a dependency)
   - [ ] The public API
   - [ ] The schema
   - [ ] The default values of configurations
   - [ ] The threading model
   - [ ] The binary protocol
   - [ ] The REST endpoints
   - [ ] The admin CLI options
   - [ ] The metrics
   - [ ] Anything that affects deployment
   
   ### Documentation
   
   
   
   - [ ] `doc` 
   - [ ] `doc-required` 
   - [ ] `doc-not-needed` 
   - [ ] `doc-complete` 
   
   ### Matching PR in forked repository
   
   PR in forked repository: 
   
   
   





Re: [PR] 3.0 test [pulsar]

2024-04-22 Thread via GitHub


github-actions[bot] commented on PR #22555:
URL: https://github.com/apache/pulsar/pull/22555#issuecomment-2069570265

   @coderzc Please add the following content to your PR description and select 
a checkbox:
   ```
   - [ ] `doc` 
   - [ ] `doc-required` 
   - [ ] `doc-not-needed` 
   - [ ] `doc-complete` 
   ```





Re: [PR] [fix][ml] Fix NPE of getValidPositionAfterSkippedEntries when recovering a terminated managed ledger [pulsar]

2024-04-22 Thread via GitHub


coderzc commented on code in PR #22552:
URL: https://github.com/apache/pulsar/pull/22552#discussion_r1574766383


##
managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java:
##
@@ -4695,5 +4695,58 @@ public void readEntriesFailed(ManagedLedgerException 
exception, Object ctx) {
 && cursorReadPosition.getEntryId() == 
expectReadPosition.getEntryId());
 }
 
+@Test
+public void testRecoverCursorWithTerminateManagedLedger() throws Exception 
{
+String mlName = "my_test_ledger";
+String cursorName = "c1";
+
+ManagedLedgerConfig config = new ManagedLedgerConfig();
+ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open(mlName, 
config);
+ManagedCursorImpl c1 = (ManagedCursorImpl) 
ledger.openCursor(cursorName);
+
+// Write some data.
+Position p0 = ledger.addEntry("entry-0".getBytes());
+Position p1 = ledger.addEntry("entry-1".getBytes());
+
+// Read and ack message.
+List<Entry> entries = c1.readEntries(2);
+assertEquals(entries.size(), 2);
+assertEquals(entries.get(0).getPosition(), p0);
+assertEquals(entries.get(1).getPosition(), p1);
+entries.forEach(Entry::release);
+
+// Mark delete the last message.
+c1.markDelete(p1);
+Position markDeletedPosition = c1.getMarkDeletedPosition();
+Assert.assertEquals(markDeletedPosition, p1);
+
+// Terminate the managed ledger.
+Position lastPosition = ledger.terminate();
+assertEquals(lastPosition, p1);
+
+// Close the ledger.
+ledger.close();
+
+// Reopen the ledger.
+ledger = (ManagedLedgerImpl) factory.open(mlName, config);
+BookKeeper mockBookKeeper = mock(BookKeeper.class);
+final ManagedCursorImpl cursor = new ManagedCursorImpl(mockBookKeeper, 
new ManagedLedgerConfig(), ledger,
+cursorName);
+
+// Recover the cursor.
+cursor.recover(new VoidCallback() {

Review Comment:
   Fixed.






(pulsar-client-go) branch master updated: [Improve] Add admin api GetLeaderBroker (#1203)

2024-04-22 Thread xyz
This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
 new b4d45cd3 [Improve] Add admin api GetLeaderBroker (#1203)
b4d45cd3 is described below

commit b4d45cd360599b52e3f160db9d39d619889c556a
Author: crossoverJie 
AuthorDate: Mon Apr 22 21:25:00 2024 +0800

[Improve] Add admin api GetLeaderBroker (#1203)

### Motivation

To keep consistent with the [Java 
client](https://github.com/apache/pulsar/pull/9799).

### Modifications

Add `GetLeaderBroker` interface.
---
 pulsaradmin/pkg/admin/brokers.go  | 12 
 pulsaradmin/pkg/admin/brokers_test.go | 16 
 pulsaradmin/pkg/utils/data.go |  5 +
 3 files changed, 33 insertions(+)

diff --git a/pulsaradmin/pkg/admin/brokers.go b/pulsaradmin/pkg/admin/brokers.go
index e178610c..650fab8e 100644
--- a/pulsaradmin/pkg/admin/brokers.go
+++ b/pulsaradmin/pkg/admin/brokers.go
@@ -58,6 +58,9 @@ type Brokers interface {
 
// HealthCheckWithTopicVersion run a health check on the broker
HealthCheckWithTopicVersion(utils.TopicVersion) error
+
+   // GetLeaderBroker get the information of the leader broker.
+   GetLeaderBroker() (utils.BrokerInfo, error)
 }
 
 type broker struct {
@@ -162,3 +165,12 @@ func (b *broker) HealthCheckWithTopicVersion(topicVersion 
utils.TopicVersion) er
}
return nil
 }
+func (b *broker) GetLeaderBroker() (utils.BrokerInfo, error) {
+   endpoint := b.pulsar.endpoint(b.basePath, "/leaderBroker")
+   var brokerInfo utils.BrokerInfo
+   err := b.pulsar.Client.Get(endpoint, &brokerInfo)
+   if err != nil {
+   return brokerInfo, err
+   }
+   return brokerInfo, nil
+}
diff --git a/pulsaradmin/pkg/admin/brokers_test.go 
b/pulsaradmin/pkg/admin/brokers_test.go
index d48ce7cb..97679759 100644
--- a/pulsaradmin/pkg/admin/brokers_test.go
+++ b/pulsaradmin/pkg/admin/brokers_test.go
@@ -42,3 +42,19 @@ func TestBrokerHealthCheckWithTopicVersion(t *testing.T) {
err = admin.Brokers().HealthCheckWithTopicVersion(utils.TopicVersionV2)
assert.NoError(t, err)
 }
+
+func TestGetLeaderBroker(t *testing.T) {
+   readFile, err := 
os.ReadFile("../../../integration-tests/tokens/admin-token")
+   assert.NoError(t, err)
+   cfg := &config.Config{
+   Token: string(readFile),
+   }
+   admin, err := New(cfg)
+   assert.NoError(t, err)
+   assert.NotNil(t, admin)
+   leaderBroker, err := admin.Brokers().GetLeaderBroker()
+   assert.NoError(t, err)
+   assert.NotNil(t, leaderBroker)
+   assert.NotEmpty(t, leaderBroker.ServiceURL)
+   assert.NotEmpty(t, leaderBroker.BrokerID)
+}
diff --git a/pulsaradmin/pkg/utils/data.go b/pulsaradmin/pkg/utils/data.go
index 1e67e3c7..61607912 100644
--- a/pulsaradmin/pkg/utils/data.go
+++ b/pulsaradmin/pkg/utils/data.go
@@ -477,6 +477,11 @@ type GetStatsOptions struct {
ExcludeConsumers bool `json:"exclude_consumers"`
 }
 
+type BrokerInfo struct {
+   BrokerID   string `json:"brokerId"`
+   ServiceURL string `json:"serviceUrl"`
+}
+
 type TopicVersion string
 
 const (
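
The `BrokerInfo` struct added above maps the `brokerId` and `serviceUrl` JSON fields onto Go fields. The following sketch shows how such a response body would decode with the standard library. The payload is a made-up example rather than captured broker output, and `parseBrokerInfo` is a hypothetical helper, not part of `pulsaradmin`.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// BrokerInfo mirrors the struct added in pulsaradmin/pkg/utils/data.go.
type BrokerInfo struct {
	BrokerID   string `json:"brokerId"`
	ServiceURL string `json:"serviceUrl"`
}

// parseBrokerInfo decodes a leader-broker response body (hypothetical helper).
func parseBrokerInfo(body []byte) (BrokerInfo, error) {
	var info BrokerInfo
	err := json.Unmarshal(body, &info)
	return info, err
}

func main() {
	// Hypothetical payload; real values depend on the cluster.
	body := []byte(`{"brokerId":"broker-0:8080","serviceUrl":"http://broker-0:8080"}`)
	info, err := parseBrokerInfo(body)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Println(info.BrokerID, info.ServiceURL)
}
```

Returning the zero-value struct alongside the error matches the shape of `GetLeaderBroker` in the diff, which also returns `brokerInfo` together with `err`.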



Re: [PR] [Improve] Add admin api GetLeaderBroker [pulsar-client-go]

2024-04-22 Thread via GitHub


BewareMyPower merged PR #1203:
URL: https://github.com/apache/pulsar-client-go/pull/1203





(pulsar-client-go) branch master updated: [improve] add a lint-docker command in makefile (#1207)

2024-04-22 Thread xyz
This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
 new 13ecca24 [improve] add a lint-docker command in makefile (#1207)
13ecca24 is described below

commit 13ecca24ffb6b65ac7ec36da53ecb67479518647
Author: zhou zhuohan <843520...@qq.com>
AuthorDate: Mon Apr 22 21:23:49 2024 +0800

[improve] add a lint-docker command in makefile (#1207)

Co-authored-by: ninjazhou 
---
 Makefile | 6 ++
 1 file changed, 6 insertions(+)

diff --git a/Makefile b/Makefile
index df4d539d..cdae8a59 100644
--- a/Makefile
+++ b/Makefile
@@ -38,6 +38,12 @@ lint: bin/golangci-lint
 bin/golangci-lint:
GOBIN=$(shell pwd)/bin go install 
github.com/golangci/golangci-lint/cmd/golangci-lint@v1.51.2
 
+# an alternative to above `make lint` command
+# use golangCi-lint docker to avoid local golang env issues
+# https://golangci-lint.run/welcome/install/
+lint-docker:
+   docker run --rm -v $(shell pwd):/app -w /app 
golangci/golangci-lint:v1.51.2 golangci-lint run -v
+
 container:
docker build -t ${IMAGE_NAME} \
  --build-arg GO_VERSION="${GO_VERSION}" \



Re: [PR] [improve] add a lint-docker command in makefile [pulsar-client-go]

2024-04-22 Thread via GitHub


BewareMyPower merged PR #1207:
URL: https://github.com/apache/pulsar-client-go/pull/1207





Re: [PR] [fix][broker] upgrade jclouds 2.5.0 -> 2.6.0 [pulsar]

2024-04-22 Thread via GitHub


lhotari commented on PR #0:
URL: https://github.com/apache/pulsar/pull/0#issuecomment-2069414827

   @nikhil-ctds I created #22554 that changes the default value of 
`gcsManagedLedgerOffloadMaxBlockSizeInBytes`. It would be helpful if you could 
test that `gcsManagedLedgerOffloadMaxBlockSizeInBytes=134217728` fixes the 
issue. thanks





[I] Offloading large ledgers (>2GB) fail with Google Cloud Storage [pulsar]

2024-04-22 Thread via GitHub


lhotari opened a new issue, #15159:
URL: https://github.com/apache/pulsar/issues/15159

   **Describe the bug**
   
   Offloading large ledgers fails with Google Cloud Storage and the default 
settings. With default settings, offloading ledgers over 2GB to GCS will fail 
since there's a limitation in JClouds (reported as [[JCLOUDS-1606] Cannot 
upload more than 32 parts to 
GCS](https://issues.apache.org/jira/browse/JCLOUDS-1606)) which limits a 
multipart upload to 32 parts. GCS supports multipart uploads up to 1 parts, 
but JClouds doesn't use the API in a way to achieve more than 32 parts.
   
   Here's an example log entry of the problem:
   ```
   java.util.concurrent.CompletionException: 
org.jclouds.http.HttpResponseException: command: POST 
https://www.googleapis.com/storage/v1/b/somebucket/o/ff553922-1fa3-4ceb-abcd-60106603b5c8-object-123456/compose
 HTTP/1.1 failed with response: HTTP/1.1 400 Bad Request; content: [{
 "error": {
   "code": 400,
   "message": "The number of source components provided (35) exceeds the 
maximum (32)",
   "errors": [
 {
   "message": "The number of source components provided (35) exceeds 
the maximum (32)",
   "domain": "global",
   "reason": "invalid"
 }
   ]
 }
   }
   ]
   at 
java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:367) 
~[?:?]
   at 
java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:376)
 ~[?:?]
   at 
java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:1019)
 ~[?:?]
   at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) 
~[?:?]
   at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
 ~[?:?]
   at 
org.apache.bookkeeper.mledger.offload.jcloud.impl.BlobStoreManagedLedgerOffloader.lambda$offload$0(BlobStoreManagedLedgerOffloader.java:237)
 ~[?:?]
   at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
   at 
com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:131)
 [com.google.guava-guava-31.0.1-jre.jar:?]
   at 
com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:74)
 [com.google.guava-guava-31.0.1-jre.jar:?]
   at 
com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:82)
 [com.google.guava-guava-31.0.1-jre.jar:?]
   at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
   at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
   at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
 [?:?]
   at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) 
[?:?]
   at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) 
[?:?]
   at 
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
 [io.netty-netty-common-4.1.74.Final.jar:4.1.74.Final]
   at java.lang.Thread.run(Thread.java:829) [?:?]
   Caused by: org.jclouds.http.HttpResponseException: command: POST 
https://www.googleapis.com/storage/v1/b/somebucket/o/ff553922-1fa3-4ceb-abcd-60106603b5c8-object-123456/compose
 HTTP/1.1 failed with response: HTTP/1.1 400 Bad Request; content: [{
 "error": {
   "code": 400,
   "message": "The number of source components provided (35) exceeds the 
maximum (32)",
   "errors": [
 {
   "message": "The number of source components provided (35) exceeds 
the maximum (32)",
   "domain": "global",
   "reason": "invalid"
 }
   ]
 }
   }
   ]
   at 
org.jclouds.googlecloudstorage.handlers.GoogleCloudStorageErrorHandler.handleError(GoogleCloudStorageErrorHandler.java:40)
 ~[?:?]
   at 
org.jclouds.http.handlers.DelegatingErrorHandler.handleError(DelegatingErrorHandler.java:65)
 ~[?:?]
   at 
org.jclouds.http.internal.BaseHttpCommandExecutorService.shouldContinue(BaseHttpCommandExecutorService.java:138)
 ~[?:?]
   at 

Re: [I] Offloading large ledgers (>2GB) fail with Google Cloud Storage [pulsar]

2024-04-22 Thread via GitHub


lhotari commented on issue #15159:
URL: https://github.com/apache/pulsar/issues/15159#issuecomment-2069410440

   #0 alone doesn't fix the issue. Created #22554 to fix the remaining 
issue.





Re: [PR] [fix][ml] Fix NPE of getValidPositionAfterSkippedEntries when recovering a terminated managed ledger [pulsar]

2024-04-22 Thread via GitHub


shibd commented on code in PR #22552:
URL: https://github.com/apache/pulsar/pull/22552#discussion_r1574746976


##
managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java:
##
@@ -4695,5 +4695,58 @@ public void readEntriesFailed(ManagedLedgerException 
exception, Object ctx) {
 && cursorReadPosition.getEntryId() == 
expectReadPosition.getEntryId());
 }
 
+@Test
+public void testRecoverCursorWithTerminateManagedLedger() throws Exception 
{
+String mlName = "my_test_ledger";
+String cursorName = "c1";
+
+ManagedLedgerConfig config = new ManagedLedgerConfig();
+ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open(mlName, 
config);
+ManagedCursorImpl c1 = (ManagedCursorImpl) 
ledger.openCursor(cursorName);
+
+// Write some data.
+Position p0 = ledger.addEntry("entry-0".getBytes());
+Position p1 = ledger.addEntry("entry-1".getBytes());
+
+// Read and ack message.
+List<Entry> entries = c1.readEntries(2);
+assertEquals(entries.size(), 2);
+assertEquals(entries.get(0).getPosition(), p0);
+assertEquals(entries.get(1).getPosition(), p1);
+entries.forEach(Entry::release);
+
+// Mark delete the last message.
+c1.markDelete(p1);
+Position markDeletedPosition = c1.getMarkDeletedPosition();
+Assert.assertEquals(markDeletedPosition, p1);
+
+// Terminate the managed ledger.
+Position lastPosition = ledger.terminate();
+assertEquals(lastPosition, p1);
+
+// Close the ledger.
+ledger.close();
+
+// Reopen the ledger.
+ledger = (ManagedLedgerImpl) factory.open(mlName, config);
+BookKeeper mockBookKeeper = mock(BookKeeper.class);
+final ManagedCursorImpl cursor = new ManagedCursorImpl(mockBookKeeper, 
new ManagedLedgerConfig(), ledger,
+cursorName);
+
+// Recover the cursor.
+cursor.recover(new VoidCallback() {

Review Comment:
   Need to convert to a sync call and to assert recover success.






[PR] [improve][broker] Make BrokerSelectionStrategy pluggable [pulsar]

2024-04-22 Thread via GitHub


BewareMyPower opened a new pull request, #22553:
URL: https://github.com/apache/pulsar/pull/22553

   ### Motivation
   
   When users want to extend the `ExtensibleLoadManagerImpl`, the 
`BrokerSelectionStrategy` cannot be customized, so users have to write much 
duplicated code for a customized broker selection strategy.
   
   ### Modifications
   
   Add a stable interface `BrokerSelectionStrategyFactory` and implement it in 
`ExtensibleLoadManagerImpl`, whose default implementation returns a 
`LeastResourceUsageWithWeight` instance. Add 
`CustomBrokerSelectionStrategyTest` to show how to customize the broker 
selection strategy and verify that it works.
   
   ### Does this pull request potentially affect one of the following parts:
   
   
   
   *If the box was checked, please highlight the changes*
   
   - [ ] Dependencies (add or upgrade a dependency)
   - [x] The public API
   - [ ] The schema
   - [ ] The default values of configurations
   - [ ] The threading model
   - [ ] The binary protocol
   - [ ] The REST endpoints
   - [ ] The admin CLI options
   - [ ] The metrics
   - [ ] Anything that affects deployment
   
   ### Documentation
   
   - [ ] `doc` 
   - [ ] `doc-required` 
   - [x] `doc-not-needed` 
   - [ ] `doc-complete` 
   
   ### Matching PR in forked repository
   
   PR in forked repository: 
   
   
   





Re: [PR] [fix][ml] Fix NPE of getValidPositionAfterSkippedEntries when recovering a terminated managed ledger [pulsar]

2024-04-22 Thread via GitHub


shibd commented on code in PR #22552:
URL: https://github.com/apache/pulsar/pull/22552#discussion_r1574746976


##
managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java:
##
@@ -4695,5 +4695,58 @@ public void readEntriesFailed(ManagedLedgerException 
exception, Object ctx) {
 && cursorReadPosition.getEntryId() == 
expectReadPosition.getEntryId());
 }
 
+@Test
+public void testRecoverCursorWithTerminateManagedLedger() throws Exception 
{
+String mlName = "my_test_ledger";
+String cursorName = "c1";
+
+ManagedLedgerConfig config = new ManagedLedgerConfig();
+ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open(mlName, 
config);
+ManagedCursorImpl c1 = (ManagedCursorImpl) 
ledger.openCursor(cursorName);
+
+// Write some data.
+Position p0 = ledger.addEntry("entry-0".getBytes());
+Position p1 = ledger.addEntry("entry-1".getBytes());
+
+// Read and ack message.
+List<Entry> entries = c1.readEntries(2);
+assertEquals(entries.size(), 2);
+assertEquals(entries.get(0).getPosition(), p0);
+assertEquals(entries.get(1).getPosition(), p1);
+entries.forEach(Entry::release);
+
+// Mark delete the last message.
+c1.markDelete(p1);
+Position markDeletedPosition = c1.getMarkDeletedPosition();
+Assert.assertEquals(markDeletedPosition, p1);
+
+// Terminate the managed ledger.
+Position lastPosition = ledger.terminate();
+assertEquals(lastPosition, p1);
+
+// Close the ledger.
+ledger.close();
+
+// Reopen the ledger.
+ledger = (ManagedLedgerImpl) factory.open(mlName, config);
+BookKeeper mockBookKeeper = mock(BookKeeper.class);
+final ManagedCursorImpl cursor = new ManagedCursorImpl(mockBookKeeper, 
new ManagedLedgerConfig(), ledger,
+cursorName);
+
+// Recover the cursor.
+cursor.recover(new VoidCallback() {

Review Comment:
   Need to convert to a sync call.






Re: [PR] [improve] [broker] Create partitioned topics automatically when enable topic level replication [pulsar]

2024-04-22 Thread via GitHub


poorbarcode commented on code in PR #22537:
URL: https://github.com/apache/pulsar/pull/22537#discussion_r1574733176


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java:
##
@@ -621,35 +625,82 @@ protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int n
 
     private void internalCreatePartitionedTopicToReplicatedClustersInBackground(int numPartitions) {
         getNamespaceReplicatedClustersAsync(namespaceName)
-                .thenAccept(clusters -> {
-                    for (String cluster : clusters) {
-                        if (!cluster.equals(pulsar().getConfiguration().getClusterName())) {
-                            // this call happens in the background without async composition. completion is logged.
-                            pulsar().getPulsarResources().getClusterResources()
-                                    .getClusterAsync(cluster)
-                                    .thenCompose(clusterDataOp ->
-                                            ((TopicsImpl) pulsar().getBrokerService()
-                                                    .getClusterPulsarAdmin(cluster,
-                                                            clusterDataOp).topics())
-                                                    .createPartitionedTopicAsync(
-                                                            topicName.getPartitionedTopicName(),
-                                                            numPartitions,
-                                                            true, null))
-                                    .whenComplete((__, ex) -> {
-                                        if (ex != null) {
-                                            log.error(
-                                                    "[{}] Failed to create partitioned topic {} in cluster {}.",
-                                                    clientAppId(), topicName, cluster, ex);
-                                        } else {
-                                            log.info(
-                                                    "[{}] Successfully created partitioned topic {} in "
-                                                            + "cluster {}",
-                                                    clientAppId(), topicName, cluster);
-                                        }
-                                    });
-                        }
+            .thenAccept(clusters -> {
+                // this call happens in the background without async composition. completion is logged.
+                internalCreatePartitionedTopicToReplicatedClustersInBackground(clusters, numPartitions);
+            });
+    }
+
+    protected Map<String, CompletableFuture<Void>> internalCreatePartitionedTopicToReplicatedClustersInBackground(
+            Set<String> clusters, int numPartitions) {
+        final String shortTopicName = topicName.getPartitionedTopicName();
+        Map<String, CompletableFuture<Void>> tasksForAllClusters = new HashMap<>();
+        for (String cluster : clusters) {
+            if (cluster.equals(pulsar().getConfiguration().getClusterName())) {
+                continue;
+            }
+            ClusterResources clusterResources = pulsar().getPulsarResources().getClusterResources();
+            CompletableFuture<Void> createRemoteTopicFuture = new CompletableFuture<>();
+            tasksForAllClusters.put(cluster, createRemoteTopicFuture);
+            clusterResources.getClusterAsync(cluster).whenComplete((clusterData, ex1) -> {
+                if (ex1 != null) {
+                    // Unexpected error, such as NPE. Catch all error to avoid the "createRemoteTopicFuture" stuck.
+                    log.error("[{}] An un-expected error occurs when trying to create partitioned topic {} in cluster"
+                                    + " {}.", clientAppId(), topicName, cluster, ex1);
+                    createRemoteTopicFuture.completeExceptionally(new RestException(ex1));
+                    return;
+                }
+                // Get cluster data success.
+                TopicsImpl topics =
+                        (TopicsImpl) pulsar().getBrokerService().getClusterPulsarAdmin(cluster, clusterData).topics();
+                topics.createPartitionedTopicAsync(shortTopicName, numPartitions, true, null)
+                        .whenComplete((ignore, ex2) -> {
+                    if (ex2 == null) {
+                        // Create success.
+                        log.info("[{}] Successfully created partitioned topic {} in cluster {}",
+                                clientAppId(), topicName, cluster);
+                        createRemoteTopicFuture.complete(null);
+                        return;
+                    }
+                    // Create topic on the remote cluster error.
+                    Throwable unwrapEx2 = FutureUtil.unwrapCompletionException(ex2);
+                    // The topic has been created before, 

[PR] [fix][ml] Fix NPE of getValidPositionAfterSkippedEntries when recovering a terminated managed ledger [pulsar]

2024-04-22 Thread via GitHub


coderzc opened a new pull request, #22552:
URL: https://github.com/apache/pulsar/pull/22552

   ### Motivation
   
   The managed ledger does not create a new ledger when recovering a 
terminated managed ledger, so `currentLedger` stays null and 
`getValidPositionAfterSkippedEntries` throws an NPE.
   
   ```
   org.apache.bookkeeper.mledger.ManagedLedgerException$MetaStoreException: java.util.concurrent.CompletionException: java.lang.NullPointerException: Cannot invoke "org.apache.bookkeeper.client.LedgerHandle.getId()" because "this.currentLedger" is null
   Caused by: java.util.concurrent.CompletionException: java.lang.NullPointerException: Cannot invoke "org.apache.bookkeeper.client.LedgerHandle.getId()" because "this.currentLedger" is null
       at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315) ~[?:?]
       at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:320) ~[?:?]
       at java.base/java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:722) ~[?:?]
       at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:482) ~[?:?]
       at org.apache.bookkeeper.common.util.OrderedExecutor$TimedRunnable.run(OrderedExecutor.java:201) [bookkeeper-common-4.16.4.jar:4.16.4]
       at org.apache.bookkeeper.common.util.SingleThreadSafeScheduledExecutorService$SafeRunnable.run(SingleThreadSafeScheduledExecutorService.java:46) [bookkeeper-common-4.16.4.jar:4.16.4]
       at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) [?:?]
       at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
       at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?]
       at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) [?:?]
       at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) [?:?]
       at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-common-4.1.108.Final.jar:4.1.108.Final]
       at java.base/java.lang.Thread.run(Thread.java:833) [?:?]
   Caused by: java.lang.NullPointerException: Cannot invoke "org.apache.bookkeeper.client.LedgerHandle.getId()" because "this.currentLedger" is null
   ```
   
   
https://github.com/apache/pulsar/blob/3a0f908e80d0863920a1258362fd782e95fe8f17/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java#L486-L492
   
   
https://github.com/apache/pulsar/blob/3a0f908e80d0863920a1258362fd782e95fe8f17/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java#L3698-L3712
   
   ### Modifications
   
   If `currentLedger == null`, `getValidPositionAfterSkippedEntries` now 
returns `lastConfirmedEntry.getNext()` instead of dereferencing 
`currentLedger`, which avoids the NPE.
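
The guard described above can be sketched roughly as follows. This is a simplified illustration, not the code from the PR: `Position` and `nextValidPosition` are hypothetical stand-ins, and the non-null branch is only a placeholder for whatever the real method does when a ledger is open.

```java
public class NullLedgerGuardSketch {
    // Hypothetical, simplified position made of a ledger id and an entry id.
    static final class Position {
        final long ledgerId;
        final long entryId;

        Position(long ledgerId, long entryId) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }

        // Next entry in the same ledger.
        Position getNext() {
            return new Position(ledgerId, entryId + 1);
        }
    }

    // currentLedgerId is null when no ledger is open, which is the state
    // described above after recovering a terminated managed ledger.
    static Position nextValidPosition(Long currentLedgerId, Position lastConfirmedEntry) {
        if (currentLedgerId == null) {
            // Guard: fall back to the position after the last confirmed entry.
            return lastConfirmedEntry.getNext();
        }
        // Placeholder for the normal path (not taken from the PR).
        return new Position(currentLedgerId, 0);
    }

    public static void main(String[] args) {
        Position next = nextValidPosition(null, new Position(3, 1));
        System.out.println(next.ledgerId + ":" + next.entryId); // prints "3:2"
    }
}
```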
   
   ### Verifying this change
   
   - [ ] Make sure that the change passes the CI checks.
   
   
   ### Does this pull request potentially affect one of the following parts:
   
   
   
   *If the box was checked, please highlight the changes*
   
   - [ ] Dependencies (add or upgrade a dependency)
   - [ ] The public API
   - [ ] The schema
   - [ ] The default values of configurations
   - [ ] The threading model
   - [ ] The binary protocol
   - [ ] The REST endpoints
   - [ ] The admin CLI options
   - [ ] The metrics
   - [ ] Anything that affects deployment
   
   ### Documentation
   
   
   
   - [ ] `doc` 
   - [ ] `doc-required` 
   - [x] `doc-not-needed` 
   - [ ] `doc-complete` 
   
   ### Matching PR in forked repository
   
   PR in forked repository: 
   
   
   





Re: [PR] [fix][broker] upgrade jclouds 2.5.0 -> 2.6.0 [pulsar]

2024-04-22 Thread via GitHub


lhotari commented on PR #0:
URL: https://github.com/apache/pulsar/pull/0#issuecomment-2069324241

   > @lhotari With the above fix I tried testing GCS offloader with ledgers of 
size >= 2.5gb. I'm still facing the same error
   > 
   > ```
   > Error in offload
   > null
   > 
   > Reason: Error offloading: 
org.apache.bookkeeper.mledger.ManagedLedgerException: 
java.util.concurrent.CompletionException: 
org.jclouds.http.HttpResponseException: command: POST 
https://www.googleapis.com/storage/v1/b/bucket-cognitree-ls1258/o/d7f69abd-dc45-4fc1-bbe3-bd9daaa8fea7-ledger-9/compose
 HTTP/1.1 failed with response: HTTP/1.1 400 Bad Request; content: [{
   >   "error": {
   > "code": 400,
   > "message": "The number of source components provided (38) exceeds the 
maximum (32)",
   > "errors": [
   >   {
   > "message": "The number of source components provided (38) exceeds 
the maximum (32)",
   > "domain": "global",
   > "reason": "invalid"
   >   }
   > ]
   >   }
   > }
   > ]
   > ```
   > 
   > I have also written a simple code using Jclouds 2.6.0 to upload a large 
file of size 3gb to GCS bucket, it has uploaded successfully.
   
   @nikhil-ctds Do you have a chance to rerun the test with 
`gcsManagedLedgerOffloadMaxBlockSizeInBytes=134217728` ?
   The default value `gcsManagedLedgerOffloadMaxBlockSizeInBytes=67108864` will 
limit the max upload size to 2048MiB (64MiB * 32). Doubling 
gcsManagedLedgerOffloadMaxBlockSizeInBytes to `134217728` should lift the limit 
to 4096MiB.
   Can you please verify? Thanks
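
For reference, the arithmetic behind those limits can be checked with a short sketch. It assumes, as the error message suggests, that a compose request accepts at most 32 source components and that each offloaded block becomes one component; the class and method names are illustrative only.

```java
public class OffloadLimitSketch {
    // Maximum number of source components, taken from the error message above.
    static final int MAX_COMPOSE_COMPONENTS = 32;

    // Largest upload that fits in one compose request for a given block size.
    static long maxUploadBytes(long maxBlockSizeInBytes) {
        return maxBlockSizeInBytes * MAX_COMPOSE_COMPONENTS;
    }

    // Number of blocks needed for a ledger, assuming one component per block.
    static long componentsNeeded(long ledgerBytes, long maxBlockSizeInBytes) {
        return (ledgerBytes + maxBlockSizeInBytes - 1) / maxBlockSizeInBytes;
    }

    static long toMiB(long bytes) {
        return bytes / (1024L * 1024L);
    }

    public static void main(String[] args) {
        long defaultBlock = 67108864L;   // 64 MiB, the default quoted above
        long doubledBlock = 134217728L;  // 128 MiB, the suggested value
        System.out.println(toMiB(maxUploadBytes(defaultBlock))); // prints 2048
        System.out.println(toMiB(maxUploadBytes(doubledBlock))); // prints 4096

        // A ledger that needs 38 blocks of 64 MiB needs 19 blocks of 128 MiB.
        long ledgerBytes = 38L * defaultBlock;
        System.out.println(componentsNeeded(ledgerBytes, doubledBlock)); // prints 19
    }
}
```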
   
   





[PR] [fix][admin] Fix can't delete tenant for v1 [pulsar]

2024-04-22 Thread via GitHub


Technoboy- opened a new pull request, #22550:
URL: https://github.com/apache/pulsar/pull/22550

   
   ### Motivation
   
   Can't delete a tenant for v1. Found this bug while fixing #22547.
   ```
   Caused by: org.apache.pulsar.metadata.api.MetadataStoreException$ContentDeserializationException: Failed to deserialize payload for key '/admin/policies/p1/c1'
       at org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl.lambda$readValueFromStore$0(MetadataCacheImpl.java:115)
       at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1150)
       ... 12 more
   Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException: No content to map due to end-of-input
    at [Source: (byte[])""; line: 1, column: 0]
       at com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:59)
       at com.fasterxml.jackson.databind.DeserializationContext.reportInputMismatch(DeserializationContext.java:1746)
       at com.fasterxml.jackson.databind.ObjectReader._initForReading(ObjectReader.java:360)
       at com.fasterxml.jackson.databind.ObjectReader._bindAndClose(ObjectReader.java:2095)
       at com.fasterxml.jackson.databind.ObjectReader.readValue(ObjectReader.java:1583)
       at org.apache.pulsar.metadata.cache.impl.JSONMetadataSerdeSimpleType.deserialize(JSONMetadataSerdeSimpleType.java:46)
       at org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl.lambda$readValueFromStore$0(MetadataCacheImpl.java:111)
       ... 13 more
   
   2024-04-22T20:24:34,580 - INFO  - [configuration-metadata-store-4-1:ResourceGroupNamespaceConfigListener] - Metadata store notification: Path /admin/policies/p1/c1, Type Deleted
   2024-04-22T20:24:34,594 - ERROR - [metadata-store-2-1:TenantsBase] - [pass.pass] Failed to delete tenant p1
   org.apache.pulsar.metadata.api.MetadataStoreException: org.apache.zookeeper.KeeperException$NotEmptyException: KeeperErrorCode = Directory not empty for /managed-ledgers/p1
       at org.apache.pulsar.metadata.impl.ZKMetadataStore.getException(ZKMetadataStore.java:476) ~[classes/:?]
       at org.apache.pulsar.metadata.impl.ZKMetadataStore.handleDeleteResult(ZKMetadataStore.java:304) ~[classes/:?]
       at org.apache.pulsar.metadata.impl.ZKMetadataStore.lambda$batchOperation$5(ZKMetadataStore.java:216) ~[classes/:?]
       at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) [?:?]
       at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264) [?:?]
       at java.base/java.util.concurrent.FutureTask.run(FutureTask.java) [?:?]
       at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?]
       at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) [?:?]
       at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) [?:?]
       at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-common-4.1.108.Final.jar:4.1.108.Final]
       at java.base/java.lang.Thread.run(Thread.java:833) [?:?]
   Caused by: org.apache.zookeeper.KeeperException$NotEmptyException: KeeperErrorCode = Directory not empty for /managed-ledgers/p1
   ```
   
   
   
   ### Documentation
   
   
   
   - [ ] `doc` 
   - [ ] `doc-required` 
   - [x] `doc-not-needed` 
   - [ ] `doc-complete` 
   
   
   





(pulsar) branch master updated: [improve][test] Add topic policy test for topic API (#22546)

2024-04-22 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 3a0f908e80d [improve][test] Add topic policy test for topic API (#22546)
3a0f908e80d is described below

commit 3a0f908e80d0863920a1258362fd782e95fe8f17
Author: Jiwei Guo 
AuthorDate: Mon Apr 22 19:47:03 2024 +0800

[improve][test] Add topic policy test for topic API (#22546)
---
 .../org/apache/pulsar/broker/admin/AuthZTest.java  |  113 ++
 .../apache/pulsar/broker/admin/TopicAuthZTest.java | 1121 ++--
 .../admin/TransactionAndSchemaAuthZTest.java   |  359 +++
 3 files changed, 1270 insertions(+), 323 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AuthZTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AuthZTest.java
new file mode 100644
index 000..a710a03970d
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AuthZTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import io.jsonwebtoken.Jwts;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.pulsar.broker.authorization.AuthorizationService;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.common.policies.data.NamespaceOperation;
+import org.apache.pulsar.common.policies.data.TopicOperation;
+import org.apache.pulsar.security.MockedPulsarStandalone;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import static org.mockito.Mockito.doReturn;
+
+public class AuthZTest extends MockedPulsarStandalone {
+
+    protected PulsarAdmin superUserAdmin;
+
+    protected PulsarAdmin tenantManagerAdmin;
+
+    protected AuthorizationService authorizationService;
+
+    protected AuthorizationService orignalAuthorizationService;
+
+    protected static final String TENANT_ADMIN_SUBJECT =  UUID.randomUUID().toString();
+    protected static final String TENANT_ADMIN_TOKEN = Jwts.builder()
+            .claim("sub", TENANT_ADMIN_SUBJECT).signWith(SECRET_KEY).compact();
+
+    @BeforeMethod(alwaysRun = true)
+    public void before() throws IllegalAccessException {
+        orignalAuthorizationService = getPulsarService().getBrokerService().getAuthorizationService();
+        authorizationService = Mockito.spy(orignalAuthorizationService);
+        FieldUtils.writeField(getPulsarService().getBrokerService(), "authorizationService",
+                authorizationService, true);
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void after() throws IllegalAccessException {
+        FieldUtils.writeField(getPulsarService().getBrokerService(), "authorizationService",
+                orignalAuthorizationService, true);
+    }
+
+    protected AtomicBoolean setAuthorizationTopicOperationChecker(String role, Object operation) {
+        AtomicBoolean execFlag = new AtomicBoolean(false);
+        if (operation instanceof TopicOperation) {
+            Mockito.doAnswer(invocationOnMock -> {
+                String role_ = invocationOnMock.getArgument(2);
+                if (role.equals(role_)) {
+                    TopicOperation operation_ = invocationOnMock.getArgument(1);
+                    Assert.assertEquals(operation_, operation);
+                }
+                execFlag.set(true);
+                return invocationOnMock.callRealMethod();
+            }).when(authorizationService).allowTopicOperationAsync(Mockito.any(), Mockito.any(), Mockito.any(),
+                    Mockito.any(), Mockito.any());
+        } else if (operation instanceof NamespaceOperation) {
+            doReturn(true)
+                    .when(authorizationService).isValidOriginalPrincipal(Mockito.any(), Mockito.any(), Mockito.any());
+            Mockito.doAnswer(invocationOnMock -> {
+                String role_ = invocationOnMock.getArgument(2);
+

Re: [PR] [improve][test] Add topic policy test for topic API [pulsar]

2024-04-22 Thread via GitHub


Technoboy- merged PR #22546:
URL: https://github.com/apache/pulsar/pull/22546





Re: [PR] [fix][broker] Support lookup options for extensible load manager [pulsar]

2024-04-22 Thread via GitHub


Demogorgon314 commented on code in PR #22487:
URL: https://github.com/apache/pulsar/pull/22487#discussion_r1574549415


##
tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java:
##
@@ -405,7 +405,7 @@ public void testIsolationPolicy() throws Exception {
         admin.lookups().lookupTopicAsync(topic).get(5, TimeUnit.SECONDS);

Review Comment:
   Yes, updated.





