Re: [I] Golang instance of functions missing many fields of the `FunctionDetails` [pulsar]

2024-03-28 Thread via GitHub


Technoboy- closed issue #22349: Golang instance of functions missing many 
fields of the `FunctionDetails`
URL: https://github.com/apache/pulsar/issues/22349


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



(pulsar) branch master updated: [improve][fn] Pass FunctionDetails to Go instance (#22350)

2024-03-28 Thread technoboy
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 7315aeb6258 [improve][fn] Pass FunctionDetails to Go instance (#22350)
7315aeb6258 is described below

commit 7315aeb6258b7adc9d874268d50acb95ffc0cf2b
Author: jiangpengcheng 
AuthorDate: Fri Mar 29 13:47:29 2024 +0800

[improve][fn] Pass FunctionDetails to Go instance (#22350)
---
 pulsar-function-go/conf/conf.go|   2 +
 pulsar-function-go/pf/instanceConf.go  |  11 ++
 pulsar-function-go/pf/instanceConf_test.go | 207 +
 .../functions/instance/go/GoInstanceConfig.java|   2 +
 .../pulsar/functions/runtime/RuntimeUtils.java |   6 +
 .../runtime/kubernetes/KubernetesRuntimeTest.java  |   8 +
 6 files changed, 236 insertions(+)

diff --git a/pulsar-function-go/conf/conf.go b/pulsar-function-go/conf/conf.go
index 03513648fac..1442a0f865f 100644
--- a/pulsar-function-go/conf/conf.go
+++ b/pulsar-function-go/conf/conf.go
@@ -91,6 +91,8 @@ type Conf struct {
UserConfig  string `json:"userConfig" yaml:"userConfig"`
//metrics config
MetricsPort int `json:"metricsPort" yaml:"metricsPort"`
+   // FunctionDetails
+   FunctionDetails string `json:"functionDetails" yaml:"functionDetails"`
 }
 
 var (
diff --git a/pulsar-function-go/pf/instanceConf.go 
b/pulsar-function-go/pf/instanceConf.go
index 4cb60dd258a..844a2bc9b89 100644
--- a/pulsar-function-go/pf/instanceConf.go
+++ b/pulsar-function-go/pf/instanceConf.go
@@ -25,7 +25,9 @@ import (
"time"
 
"github.com/apache/pulsar/pulsar-function-go/conf"
+   log "github.com/apache/pulsar/pulsar-function-go/logutil"
pb "github.com/apache/pulsar/pulsar-function-go/pb"
+   "google.golang.org/protobuf/encoding/protojson"
 )
 
 // This is the config passed to the Golang Instance. Contains all the 
information
@@ -122,6 +124,15 @@ func newInstanceConfWithConf(cfg *conf.Conf) *instanceConf 
{
tlsAllowInsecure:cfg.TLSAllowInsecureConnection,
tlsHostnameVerification: cfg.TLSHostnameVerificationEnable,
}
+   // parse the raw function details and ignore the unmarshal 
error(fallback to original way)
+   if cfg.FunctionDetails != "" {
+   functionDetails := pb.FunctionDetails{}
+   if err := protojson.Unmarshal([]byte(cfg.FunctionDetails), 
); err != nil {
+   log.Errorf("Failed to unmarshal function details: %v", 
err)
+   } else {
+   instanceConf.funcDetails = functionDetails
+   }
+   }
 
if instanceConf.funcDetails.ProcessingGuarantees == 
pb.ProcessingGuarantees_EFFECTIVELY_ONCE {
panic("Go instance current not support EFFECTIVELY_ONCE 
processing guarantees.")
diff --git a/pulsar-function-go/pf/instanceConf_test.go 
b/pulsar-function-go/pf/instanceConf_test.go
index 02aef913ebc..cc5f46e2fe1 100644
--- a/pulsar-function-go/pf/instanceConf_test.go
+++ b/pulsar-function-go/pf/instanceConf_test.go
@@ -20,6 +20,7 @@
 package pf
 
 import (
+   "fmt"
"testing"
 
cfg "github.com/apache/pulsar/pulsar-function-go/conf"
@@ -113,3 +114,209 @@ func TestInstanceConf_Fail(t *testing.T) {
newInstanceConfWithConf({ProcessingGuarantees: 3})
}, "Should have a panic")
 }
+
+func TestInstanceConf_WithDetails(t *testing.T) {
+   cfg := {
+   FunctionDetails: 
`{"tenant":"public","namespace":"default","name":"test-function","className":"process",
+"logTopic":"test-logs","userConfig":"{\"key1\":\"value1\"}","runtime":"GO","autoAck":true,"parallelism":1,
+"source":{"configs":"{\"username\":\"admin\"}","typeClassName":"string","timeoutMs":"15000",
+"subscriptionName":"test-subscription","inputSpecs":{"input":{"schemaType":"avro","receiverQueueSize":{"value":1000},
+"schemaProperties":{"schema_prop1":"schema1"},"consumerProperties":{"consumer_prop1":"consumer1"},"cryptoSpec":
+{"cryptoKeyReaderClassName":"key-reader","producerCryptoFailureAction":"SEND","consumerCryptoFailureAction":"CONSUME"}}}
+,"negativeAckRedeliveryDelayMs":"15000"},"sink":{"configs":"{\"password\":\"admin\"}","topic":"test-output",
+"typeClassName":"string","schemaType":"avro","producerSpec":{"maxPendingMessages":2000,"useThreadLocalProducers":true,
+"cryptoSpec":{"cryptoKeyReaderClassName":"key-reader","producerCryptoFailureAction":"DISCARD"},
+"batchBuilder":"DEFAULT"}},"resources":{"cpu":2.0,"ram":"1024","disk":"1024"},"packageUrl":"/path/to/package",
+"retryDetails":{"maxMessageRetries":3,"deadLetterTopic":"test-dead-letter-topic"},"secretsMap":
+"{\"secret1\":\"secret-value1\"}","runtimeFlags":"flags","componentType":"FUNCTION","customRuntimeOptions":"options",

Re: [PR] [improve][fn] Pass FunctionDetails to Go instance [pulsar]

2024-03-28 Thread via GitHub


Technoboy- merged PR #22350:
URL: https://github.com/apache/pulsar/pull/22350


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [fix][txn]Handle exceptions in the transaction pending ack init [pulsar]

2024-03-28 Thread via GitHub


BewareMyPower commented on PR #21274:
URL: https://github.com/apache/pulsar/pull/21274#issuecomment-2026681713

   @codelipenghui PTAL again


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [fix][broker] Replication rate limiter counts batch messages [pulsar]

2024-03-28 Thread via GitHub


nodece opened a new pull request, #22380:
URL: https://github.com/apache/pulsar/pull/22380

   ### Motivation
   
   The batch messages include multiple messages, but we only count 1.
   
   ### Modifications
   
   If `msg.getMessageBuilder().hasNumMessagesInBatch()` returns true, use this 
value; otherwise, use 1.
   
   ### Documentation
   
   
   
   - [ ] `doc` 
   - [ ] `doc-required` 
   - [x] `doc-not-needed` 
   - [ ] `doc-complete` 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



(pulsar) branch branch-2.11 updated: [improve] [broker] Avoid repeated Read-and-discard when using Key_Shared mode (#22245)

2024-03-28 Thread yubiao
This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
 new 03493ef03c0 [improve] [broker] Avoid repeated Read-and-discard when 
using Key_Shared mode (#22245)
03493ef03c0 is described below

commit 03493ef03c07fd3a30f2b867baac42776c94d614
Author: fengyubiao 
AuthorDate: Fri Mar 29 12:06:26 2024 +0800

[improve] [broker] Avoid repeated Read-and-discard when using Key_Shared 
mode (#22245)

(cherry picked from commit e34ea626a65da4c8e1578010f857aa961a7b5c55)
---
 .../persistent/MessageRedeliveryController.java|   8 +
 .../PersistentDispatcherMultipleConsumers.java |  47 +++-
 ...istentStickyKeyDispatcherMultipleConsumers.java | 107 -
 .../client/api/KeySharedSubscriptionTest.java  | 266 +
 .../pulsar/client/api/ProducerConsumerBase.java|  66 +
 5 files changed, 473 insertions(+), 21 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
index 934628b05a9..5ed6afb3f07 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
@@ -95,6 +95,14 @@ public class MessageRedeliveryController {
 }
 }
 
+public Long getHash(long ledgerId, long entryId) {
+LongPair value = hashesToBeBlocked.get(ledgerId, entryId);
+if (value == null) {
+return null;
+}
+return value.first;
+}
+
 public void removeAllUpTo(long markDeleteLedgerId, long markDeleteEntryId) 
{
 if (!allowOutOfOrderDelivery) {
 List keysToRemove = new ArrayList<>();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index d69b1dbaf91..9ff7b426bdc 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -299,24 +299,25 @@ public class PersistentDispatcherMultipleConsumers 
extends AbstractDispatcherMul
 }
 
 NavigableSet messagesToReplayNow = 
getMessagesToReplayNow(messagesToRead);
-
-if (!messagesToReplayNow.isEmpty()) {
+NavigableSet messagesToReplayFiltered = 
filterOutEntriesWillBeDiscarded(messagesToReplayNow);
+if (!messagesToReplayFiltered.isEmpty()) {
 if (log.isDebugEnabled()) {
-log.debug("[{}] Schedule replay of {} messages for {} 
consumers", name, messagesToReplayNow.size(),
-consumerList.size());
+log.debug("[{}] Schedule replay of {} messages for {} 
consumers", name,
+messagesToReplayFiltered.size(), 
consumerList.size());
 }
 
 havePendingReplayRead = true;
 minReplayedPosition = messagesToReplayNow.first();
 Set deletedMessages = 
topic.isDelayedDeliveryEnabled()
-? asyncReplayEntriesInOrder(messagesToReplayNow) : 
asyncReplayEntries(messagesToReplayNow);
+? asyncReplayEntriesInOrder(messagesToReplayFiltered)
+: asyncReplayEntries(messagesToReplayFiltered);
 // clear already acked positions from replay bucket
 
 deletedMessages.forEach(position -> 
redeliveryMessages.remove(((PositionImpl) position).getLedgerId(),
 ((PositionImpl) position).getEntryId()));
 // if all the entries are acked-entries and cleared up from 
redeliveryMessages, try to read
 // next entries as readCompletedEntries-callback was never 
called
-if ((messagesToReplayNow.size() - deletedMessages.size()) == 
0) {
+if ((messagesToReplayFiltered.size() - deletedMessages.size()) 
== 0) {
 havePendingReplayRead = false;
 readMoreEntriesAsync();
 }
@@ -325,7 +326,7 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
 log.debug("[{}] Dispatcher read is blocked due to 
unackMessages {} reached to max {}", name,
 totalUnackedMessages, 
topic.getMaxUnackedMessagesOnSubscription());
 }
-} else if (!havePendingRead) {

(pulsar) branch branch-3.0 updated: [improve] [broker] Avoid repeated Read-and-discard when using Key_Shared mode (#22245)

2024-03-28 Thread yubiao
This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new 5b37e8434d9 [improve] [broker] Avoid repeated Read-and-discard when 
using Key_Shared mode (#22245)
5b37e8434d9 is described below

commit 5b37e8434d93c135b7c36c76a8177e2b51db5556
Author: fengyubiao 
AuthorDate: Fri Mar 29 12:06:26 2024 +0800

[improve] [broker] Avoid repeated Read-and-discard when using Key_Shared 
mode (#22245)

(cherry picked from commit e34ea626a65da4c8e1578010f857aa961a7b5c55)
---
 .../persistent/MessageRedeliveryController.java|   8 +
 .../PersistentDispatcherMultipleConsumers.java |  47 +++-
 ...istentStickyKeyDispatcherMultipleConsumers.java | 104 +++-
 .../client/api/KeySharedSubscriptionTest.java  | 266 +
 .../pulsar/client/api/ProducerConsumerBase.java|  66 +
 5 files changed, 470 insertions(+), 21 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
index 5bf3f5506fa..63803177242 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
@@ -95,6 +95,14 @@ public class MessageRedeliveryController {
 }
 }
 
+public Long getHash(long ledgerId, long entryId) {
+LongPair value = hashesToBeBlocked.get(ledgerId, entryId);
+if (value == null) {
+return null;
+}
+return value.first;
+}
+
 public void removeAllUpTo(long markDeleteLedgerId, long markDeleteEntryId) 
{
 if (!allowOutOfOrderDelivery) {
 List keysToRemove = new ArrayList<>();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index c164abf905d..9d0dba798ad 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -309,24 +309,25 @@ public class PersistentDispatcherMultipleConsumers 
extends AbstractDispatcherMul
 }
 
 NavigableSet messagesToReplayNow = 
getMessagesToReplayNow(messagesToRead);
-
-if (!messagesToReplayNow.isEmpty()) {
+NavigableSet messagesToReplayFiltered = 
filterOutEntriesWillBeDiscarded(messagesToReplayNow);
+if (!messagesToReplayFiltered.isEmpty()) {
 if (log.isDebugEnabled()) {
-log.debug("[{}] Schedule replay of {} messages for {} 
consumers", name, messagesToReplayNow.size(),
-consumerList.size());
+log.debug("[{}] Schedule replay of {} messages for {} 
consumers", name,
+messagesToReplayFiltered.size(), 
consumerList.size());
 }
 
 havePendingReplayRead = true;
 minReplayedPosition = messagesToReplayNow.first();
 Set deletedMessages = 
topic.isDelayedDeliveryEnabled()
-? asyncReplayEntriesInOrder(messagesToReplayNow) : 
asyncReplayEntries(messagesToReplayNow);
+? asyncReplayEntriesInOrder(messagesToReplayFiltered)
+: asyncReplayEntries(messagesToReplayFiltered);
 // clear already acked positions from replay bucket
 
 deletedMessages.forEach(position -> 
redeliveryMessages.remove(((PositionImpl) position).getLedgerId(),
 ((PositionImpl) position).getEntryId()));
 // if all the entries are acked-entries and cleared up from 
redeliveryMessages, try to read
 // next entries as readCompletedEntries-callback was never 
called
-if ((messagesToReplayNow.size() - deletedMessages.size()) == 
0) {
+if ((messagesToReplayFiltered.size() - deletedMessages.size()) 
== 0) {
 havePendingReplayRead = false;
 readMoreEntriesAsync();
 }
@@ -335,7 +336,7 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
 log.debug("[{}] Dispatcher read is blocked due to 
unackMessages {} reached to max {}", name,
 totalUnackedMessages, 
topic.getMaxUnackedMessagesOnSubscription());
 }
-} else if (!havePendingRead) {
+   

Re: [PR] [fix][sec] implicit narrowing conversion in compound assignment [pulsar]

2024-03-28 Thread via GitHub


liangyepianzhou merged PR #22074:
URL: https://github.com/apache/pulsar/pull/22074


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



(pulsar) branch master updated: [fix][sec] implicit narrowing conversion in compound assignment (#22074)

2024-03-28 Thread xiangying
This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 0701d7eedce [fix][sec] implicit narrowing conversion in compound 
assignment (#22074)
0701d7eedce is described below

commit 0701d7eedcef6aae750b5067139caf8e73434818
Author: Xiangying Meng <55571188+liangyepianz...@users.noreply.github.com>
AuthorDate: Fri Mar 29 12:43:11 2024 +0800

[fix][sec] implicit narrowing conversion in compound assignment (#22074)
---
 .../org/apache/pulsar/common/policies/data/SubscriptionStats.java | 2 +-
 .../pulsar/common/policies/data/stats/SubscriptionStatsImpl.java  | 2 +-
 .../org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java | 4 ++--
 3 files changed, 4 insertions(+), 4 deletions(-)

diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
index 9ff94a2952e..d4850adaa6f 100644
--- 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
@@ -46,7 +46,7 @@ public interface SubscriptionStats {
 double getMessageAckRate();
 
 /** Chunked message dispatch rate. */
-int getChunkedMessageRate();
+double getChunkedMessageRate();
 
 /** Number of entries in the subscription backlog. */
 long getMsgBacklog();
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java
index bed8aabf27d..a8ea0060629 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java
@@ -54,7 +54,7 @@ public class SubscriptionStatsImpl implements 
SubscriptionStats {
 public double messageAckRate;
 
 /** Chunked message dispatch rate. */
-public int chunkedMessageRate;
+public double chunkedMessageRate;
 
 /** Number of entries in the subscription backlog. */
 public long msgBacklog;
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java
index 60ade64e688..8c273236945 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java
@@ -66,7 +66,7 @@ public class LocalBrokerData implements LoadManagerReport {
 // The stats given in the most recent invocation of update.
 private Map lastStats;
 
-private int numTopics;
+private long numTopics;
 private int numBundles;
 private int numConsumers;
 private int numProducers;
@@ -202,7 +202,7 @@ public class LocalBrokerData implements LoadManagerReport {
 msgRateOut = 0;
 msgThroughputIn = 0;
 msgThroughputOut = 0;
-int totalNumTopics = 0;
+long totalNumTopics = 0;
 int totalNumBundles = 0;
 int totalNumConsumers = 0;
 int totalNumProducers = 0;



(pulsar) branch master updated: [improve] [broker] Avoid repeated Read-and-discard when using Key_Shared mode (#22245)

2024-03-28 Thread penghui
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new e34ea626a65 [improve] [broker] Avoid repeated Read-and-discard when 
using Key_Shared mode (#22245)
e34ea626a65 is described below

commit e34ea626a65da4c8e1578010f857aa961a7b5c55
Author: fengyubiao 
AuthorDate: Fri Mar 29 12:06:26 2024 +0800

[improve] [broker] Avoid repeated Read-and-discard when using Key_Shared 
mode (#22245)
---
 .../persistent/MessageRedeliveryController.java|   8 +
 .../PersistentDispatcherMultipleConsumers.java |  47 +++-
 ...istentStickyKeyDispatcherMultipleConsumers.java | 104 +++-
 .../client/api/KeySharedSubscriptionTest.java  | 266 +
 .../pulsar/client/api/ProducerConsumerBase.java|  66 +
 .../api/SubscriptionPauseOnAckStatPersistTest.java |  78 +-
 6 files changed, 477 insertions(+), 92 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
index 5bf3f5506fa..63803177242 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
@@ -95,6 +95,14 @@ public class MessageRedeliveryController {
 }
 }
 
+public Long getHash(long ledgerId, long entryId) {
+LongPair value = hashesToBeBlocked.get(ledgerId, entryId);
+if (value == null) {
+return null;
+}
+return value.first;
+}
+
 public void removeAllUpTo(long markDeleteLedgerId, long markDeleteEntryId) 
{
 if (!allowOutOfOrderDelivery) {
 List keysToRemove = new ArrayList<>();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 039104fe022..b441400dae1 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -334,24 +334,25 @@ public class PersistentDispatcherMultipleConsumers 
extends AbstractDispatcherMul
 }
 
 NavigableSet messagesToReplayNow = 
getMessagesToReplayNow(messagesToRead);
-
-if (!messagesToReplayNow.isEmpty()) {
+NavigableSet messagesToReplayFiltered = 
filterOutEntriesWillBeDiscarded(messagesToReplayNow);
+if (!messagesToReplayFiltered.isEmpty()) {
 if (log.isDebugEnabled()) {
-log.debug("[{}] Schedule replay of {} messages for {} 
consumers", name, messagesToReplayNow.size(),
-consumerList.size());
+log.debug("[{}] Schedule replay of {} messages for {} 
consumers", name,
+messagesToReplayFiltered.size(), 
consumerList.size());
 }
 
 havePendingReplayRead = true;
 minReplayedPosition = messagesToReplayNow.first();
 Set deletedMessages = 
topic.isDelayedDeliveryEnabled()
-? asyncReplayEntriesInOrder(messagesToReplayNow) : 
asyncReplayEntries(messagesToReplayNow);
+? asyncReplayEntriesInOrder(messagesToReplayFiltered)
+: asyncReplayEntries(messagesToReplayFiltered);
 // clear already acked positions from replay bucket
 
 deletedMessages.forEach(position -> 
redeliveryMessages.remove(((PositionImpl) position).getLedgerId(),
 ((PositionImpl) position).getEntryId()));
 // if all the entries are acked-entries and cleared up from 
redeliveryMessages, try to read
 // next entries as readCompletedEntries-callback was never 
called
-if ((messagesToReplayNow.size() - deletedMessages.size()) == 
0) {
+if ((messagesToReplayFiltered.size() - deletedMessages.size()) 
== 0) {
 havePendingReplayRead = false;
 readMoreEntriesAsync();
 }
@@ -360,7 +361,7 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
 log.debug("[{}] Dispatcher read is blocked due to 
unackMessages {} reached to max {}", name,
 totalUnackedMessages, 
topic.getMaxUnackedMessagesOnSubscription());
 }
-} else if (!havePendingRead) {
+} else if 

Re: [PR] [improve] [broker] Avoid repeated Read-and-discard when using Key_Shared mode [pulsar]

2024-03-28 Thread via GitHub


codelipenghui merged PR #22245:
URL: https://github.com/apache/pulsar/pull/22245


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [fix][broker] Skip topic.close during unloading if the topic future fails with ownership check, and fix isBundleOwnedByAnyBroker to use ns.checkOwnershipPresentAsync for ExtensibleLoadBalance

2024-03-28 Thread via GitHub


heesung-sn commented on code in PR #22379:
URL: https://github.com/apache/pulsar/pull/22379#discussion_r1543996536


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##
@@ -2244,9 +2244,18 @@ private CompletableFuture 
unloadServiceUnit(NamespaceBundle serviceUnit
 closeFutures.add(topicFuture
 .thenCompose(t -> t.isPresent() ? t.get().close(
 disconnectClients, 
closeWithoutWaitingClientDisconnect)
-: CompletableFuture.completedFuture(null)));
+: CompletableFuture.completedFuture(null))
+.exceptionally(e -> {
+if (e.getCause() instanceof 
BrokerServiceException.ServiceUnitNotReadyException
+&& e.getMessage().contains("Please redo 
the lookup")) {
+log.warn("[{}] Topic ownership check failed. 
Skipping it", topicName);
+return null;

Review Comment:
   You also need to update configs like in this PR. Otherwise, the monitor job 
will clean the stuck unloading state.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [fix][broker] Skip topic.close during unloading if the topic future fails with ownership check, and fix isBundleOwnedByAnyBroker to use ns.checkOwnershipPresentAsync for ExtensibleLoadBalance

2024-03-28 Thread via GitHub


dragosvictor commented on code in PR #22379:
URL: https://github.com/apache/pulsar/pull/22379#discussion_r1543992617


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##
@@ -2244,9 +2244,18 @@ private CompletableFuture 
unloadServiceUnit(NamespaceBundle serviceUnit
 closeFutures.add(topicFuture
 .thenCompose(t -> t.isPresent() ? t.get().close(
 disconnectClients, 
closeWithoutWaitingClientDisconnect)
-: CompletableFuture.completedFuture(null)));
+: CompletableFuture.completedFuture(null))
+.exceptionally(e -> {
+if (e.getCause() instanceof 
BrokerServiceException.ServiceUnitNotReadyException
+&& e.getMessage().contains("Please redo 
the lookup")) {
+log.warn("[{}] Topic ownership check failed. 
Skipping it", topicName);
+return null;

Review Comment:
   Can confirm @Demogorgon314's finding. The test doesn't always execute the 
respective codepath.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [fix][broker] Skip topic.close during unloading if the topic future fails with ownership check, and fix isBundleOwnedByAnyBroker to use ns.checkOwnershipPresentAsync for ExtensibleLoadBalance

2024-03-28 Thread via GitHub


dragosvictor commented on code in PR #22379:
URL: https://github.com/apache/pulsar/pull/22379#discussion_r1543991161


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##
@@ -2244,9 +2244,18 @@ private CompletableFuture 
unloadServiceUnit(NamespaceBundle serviceUnit
 closeFutures.add(topicFuture
 .thenCompose(t -> t.isPresent() ? t.get().close(
 disconnectClients, 
closeWithoutWaitingClientDisconnect)
-: CompletableFuture.completedFuture(null)));
+: CompletableFuture.completedFuture(null))
+.exceptionally(e -> {
+if (e.getCause() instanceof 
BrokerServiceException.ServiceUnitNotReadyException
+&& e.getMessage().contains("Please redo 
the lookup")) {
+log.warn("[{}] Topic ownership check failed. 
Skipping it", topicName);
+return null;

Review Comment:
   You're right, thanks for clarifying!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [fix][broker] Skip topic.close during unloading if the topic future fails with ownership check, and fix isBundleOwnedByAnyBroker to use ns.checkOwnershipPresentAsync for ExtensibleLoadBalance

2024-03-28 Thread via GitHub


heesung-sn commented on code in PR #22379:
URL: https://github.com/apache/pulsar/pull/22379#discussion_r1543978471


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##
@@ -2244,9 +2244,18 @@ private CompletableFuture 
unloadServiceUnit(NamespaceBundle serviceUnit
 closeFutures.add(topicFuture
 .thenCompose(t -> t.isPresent() ? t.get().close(
 disconnectClients, 
closeWithoutWaitingClientDisconnect)
-: CompletableFuture.completedFuture(null)));
+: CompletableFuture.completedFuture(null))
+.exceptionally(e -> {
+if (e.getCause() instanceof 
BrokerServiceException.ServiceUnitNotReadyException
+&& e.getMessage().contains("Please redo 
the lookup")) {
+log.warn("[{}] Topic ownership check failed. 
Skipping it", topicName);
+return null;

Review Comment:
   Interesting. I see the test fails if I remove this on my laptop.
   https://github.com/apache/pulsar/assets/103456639/ea53d7a6-4261-4654-93cb-0eef0b59e2fc;>
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [fix][broker] Skip topic.close during unloading if the topic future fails with ownership check, and fix isBundleOwnedByAnyBroker to use ns.checkOwnershipPresentAsync for ExtensibleLoadBalance

2024-03-28 Thread via GitHub


Demogorgon314 commented on code in PR #22379:
URL: https://github.com/apache/pulsar/pull/22379#discussion_r1543971934


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##
@@ -2244,9 +2244,18 @@ private CompletableFuture 
unloadServiceUnit(NamespaceBundle serviceUnit
 closeFutures.add(topicFuture
 .thenCompose(t -> t.isPresent() ? t.get().close(
 disconnectClients, 
closeWithoutWaitingClientDisconnect)
-: CompletableFuture.completedFuture(null)));
+: CompletableFuture.completedFuture(null))
+.exceptionally(e -> {
+if (e.getCause() instanceof 
BrokerServiceException.ServiceUnitNotReadyException
+&& e.getMessage().contains("Please redo 
the lookup")) {
+log.warn("[{}] Topic ownership check failed. 
Skipping it", topicName);
+return null;

Review Comment:
   Do we have a test to cover this change? I noticed that even after removing 
this change, the `testUnloadUponTopicLookupFailure` test still passes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [fix][ml] No rollover inactive ledgers when metadata service invalid [pulsar]

2024-03-28 Thread via GitHub


Technoboy- closed pull request #22284: [fix][ml] No rollover inactive ledgers 
when metadata service invalid
URL: https://github.com/apache/pulsar/pull/22284


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [improve] [broker] Avoid repeated Read-and-discard when using Key_Shared mode [pulsar]

2024-03-28 Thread via GitHub


poorbarcode commented on code in PR #22245:
URL: https://github.com/apache/pulsar/pull/22245#discussion_r1543965530


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java:
##
@@ -472,18 +475,18 @@ protected NavigableSet 
filterOutEntriesWillBeDiscarded(NavigableSe
 // Maybe using HashRangeExclusiveStickyKeyConsumerSelector.
 continue;
 }
-if (getAvailablePermits(c) == 0) {
+groupedPositions.computeIfAbsent(c, k -> new 
ArrayList<>()).add(pos);
+}
+// Filter positions by the Recently Joined Position rule.
+for (Map.Entry> item : 
groupedPositions.entrySet()) {
+int availablePermits = getAvailablePermits(item.getKey());
+if (availablePermits == 0) {
 continue;
 }
-PositionImpl currentMaxReadPosition = 
recentlyJoinedConsumers.get(c);
-if (currentMaxReadPosition == null) {
-res.add(pos);
-} else if (pos.compareTo(firstMaxReadPos) < 0 && 
pos.compareTo(currentMaxReadPosition) < 0) {
-res.add(pos);
-} else {
-// The subsequent positions will also be larger than 
"firstMaxReadPos", why need to check continuous.
-// Because the subsequent positions may be delivered to a 
consumer which does not in the
-// "recentlyJoinedConsumers".
+int posCountToRead = 
getRestrictedMaxEntriesForConsumer(item.getKey(), item.getValue(), 
availablePermits,
+ReadType.Replay, null);
+for (int i = 0; i < posCountToRead; i++) {
+res.add(item.getValue().get(i));

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



(pulsar) branch master updated: [improve][io]: Add validation for JDBC sink not supporting primitive schema (#22376)

2024-03-28 Thread baodi
This is an automated email from the ASF dual-hosted git repository.

baodi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new a3bf4e8a42c [improve][io]: Add validation for JDBC sink not supporting 
primitive schema (#22376)
a3bf4e8a42c is described below

commit a3bf4e8a42c84a0ee5b4c45b50d48daed0b3de0c
Author: Baodi Shi 
AuthorDate: Fri Mar 29 08:33:27 2024 +0800

[improve][io]: Add validation for JDBC sink not supporting primitive schema 
(#22376)
---
 pulsar-io/jdbc/core/pom.xml|  7 +
 .../pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java |  5 
 .../pulsar/io/jdbc/BaseJdbcAutoSchemaSinkTest.java | 25 +
 .../apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java  | 31 +++---
 4 files changed, 59 insertions(+), 9 deletions(-)

diff --git a/pulsar-io/jdbc/core/pom.xml b/pulsar-io/jdbc/core/pom.xml
index 586307e8b86..0232a699680 100644
--- a/pulsar-io/jdbc/core/pom.xml
+++ b/pulsar-io/jdbc/core/pom.xml
@@ -71,6 +71,13 @@
   provided
 
 
+
+  ${project.groupId}
+  pulsar-client-original
+  ${project.version}
+  test
+
+
   
 
 
\ No newline at end of file
diff --git 
a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java
 
b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java
index 36c36740919..3655688c0f3 100644
--- 
a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java
+++ 
b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java
@@ -34,6 +34,7 @@ import org.apache.pulsar.client.api.schema.GenericObject;
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.api.schema.KeyValueSchema;
 import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.io.jdbc.JdbcUtils.ColumnId;
 
@@ -137,6 +138,10 @@ public abstract class BaseJdbcAutoSchemaSink extends 
JdbcAbstractSink data.get(k);
 } else {
+SchemaType schemaType = 
message.getSchema().getSchemaInfo().getType();
+if (schemaType.isPrimitive()) {
+throw new UnsupportedOperationException("Primitive schema is 
not supported: " + schemaType);
+}
 recordValueGetter = (key) -> ((GenericRecord) 
record).getField(key);
 }
 String action = message.getProperties().get(ACTION_PROPERTY);
diff --git 
a/pulsar-io/jdbc/core/src/test/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSinkTest.java
 
b/pulsar-io/jdbc/core/src/test/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSinkTest.java
index b15eb832242..c088dd3c42c 100644
--- 
a/pulsar-io/jdbc/core/src/test/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSinkTest.java
+++ 
b/pulsar-io/jdbc/core/src/test/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSinkTest.java
@@ -22,6 +22,10 @@ import java.util.function.Function;
 import org.apache.avro.Schema;
 import org.apache.avro.SchemaBuilder;
 import org.apache.avro.util.Utf8;
+import org.apache.pulsar.client.api.schema.GenericObject;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
+import org.apache.pulsar.functions.api.Record;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -143,5 +147,26 @@ public class BaseJdbcAutoSchemaSinkTest {
 return consumer.apply(record).endRecord().getFields().get(0).schema();
 }
 
+@Test(expectedExceptions = UnsupportedOperationException.class,
+expectedExceptionsMessageRegExp = "Primitive schema is not 
supported.*")
+@SuppressWarnings("unchecked")
+public void testNotSupportPrimitiveSchema() {
+BaseJdbcAutoSchemaSink baseJdbcAutoSchemaSink = new 
BaseJdbcAutoSchemaSink() {};
+AutoConsumeSchema autoConsumeSchema = new AutoConsumeSchema();
+
autoConsumeSchema.setSchema(org.apache.pulsar.client.api.Schema.STRING);
+Record record = new Record() {
+@Override
+public org.apache.pulsar.client.api.Schema 
getSchema() {
+return autoConsumeSchema;
+}
+
+@Override
+public GenericRecord getValue() {
+return null;
+}
+};
+baseJdbcAutoSchemaSink.createMutation((Record) record);
+}
+
 
 }
\ No newline at end of file
diff --git 
a/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java
 
b/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java
index d9ed4cbd442..ca01615bef1 100644
--- 
a/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java
+++ 

Re: [PR] [improve][io]: Add validation for JDBC sink not supporting primitive schema [pulsar]

2024-03-28 Thread via GitHub


shibd merged PR #22376:
URL: https://github.com/apache/pulsar/pull/22376


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [D] Apache Beam support for Pulsar [pulsar]

2024-03-28 Thread via GitHub


GitHub user vcastell-tibco edited a comment on the discussion: Apache Beam 
support for Pulsar

I see that a Pulsar connector was made available as of BEAM 2.38.0 release but 
I don't see Pulsar as an official connector on below page. Is the Pulsar IO 
connector official or not? If official then can someone please update the page 
since it gives idea that a Pulsar IO connector is not available.

https://beam.apache.org/documentation/io/connectors/

GitHub link: 
https://github.com/apache/pulsar/discussions/18453#discussioncomment-8902102


This is an automatically sent email for commits@pulsar.apache.org.
To unsubscribe, please send an email to: commits-unsubscr...@pulsar.apache.org



Re: [PR] [improve] [broker] Avoid repeated Read-and-discard when using Key_Shared mode [pulsar]

2024-03-28 Thread via GitHub


codelipenghui commented on code in PR #22245:
URL: https://github.com/apache/pulsar/pull/22245#discussion_r1543874681


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java:
##
@@ -472,18 +475,18 @@ protected NavigableSet 
filterOutEntriesWillBeDiscarded(NavigableSe
 // Maybe using HashRangeExclusiveStickyKeyConsumerSelector.
 continue;
 }
-if (getAvailablePermits(c) == 0) {
+groupedPositions.computeIfAbsent(c, k -> new 
ArrayList<>()).add(pos);
+}
+// Filter positions by the Recently Joined Position rule.
+for (Map.Entry> item : 
groupedPositions.entrySet()) {
+int availablePermits = getAvailablePermits(item.getKey());
+if (availablePermits == 0) {
 continue;
 }
-PositionImpl currentMaxReadPosition = 
recentlyJoinedConsumers.get(c);
-if (currentMaxReadPosition == null) {
-res.add(pos);
-} else if (pos.compareTo(firstMaxReadPos) < 0 && 
pos.compareTo(currentMaxReadPosition) < 0) {
-res.add(pos);
-} else {
-// The subsequent positions will also be larger than 
"firstMaxReadPos", why need to check continuous.
-// Because the subsequent positions may be delivered to a 
consumer which does not in the
-// "recentlyJoinedConsumers".
+int posCountToRead = 
getRestrictedMaxEntriesForConsumer(item.getKey(), item.getValue(), 
availablePermits,
+ReadType.Replay, null);
+for (int i = 0; i < posCountToRead; i++) {
+res.add(item.getValue().get(i));

Review Comment:
   Is it better to change to 
   
   ```
   res.addAll(item.getValue().subList(0, posCountToRead));
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



(pulsar-site) branch main updated: Adapt the release process for PR 20981 changes

2024-03-28 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-site.git


The following commit(s) were added to refs/heads/main by this push:
 new cfc1f8ab1e0b Adapt the release process for PR 20981 changes
cfc1f8ab1e0b is described below

commit cfc1f8ab1e0bd3d44767a9c059bb2613c5c53321
Author: Lari Hotari 
AuthorDate: Thu Mar 28 22:26:10 2024 +0200

Adapt the release process for PR 20981 changes

- see https://github.com/apache/pulsar/pull/20981 for the change
---
 contribute/release-process.md | 79 +++
 1 file changed, 57 insertions(+), 22 deletions(-)

diff --git a/contribute/release-process.md b/contribute/release-process.md
index 8413025ec1bd..5fdff1d26a08 100644
--- a/contribute/release-process.md
+++ b/contribute/release-process.md
@@ -235,7 +235,7 @@ The _apache-pulsar-shell_ artifacts are distributed 
beginning with release 2.11.
 
 :::
 
-### Inspect the artifacts
+### Check licenses
 
 First, check that the `LICENSE` and `NOTICE` files cover all included jars for 
the bin package. You can use script to cross-validate `LICENSE` file with 
included jars:
 
@@ -245,20 +245,7 @@ src/check-binary-license.sh 
distribution/server/target/apache-pulsar-*-bin.tar.g
 ```
 In some older branches, the script is called `src/check-binary-license` 
instead of `src/check-binary-license.sh`.
 
-
- Then, run Apache RAT to verify the license headers in the src package:
-```shell
-cd /tmp
-tar -xvzf $PULSAR_PATH/target/apache-pulsar-*-src.tar.gz
-cd apache-pulsar-$VERSION_WITHOUT_RC-src
-mvn apache-rat:check
-```
-
-Finally, use instructions in [verifying release 
candidates](validate-release-candidate.md) page to do some sanity checks on the 
produced binary distributions.
-
-### Sign and stage the artifacts on SVN
-
-The src and bin artifacts need to be signed and uploaded to the dist SVN 
repository for staging. This step should not run inside the $PULSAR_PATH.
+### Create and publish the GPG key if you haven't already done this
 
 If you haven't already done it, [create and publish the GPG 
key](create-gpg-keys.md). You will use the key to sign the release artifacts.
 
@@ -274,28 +261,59 @@ default-key 
 
 This can be automated with this command:
 ```shell
+# KEY_ID is in short format, subset key id visible in gpg -K
 KEY_ID=$(gpg --list-keys --with-colons $apache_u...@apache.org | egrep "^pub" 
| awk -F: '{print $5}')
 echo "default-key $KEY_ID" >> ~/.gnupg/gpg.conf
 ```
 
+### Sign and stage the artifacts to local SVN directory
+
+The src and bin artifacts need to be signed and finally uploaded to the dist 
SVN repository for staging. This step should not run inside the $PULSAR_PATH.
+
 ```shell
 # make sure to run svn mkdir commmand in a different dir(NOT IN $PULSAR_PATH).
 mkdir ~/pulsar-svn-release-$VERSION_RC
 cd ~/pulsar-svn-release-$VERSION_RC
 
+# create an empty directory in the SVN server
 svn mkdir --username $APACHE_USER -m "Add directory for pulsar $VERSION_RC 
release" https://dist.apache.org/repos/dist/dev/pulsar/pulsar-$VERSION_RC
+# checkout the empty directory
 svn co https://dist.apache.org/repos/dist/dev/pulsar/pulsar-$VERSION_RC
+# cd into the directory
 cd pulsar-$VERSION_RC
 
+# stage the release artifacts
 $PULSAR_PATH/src/stage-release.sh .
 
+# Please check the size of the files in the `pulsar-2.X.0-candidate-1`.
+# If you build the artifacts without a clean workspace, the 
`apache-pulsar-2.X.0-src.tar.gz` files
+# may be too large to be unable to upload.
+ls -ltra
+du -ms *
+
 # Verify the artifacts are correctly signed have correct checksums:
 ( for i in **/*.(tar.gz|zip|nar); do echo $i; gpg --verify $i.asc $i || exit 1 
; done )
 ( for i in **/*.(tar.gz|zip|nar); do echo $i; shasum -a 512 -c $i.sha512 || 
exit 1 ; done )
 
-# Please check the size of the files in the `pulsar-2.X.0-candidate-1`.
-# If you build the artifacts without a clean workspace, the 
`apache-pulsar-2.X.0-src.tar.gz` files
-# may be too large to be unable to upload.
+# don't commit and upload yet, there's a separate step for handling that
+```
+
+### Validate the release files
+
+Then use instructions in [verifying release 
candidates](validate-release-candidate.md) page to do some sanity checks on the 
produced binary distributions.
+
+ Make sure to run Apache RAT to verify the license headers in the src package:
+```shell
+cd /tmp
+tar -xvzf 
~/pulsar-svn-release-$VERSION_RC/pulsar-$VERSION_RC/apache-pulsar-*-src.tar.gz
+cd apache-pulsar-$VERSION_WITHOUT_RC-src
+mvn apache-rat:check
+```
+
+### Commit and upload the staged files in the local SVN directory to ASF SVN 
server
+
+```shell
+cd  ~/pulsar-svn-release-$VERSION_RC/pulsar-$VERSION_RC
 svn add *
 svn ci -m "Staging artifacts and signature for Pulsar release $VERSION_RC"
 ```
@@ -368,7 +386,25 @@ DOCKER_USER= 
DOCKER_PASSWORD= DOCKER_ORG=
-DOCKER_ORG=
 docker login -u $DOCKER_USER
 mvn install 

Re: [PR] [fix][broker] Skip topic.close during unloading if the topic future fails with ownership check, and fix isBundleOwnedByAnyBroker to use ns.checkOwnershipPresentAsync for ExtensibleLoadBalance

2024-03-28 Thread via GitHub


heesung-sn commented on code in PR #22379:
URL: https://github.com/apache/pulsar/pull/22379#discussion_r1543682634


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##
@@ -2244,9 +2244,18 @@ private CompletableFuture 
unloadServiceUnit(NamespaceBundle serviceUnit
 closeFutures.add(topicFuture
 .thenCompose(t -> t.isPresent() ? t.get().close(
 disconnectClients, 
closeWithoutWaitingClientDisconnect)
-: CompletableFuture.completedFuture(null)));
+: CompletableFuture.completedFuture(null))
+.exceptionally(e -> {
+if (e.getCause() instanceof 
BrokerServiceException.ServiceUnitNotReadyException
+&& e.getMessage().contains("Please redo 
the lookup")) {
+log.warn("[{}] Topic ownership check failed. 
Skipping it", topicName);
+return null;

Review Comment:
   This `return null` does not return a null element to `closeFutures`. I think 
it will add CompletableFuture(null).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [I] Add a metric for geo replication for tracking replicated subscriptions snapshot timeouts [pulsar]

2024-03-28 Thread via GitHub


lhotari commented on issue #21793:
URL: https://github.com/apache/pulsar/issues/21793#issuecomment-2026074347

   > @lhotari I have made a [PR](https://github.com/nikam14/pulsar/pull/3) in 
forked repo can you take a look.
   
   @nikam14 looks good. Please go ahead and create a apache/pulsar PR. Please 
fill in the details in the description and name the PR properly too. The 
[contribution guide](https://pulsar.apache.org/contribute/) contains advice 
unless the PR template explains it. For metrics, there will also need to be 
documentation to be added to pulsar-site repository.
   You can usually get help also on [Apache Pulsar 
Slack's](https://pulsar.apache.org/community/#section-discussions) #dev channel 
for anything related to contributions.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



svn commit: r68192 [2/2] - in /dev/pulsar/pulsar-3.2.2-candidate-1: ./ connectors/

2024-03-28 Thread lhotari
Added: 
dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-jdbc-sqlite-3.2.2.nar
==
Binary file - no diff available.

Propchange: 
dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-jdbc-sqlite-3.2.2.nar
--
svn:mime-type = application/octet-stream

Added: 
dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-jdbc-sqlite-3.2.2.nar.asc
==
--- 
dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-jdbc-sqlite-3.2.2.nar.asc
 (added)
+++ 
dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-jdbc-sqlite-3.2.2.nar.asc
 Thu Mar 28 20:35:44 2024
@@ -0,0 +1,16 @@
+-BEGIN PGP SIGNATURE-
+
+iQIzBAABCgAdFiEExmPS9k2iygnbKNmr0/pn1SLFUlYFAmYF0wIACgkQ0/pn1SLF
+UlYz9A/9HULfgH6bXaLj91QJlmbemhnMBkzEM4dGFmcATzv+vRmmh8y2vKHlSNxS
+vbk5Y8pNJGo3a8Bblay4DjhGhOsATB81nVPEYH22np4DP4gb4AhZ6Vp9JkegaBPV
+5PcswVh9uzI2e9006kpedPZZnMy7wNHZOfpgyXsjdPcOajbB+pKFFDGxWmbIsgWB
+GDq+p6a5dmcbMlHDx+d6U25QMHRUhPuZvzwjOmZF6t/EVcgsvGy1Y//J10XVKhbT
+73M/X8nVZwEZ6BkGtgj0emXKUO8zwMuCU0p9AyS9akK9f4gQ3aWiueyRp5+ytpSa
+2TSIgeRkZXG9vI3xBxCnGCFI3Q450yoyP8DYEp2uR2OXMl71YARGPkubkR1MEe3x
+0D25G28zUHjPV4wIk/dIg1klg2iDknB6qpUb4CCzFQZoaH4Tymn7j24yjkz96ca0
+gD+VYzoiY2TCnrP1a80oP2Th4YO4UJgv0JIygph5BvBwXmo9Ic4arWL2Fi8BrK1k
+rYNIk7DpD+6G6CVfGShFKYvS2Ll5TM6x8cyNlzh9ETlA02Xbz/oK5IF7Tf4LjCYd
+T3RH7fcYlrO/z8MNhzvt3C3er78OzZmZbJZAeQBHES4+zXXsVqngi6rqtz24KJmN
+tApldCWFJCAq+UI4qobsO+Mp4gnZXNS93TzXIt/56mGJRgF9Huc=
+=YbPn
+-END PGP SIGNATURE-

Added: 
dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-jdbc-sqlite-3.2.2.nar.sha512
==
--- 
dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-jdbc-sqlite-3.2.2.nar.sha512
 (added)
+++ 
dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-jdbc-sqlite-3.2.2.nar.sha512
 Thu Mar 28 20:35:44 2024
@@ -0,0 +1 @@
+1b8fbd8b1a3f42d40dbf0216e0c59db6bd3a1e3fa6c223d5dc45272d1d7a61ac33b64535ce6c819c588c9be83724009180151392ad093490b16d799a5d456084
  ./connectors/pulsar-io-jdbc-sqlite-3.2.2.nar

Added: dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-kafka-3.2.2.nar
==
Binary file - no diff available.

Propchange: 
dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-kafka-3.2.2.nar
--
svn:mime-type = application/octet-stream

Added: 
dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-kafka-3.2.2.nar.asc
==
--- 
dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-kafka-3.2.2.nar.asc 
(added)
+++ 
dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-kafka-3.2.2.nar.asc 
Thu Mar 28 20:35:44 2024
@@ -0,0 +1,16 @@
+-BEGIN PGP SIGNATURE-
+
+iQIzBAABCgAdFiEExmPS9k2iygnbKNmr0/pn1SLFUlYFAmYF0vwACgkQ0/pn1SLF
+UlaBvBAAgxmF+uuKUZlu6OV+fTaB/QsA/R3A9tLQtumdQTRBFJwgNBtu0fdhtVpd
+IOtf/FtyTz7W+E/UL+E9kqhZ0g5KyFOHkF/8wkv8eDIDyFl1zQkrT6LG4XWQoQ2G
+Wx1dQeb6wn1r45goj+aXr552TQVQ4NnWbN+FSdNaViWK0Z7IyXH/I7DvGRal/o2o
+B1VCIdYZBzfXpkNm4dHgxExr74XJnQ5DZD5ydkHGYR5Jl2U+YCpGKTcqqy9PdGNS
+fV1jeW49vdavIXI1Fhe+7kwnoQt3dg+4wesRFGukn449syyGK8v7tc0WxN1i8xZu
+7EEhNK5H05RwIG3lFwzlxKFA+pdrZjCIvHmaXjFfHs2QEgqDy/m79KW3Y4nWFRRf
+J97iHUkbXxnQ5LPGwDM7/vw1SR+mxLg67QFZsSZCNDkODSSItP4iKI0HvcbHZLHr
+UxRgTYXysbPLqJLFjUWksoZggSlt8VRei+CL1RzQQPn6r+MHhiPhUTMk6/J4gsvY
+DJAKZHocCSUQ8qNW/PKZ55cvHzRnOfCtGf6jjR0EfSMJv29Sw0Hw4cT597MqCRNk
+13c92KEW4wp6E1nDnD/spW0k9CKmDJTPTDQtqQ7iF661GqX8UfPs/rkORnJ/rVpt
+GU+Hn3ArVLe6UcJDb/F4S4zKMOBz8QkXO+32rwEzuEqtGp0vYcE=
+=dcd/
+-END PGP SIGNATURE-

Added: 
dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-kafka-3.2.2.nar.sha512
==
--- 
dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-kafka-3.2.2.nar.sha512 
(added)
+++ 
dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-kafka-3.2.2.nar.sha512 
Thu Mar 28 20:35:44 2024
@@ -0,0 +1 @@
+92117b86a36d769e3b04871e572c12e8de186ba7ee1b7a06d011f8507eec8da6c9587a51c22460c353cd93efefb9036d55f301a66a13ef72df80437d8ad9acd7
  ./connectors/pulsar-io-kafka-3.2.2.nar

Added: 
dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-kafka-connect-adaptor-3.2.2.nar
==
Binary file - no diff available.

Propchange: 
dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-kafka-connect-adaptor-3.2.2.nar
--
svn:mime-type = application/octet-stream

Added: 
dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-kafka-connect-adaptor-3.2.2.nar.asc

svn commit: r68192 [1/2] - in /dev/pulsar/pulsar-3.2.2-candidate-1: ./ connectors/

2024-03-28 Thread lhotari
Author: lhotari
Date: Thu Mar 28 20:35:44 2024
New Revision: 68192

Log:
Staging artifacts and signature for Pulsar release 3.2.2-candidate-1

Added:
dev/pulsar/pulsar-3.2.2-candidate-1/apache-pulsar-3.2.2-bin.tar.gz   (with 
props)
dev/pulsar/pulsar-3.2.2-candidate-1/apache-pulsar-3.2.2-bin.tar.gz.asc
dev/pulsar/pulsar-3.2.2-candidate-1/apache-pulsar-3.2.2-bin.tar.gz.sha512
dev/pulsar/pulsar-3.2.2-candidate-1/apache-pulsar-3.2.2-src.tar.gz   (with 
props)
dev/pulsar/pulsar-3.2.2-candidate-1/apache-pulsar-3.2.2-src.tar.gz.asc
dev/pulsar/pulsar-3.2.2-candidate-1/apache-pulsar-3.2.2-src.tar.gz.sha512

dev/pulsar/pulsar-3.2.2-candidate-1/apache-pulsar-offloaders-3.2.2-bin.tar.gz   
(with props)

dev/pulsar/pulsar-3.2.2-candidate-1/apache-pulsar-offloaders-3.2.2-bin.tar.gz.asc

dev/pulsar/pulsar-3.2.2-candidate-1/apache-pulsar-offloaders-3.2.2-bin.tar.gz.sha512
dev/pulsar/pulsar-3.2.2-candidate-1/apache-pulsar-shell-3.2.2-bin.tar.gz   
(with props)
dev/pulsar/pulsar-3.2.2-candidate-1/apache-pulsar-shell-3.2.2-bin.tar.gz.asc

dev/pulsar/pulsar-3.2.2-candidate-1/apache-pulsar-shell-3.2.2-bin.tar.gz.sha512
dev/pulsar/pulsar-3.2.2-candidate-1/apache-pulsar-shell-3.2.2-bin.zip   
(with props)
dev/pulsar/pulsar-3.2.2-candidate-1/apache-pulsar-shell-3.2.2-bin.zip.asc
dev/pulsar/pulsar-3.2.2-candidate-1/apache-pulsar-shell-3.2.2-bin.zip.sha512
dev/pulsar/pulsar-3.2.2-candidate-1/connectors/
dev/pulsar/pulsar-3.2.2-candidate-1/connectors/LICENSE
dev/pulsar/pulsar-3.2.2-candidate-1/connectors/README

dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-aerospike-3.2.2.nar   
(with props)

dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-aerospike-3.2.2.nar.asc

dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-aerospike-3.2.2.nar.sha512
dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-alluxio-3.2.2.nar  
 (with props)

dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-alluxio-3.2.2.nar.asc

dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-alluxio-3.2.2.nar.sha512

dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-batch-data-generator-3.2.2.nar
   (with props)

dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-batch-data-generator-3.2.2.nar.asc

dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-batch-data-generator-3.2.2.nar.sha512
dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-canal-3.2.2.nar   
(with props)
dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-canal-3.2.2.nar.asc

dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-canal-3.2.2.nar.sha512

dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-cassandra-3.2.2.nar   
(with props)

dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-cassandra-3.2.2.nar.asc

dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-cassandra-3.2.2.nar.sha512

dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-data-generator-3.2.2.nar
   (with props)

dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-data-generator-3.2.2.nar.asc

dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-data-generator-3.2.2.nar.sha512

dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-debezium-mongodb-3.2.2.nar
   (with props)

dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-debezium-mongodb-3.2.2.nar.asc

dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-debezium-mongodb-3.2.2.nar.sha512

dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-debezium-mssql-3.2.2.nar
   (with props)

dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-debezium-mssql-3.2.2.nar.asc

dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-debezium-mssql-3.2.2.nar.sha512

dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-debezium-mysql-3.2.2.nar
   (with props)

dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-debezium-mysql-3.2.2.nar.asc

dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-debezium-mysql-3.2.2.nar.sha512

dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-debezium-oracle-3.2.2.nar
   (with props)

dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-debezium-oracle-3.2.2.nar.asc

dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-debezium-oracle-3.2.2.nar.sha512

dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-debezium-postgres-3.2.2.nar
   (with props)

dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-debezium-postgres-3.2.2.nar.asc

dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-debezium-postgres-3.2.2.nar.sha512
dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-dynamodb-3.2.2.nar 
  (with props)

dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-dynamodb-3.2.2.nar.asc

dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-dynamodb-3.2.2.nar.sha512


Re: [PR] [fix][broker] Skip topic.close during unloading if the topic future fails with ownership check, and fix isBundleOwnedByAnyBroker to use ns.checkOwnershipPresentAsync for ExtensibleLoadBalance

2024-03-28 Thread via GitHub


dragosvictor commented on code in PR #22379:
URL: https://github.com/apache/pulsar/pull/22379#discussion_r1543614920


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##
@@ -2244,9 +2244,18 @@ private CompletableFuture 
unloadServiceUnit(NamespaceBundle serviceUnit
 closeFutures.add(topicFuture
 .thenCompose(t -> t.isPresent() ? t.get().close(
 disconnectClients, 
closeWithoutWaitingClientDisconnect)
-: CompletableFuture.completedFuture(null)));
+: CompletableFuture.completedFuture(null))
+.exceptionally(e -> {
+if (e.getCause() instanceof 
BrokerServiceException.ServiceUnitNotReadyException
+&& e.getMessage().contains("Please redo 
the lookup")) {
+log.warn("[{}] Topic ownership check failed. 
Skipping it", topicName);
+return null;

Review Comment:
   Since we're adding a null element to `closeFutures`, would this cause an 
exception further down in `FutureUtil.waitForAll`? The 
[documentation](https://docs.oracle.com/en%2Fjava%2Fjavase%2F11%2Fdocs%2Fapi%2F%2F/java.base/java/util/concurrent/CompletableFuture.html#allOf(java.util.concurrent.CompletableFuture...))
 suggests so.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



svn commit: r68191 - /dev/pulsar/pulsar-3.2.2-candidate-1/

2024-03-28 Thread lhotari
Author: lhotari
Date: Thu Mar 28 20:28:25 2024
New Revision: 68191

Log:
Add directory for pulsar 3.2.2-candidate-1 release

Added:
dev/pulsar/pulsar-3.2.2-candidate-1/



Re: [PR] [improve] [broker] Avoid repeated Read-and-discard when using Key_Shared mode [pulsar]

2024-03-28 Thread via GitHub


poorbarcode commented on code in PR #22245:
URL: https://github.com/apache/pulsar/pull/22245#discussion_r1543568051


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java:
##
@@ -437,6 +436,82 @@ protected synchronized NavigableSet 
getMessagesToReplayNow(int max
 }
 }
 
+private int getAvailablePermits(Consumer c) {
+int availablePermits = Math.max(c.getAvailablePermits(), 0);
+if (c.getMaxUnackedMessages() > 0) {
+// Avoid negative number
+int remainUnAckedMessages = Math.max(c.getMaxUnackedMessages() - 
c.getUnackedMessages(), 0);
+availablePermits = Math.min(availablePermits, 
remainUnAckedMessages);
+}
+return availablePermits;
+}
+
+@Override
+protected NavigableSet 
filterOutEntriesWillBeDiscarded(NavigableSet src) {

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [I] [Bug] SnapshotSegmentAbortedTxnProcessorImpl contains thread safety issues [pulsar]

2024-03-28 Thread via GitHub


nikam14 commented on issue #22116:
URL: https://github.com/apache/pulsar/issues/22116#issuecomment-2025994480

   @lhotari can you review above mentioned PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [I] Add a metric for geo replication for tracking replicated subscriptions snapshot timeouts [pulsar]

2024-03-28 Thread via GitHub


nikam14 commented on issue #21793:
URL: https://github.com/apache/pulsar/issues/21793#issuecomment-2025992677

   @lhotari I have made a  [PR](https://github.com/nikam14/pulsar/pull/3) in 
forked repo can you take a look.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



(pulsar) annotated tag v3.2.2-candidate-1 updated (bacedb5bd6b -> add6ec19180)

2024-03-28 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a change to annotated tag v3.2.2-candidate-1
in repository https://gitbox.apache.org/repos/asf/pulsar.git


*** WARNING: tag v3.2.2-candidate-1 was modified! ***

from bacedb5bd6b (commit)
  to add6ec19180 (tag)
 tagging bacedb5bd6b27e53cb3036976bec27340089b179 (commit)
 replaces v3.2.1
  by Lari Hotari
  on Thu Mar 28 21:45:57 2024 +0200

- Log -
Release 3.2.2-candidate-1
-BEGIN PGP SIGNATURE-

iQJHBAABCgAxFiEExmPS9k2iygnbKNmr0/pn1SLFUlYFAmYFyPUTHGxob3RhcmlA
YXBhY2hlLm9yZwAKCRDT+mfVIsVSVr4VEACulxLPDEqiywkloHcd0hO3+mbIlXrB
cdAiO97tbpE6Xzx2k4dGAb6s/umlzpy7tAYOBTEsa2xMmsC2kfED17orUgsi277M
Oa+QYEvidq4n8dUzJv3VVIeuBhScSvKXb1Qv7Xgw4ltCt3gjxfu8fBUOwDXXTvS/
mQ5/im4XGpZkR1AuPQ4c0PTB0ebXQbbUruSaIB8cU5wLbS+V9/ka0heFeH1x7+Pe
wNqhlcdTRiD7jKf1iRIR3rn2Vhw33R5Cw9NHL+pafLvwRtaRsrZmLBc/ME6JCkV0
T4c+6sjPexpYmU9m9AyO9wS/75lMEpIYlkhUeZtTxwLftdMB9mC+72V6qgIzr/c6
uJoVRW72L1AbygjtLQs9zcyCaCyOVgJqgEYHAXuQ2C002ZP68KQ1HHnTvlJ19PKw
TkJiJbHtE9z+8Ab2anuHCetTtI+hXI9Niaybar2QXwh2Soy8+qVZw7BMrdJt60AC
Pg5uFPuGDgOvumxnBwi4hEqDT46olnW8oJLbbJ/IC8ti3gT/lFu82Eru0GVda9XP
IJpV9g/sQp3KLFu6eaCybyWaD7FY9gjqHqDFIN0IeTuz/7pSsKc1OKzam5ePZ0e9
wi1Uge1/Jf1ZPeqYDDgt5muxifmuhpZBwqFrtC/AwaxajBWt7gYCNJ5/wCX4+vR1
yNhiTxB9mGbqMQ==
=t9kS
-END PGP SIGNATURE-
---


No new revisions were added by this update.

Summary of changes:



(pulsar) branch branch-3.2 updated (4847648e78d -> bacedb5bd6b)

2024-03-28 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a change to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


from 4847648e78d [fix][broker] Fix issue of field 'topic' is not set when 
handle GetSchema request (#22377)
 add bacedb5bd6b Release 3.2.2

No new revisions were added by this update.

Summary of changes:
 bouncy-castle/bc/pom.xml  | 2 +-
 bouncy-castle/bcfips-include-test/pom.xml | 2 +-
 bouncy-castle/bcfips/pom.xml  | 2 +-
 bouncy-castle/pom.xml | 2 +-
 buildtools/pom.xml| 4 ++--
 distribution/io/pom.xml   | 2 +-
 distribution/offloaders/pom.xml   | 2 +-
 distribution/pom.xml  | 2 +-
 distribution/server/pom.xml   | 2 +-
 distribution/shell/pom.xml| 2 +-
 docker/pom.xml| 2 +-
 docker/pulsar-all/pom.xml | 2 +-
 docker/pulsar/pom.xml | 2 +-
 jclouds-shaded/pom.xml| 2 +-
 managed-ledger/pom.xml| 2 +-
 microbench/pom.xml| 2 +-
 pom.xml   | 4 ++--
 pulsar-bom/pom.xml| 4 ++--
 pulsar-broker-auth-athenz/pom.xml | 2 +-
 pulsar-broker-auth-oidc/pom.xml   | 2 +-
 pulsar-broker-auth-sasl/pom.xml   | 2 +-
 pulsar-broker-common/pom.xml  | 2 +-
 pulsar-broker/pom.xml | 2 +-
 pulsar-cli-utils/pom.xml  | 2 +-
 pulsar-client-1x-base/pom.xml | 2 +-
 pulsar-client-1x-base/pulsar-client-1x/pom.xml| 2 +-
 pulsar-client-1x-base/pulsar-client-2x-shaded/pom.xml | 2 +-
 pulsar-client-admin-api/pom.xml   | 2 +-
 pulsar-client-admin-shaded/pom.xml| 2 +-
 pulsar-client-admin/pom.xml   | 2 +-
 pulsar-client-all/pom.xml | 2 +-
 pulsar-client-api/pom.xml | 2 +-
 pulsar-client-auth-athenz/pom.xml | 2 +-
 pulsar-client-auth-sasl/pom.xml   | 2 +-
 pulsar-client-messagecrypto-bc/pom.xml| 2 +-
 pulsar-client-shaded/pom.xml  | 2 +-
 pulsar-client-tools-api/pom.xml   | 2 +-
 pulsar-client-tools-customcommand-example/pom.xml | 2 +-
 pulsar-client-tools-test/pom.xml  | 2 +-
 pulsar-client-tools/pom.xml   | 2 +-
 pulsar-client/pom.xml | 2 +-
 pulsar-common/pom.xml | 2 +-
 pulsar-config-validation/pom.xml  | 2 +-
 pulsar-docs-tools/pom.xml | 2 +-
 pulsar-functions/api-java/pom.xml | 2 +-
 pulsar-functions/instance/pom.xml | 2 +-
 pulsar-functions/java-examples-builtin/pom.xml| 2 +-
 pulsar-functions/java-examples/pom.xml| 2 +-
 pulsar-functions/localrun-shaded/pom.xml  | 2 +-
 pulsar-functions/localrun/pom.xml | 2 +-
 pulsar-functions/pom.xml  | 2 +-
 pulsar-functions/proto/pom.xml| 2 +-
 pulsar-functions/runtime-all/pom.xml  | 2 +-
 pulsar-functions/runtime/pom.xml  | 2 +-
 pulsar-functions/secrets/pom.xml  | 2 +-
 pulsar-functions/utils/pom.xml| 2 +-
 pulsar-functions/worker/pom.xml   | 2 +-
 pulsar-io/aerospike/pom.xml   | 2 +-
 pulsar-io/alluxio/pom.xml | 2 +-
 pulsar-io/aws/pom.xml | 2 +-
 pulsar-io/batch-data-generator/pom.xml| 2 +-
 pulsar-io/batch-discovery-triggerers/pom.xml  | 2 +-
 pulsar-io/canal/pom.xml   | 2 +-
 pulsar-io/cassandra/pom.xml   | 2 +-
 pulsar-io/common/pom.xml  | 2 +-
 pulsar-io/core/pom.xml| 2 +-
 pulsar-io/data-generator/pom.xml  | 2 +-
 pulsar-io/debezium/core/pom.xml   | 2 +-
 pulsar-io/debezium/mongodb/pom.xml| 2 +-
 pulsar-io/debezium/mssql/pom.xml  | 2 +-
 pulsar-io/debezium/mysql/pom.xml  | 2 +-
 pulsar-io/debezium/oracle/pom.xml | 2 +-
 pulsar-io/debezium/pom.xml| 2 +-
 pulsar-io/debezium/postgres/pom.xml   | 2 +-
 

Re: [PR] [improve][broker] Improve Gzip compression, allow excluding specific paths or disabling it [pulsar]

2024-03-28 Thread via GitHub


lhotari commented on code in PR #22370:
URL: https://github.com/apache/pulsar/pull/22370#discussion_r1543543748


##
pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java:
##
@@ -54,13 +55,23 @@ private PulsarAdminBuilderImpl(ClientConfigurationData 
conf) {
 
 @Override
 public PulsarAdminBuilder clone() {
-return new PulsarAdminBuilderImpl(conf.clone());
+PulsarAdminBuilderImpl pulsarAdminBuilder = new 
PulsarAdminBuilderImpl(conf.clone());
+pulsarAdminBuilder.clientBuilderClassLoader = clientBuilderClassLoader;
+pulsarAdminBuilder.gzipCompressionEnabled = gzipCompressionEnabled;
+return pulsarAdminBuilder;
 }
 
 @Override
 public PulsarAdminBuilder loadConf(Map config) {
 conf = ConfigurationDataUtils.loadData(config, conf, 
ClientConfigurationData.class);
 setAuthenticationFromPropsIfAvailable(conf);
+if (config.containsKey("gzipCompressionEnabled")) {
+if (config.get("gzipCompressionEnabled") instanceof Boolean) {

Review Comment:
   I agree, that would make sense. This PR is still in draft mode.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [improve][broker] Improve Gzip compression, allow excluding specific paths or disabling it [pulsar]

2024-03-28 Thread via GitHub


lhotari commented on code in PR #22370:
URL: https://github.com/apache/pulsar/pull/22370#discussion_r1543543178


##
pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java:
##
@@ -54,13 +55,23 @@ private PulsarAdminBuilderImpl(ClientConfigurationData 
conf) {
 
 @Override
 public PulsarAdminBuilder clone() {
-return new PulsarAdminBuilderImpl(conf.clone());
+PulsarAdminBuilderImpl pulsarAdminBuilder = new 
PulsarAdminBuilderImpl(conf.clone());

Review Comment:
   that is to implement the cloning. the clone method should return a similar 
object.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [improve][broker] Improve Gzip compression, allow excluding specific paths or disabling it [pulsar]

2024-03-28 Thread via GitHub


lhotari commented on code in PR #22370:
URL: https://github.com/apache/pulsar/pull/22370#discussion_r1543540449


##
pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/PulsarAdminBuilder.java:
##
@@ -327,4 +327,11 @@ PulsarAdminBuilder authentication(String 
authPluginClassName, Map

Re: [PR] [improve][broker] Improve Gzip compression, allow excluding specific paths or disabling it [pulsar]

2024-03-28 Thread via GitHub


lhotari commented on code in PR #22370:
URL: https://github.com/apache/pulsar/pull/22370#discussion_r1543540449


##
pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/PulsarAdminBuilder.java:
##
@@ -327,4 +327,11 @@ PulsarAdminBuilder authentication(String 
authPluginClassName, Map

Re: [PR] [improve][broker] Improve Gzip compression, allow excluding specific paths or disabling it [pulsar]

2024-03-28 Thread via GitHub


lhotari commented on code in PR #22370:
URL: https://github.com/apache/pulsar/pull/22370#discussion_r1543538151


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java:
##
@@ -294,8 +291,10 @@ public void start() throws PulsarServerException {
 ContextHandlerCollection contexts = new ContextHandlerCollection();
 contexts.setHandlers(handlers.toArray(new 
Handler[handlers.size()]));
 
+Handler handlerForContexts = 
GzipHandlerUtil.wrapWithGzipHandler(contexts,

Review Comment:
   I agree that it is confusing. Refactorings are a different story.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [improve][broker] Improve Gzip compression, allow excluding specific paths or disabling it [pulsar]

2024-03-28 Thread via GitHub


lhotari commented on code in PR #22370:
URL: https://github.com/apache/pulsar/pull/22370#discussion_r1543537005


##
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/GzipHandlerUtil.java:
##
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.web;
+
+import java.util.List;
+import org.eclipse.jetty.server.Handler;
+import org.eclipse.jetty.server.handler.gzip.GzipHandler;
+
+public class GzipHandlerUtil {
+public static Handler wrapWithGzipHandler(Handler innerHandler, 
List gzipCompressionExcludedPaths) {
+Handler wrappedHandler;
+if (isGzipCompressionCompletelyDisabled(gzipCompressionExcludedPaths)) 
{

Review Comment:
   > can't we just define gzip on the handlers we see fit?
   
   Which handlers would you exclude? I think that it's useful to enable it 
globally since the current assumption is that all usages will benefit from gzip 
compression.
   
   > Too much configuration option can be confusing. They can control the gzip 
by using headers, no?
   
   That's true, but there's risk that regressions are caused when changes are 
made. Adding the `httpServerGzipCompressionExcludedPaths` is a way to ensure 
that users aren't stuck if there's a regression for some corner case.
   There's a mailing list discussion 
https://lists.apache.org/thread/xbs1f5v7865fz1hm1vxqlw6o9dgy3d9z .



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



(pulsar-site) branch main updated: Improve release vote email template

2024-03-28 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-site.git


The following commit(s) were added to refs/heads/main by this push:
 new a848d0a1822a Improve release vote email template
a848d0a1822a is described below

commit a848d0a1822ab2b4aac2a483fa0ddd7a3018b8be
Author: Lari Hotari 
AuthorDate: Thu Mar 28 21:13:41 2024 +0200

Improve release vote email template
---
 contribute/release-process.md | 30 --
 1 file changed, 24 insertions(+), 6 deletions(-)

diff --git a/contribute/release-process.md b/contribute/release-process.md
index 54f3911017db..8413025ec1bd 100644
--- a/contribute/release-process.md
+++ b/contribute/release-process.md
@@ -412,24 +412,41 @@ PULSAR_IMAGE_URL=""
 PULSAR_ALL_IMAGE_URL=""
 ```
 
+Set also these
+```shell
+PULSAR_IMAGE_NAME="$DOCKER_USER/pulsar:$VERSION_WITHOUT_RC-$(git rev-parse 
--short=7 v$VERSION_RC^{commit})"
+PULSAR_ALL_IMAGE_NAME="$DOCKER_USER/pulsar-all:$VERSION_WITHOUT_RC-$(git 
rev-parse --short=7 v$VERSION_RC^{commit})"
+# validate pulling, will take some time, you can skip this if you have a slow 
internet connection
+docker pull $PULSAR_IMAGE_NAME
+docker pull $PULSAR_ALL_IMAGE_NAME
+# check that images are about right, you can skip this if you have a slow 
internet connection
+docker run --rm  $PULSAR_IMAGE_NAME bash -c 'ls /pulsar/lib'  |less
+docker run --rm  $PULSAR_ALL_IMAGE_NAME bash -c 'ls /pulsar/lib'  |less
+```
+
 Now you can render the template to the clipboard
 ```shell
 tee >(pbcopy) 

Re: [PR] [feat][ci] Add Trivy container scan Github workflow [pulsar]

2024-03-28 Thread via GitHub


merlimat merged PR #22063:
URL: https://github.com/apache/pulsar/pull/22063


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



(pulsar) branch master updated (6b2938223cf -> 8fc30df37e2)

2024-03-28 Thread mmerli
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


from 6b2938223cf [improve] PIP-342: OTel client metrics support (#22179)
 add 8fc30df37e2 [feat][ci] Add Trivy container scan Github workflow 
(#22063)

No new revisions were added by this update.

Summary of changes:
 .github/workflows/ci-trivy-container-scan.yaml | 66 ++
 1 file changed, 66 insertions(+)
 create mode 100644 .github/workflows/ci-trivy-container-scan.yaml



[PR] [fix][broker] Skip topic.close during unloading if the topic future fails with ownership check, and fix isBundleOwnedByAnyBroker to use ns.checkOwnershipPresentAsync for ExtensibleLoadBalancer [p

2024-03-28 Thread via GitHub


heesung-sn opened a new pull request, #22379:
URL: https://github.com/apache/pulsar/pull/22379

   
   
   
   
   
   Fixes https://github.com/apache/pulsar/issues/20157
   
   
   
   ### Motivation
   
   https://github.com/apache/pulsar/issues/20157 found that the 
ExtensibleLoadManagerImplTest init step, unloading the test namespace often 
fails.
   
   
   
   ### Modifications
   
   
   
   - Skip topic.close during unloading if the topic future fails with ownership 
check. If the unloading(not transfer) happens in the middle of topic creation, 
the topic creation might fail because of the ownership check failure. In this 
case, we need to skip unloading that topic, not to fail the unload command. 
   - Fix isBundleOwnedByAnyBroker to use ns.checkOwnershipPresentAsync for 
ExtensibleLoadBalancer. Currently, isBundleOwnedByAnyBroker is unnecessarily 
selecting a new owner broker, if no owner is found. We should simply check  
ownership, instead.
   
   ### Verifying this change
   
   - [x] Make sure that the change passes the CI checks.
   
   ### Does this pull request potentially affect one of the following parts:
   
   
   
   *If the box was checked, please highlight the changes*
   
   - [ ] Dependencies (add or upgrade a dependency)
   - [ ] The public API
   - [ ] The schema
   - [ ] The default values of configurations
   - [ ] The threading model
   - [ ] The binary protocol
   - [ ] The REST endpoints
   - [ ] The admin CLI options
   - [ ] The metrics
   - [ ] Anything that affects deployment
   
   ### Documentation
   
   
   
   - [ ] `doc` 
   - [ ] `doc-required` 
   - [x] `doc-not-needed` 
   - [ ] `doc-complete` 
   
   ### Matching PR in forked repository
   
   PR in forked repository: 
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [I] Flaky-test: ExtensibleLoadManagerImplTest.initializeState [pulsar]

2024-03-28 Thread via GitHub


heesung-sn commented on issue #20157:
URL: https://github.com/apache/pulsar/issues/20157#issuecomment-2025894426

   hi, I raised another bug fix here. 
https://github.com/apache/pulsar/pull/22379


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



(pulsar-site) 02/02: Add link to validate_pulsar_release.sh script

2024-03-28 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-site.git

commit b8ce7b8574e5aab634a14043ff82fff94636d903
Author: Lari Hotari 
AuthorDate: Thu Mar 28 20:38:36 2024 +0200

Add link to validate_pulsar_release.sh script
---
 contribute/validate-release-candidate.md | 7 +++
 1 file changed, 7 insertions(+)

diff --git a/contribute/validate-release-candidate.md 
b/contribute/validate-release-candidate.md
index 38d497999b5c..19e8b7b993c8 100644
--- a/contribute/validate-release-candidate.md
+++ b/contribute/validate-release-candidate.md
@@ -7,6 +7,13 @@ This page contains manual instructions for reviewing and 
verifying a release can
 
 ## Validate the binary distribution
 
+:::note
+
+There's a bash script 
[validate_pulsar_release.sh](https://github.com/lhotari/pulsar-contributor-toolbox/blob/master/scripts/validate_pulsar_release.sh)
 available for assisting in semi-automated validation for the following steps.
+
+:::
+
+
 ### Download And Verify the binary distributions
 
 Download the server distribution `apache-pulsar--bin.tar.gz` and 
extract it. The extracted files are in a directory called 
`apache-pulsar-`. All the operations below happen within that 
directory:



(pulsar-site) branch main updated (a1f23abf632b -> b8ce7b8574e5)

2024-03-28 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-site.git


from a1f23abf632b Update release process with use of variables to ease 
copy-pasting
 new 71e9dc729904 Improve release process docs
 new b8ce7b8574e5 Add link to validate_pulsar_release.sh script

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 contribute/release-process.md| 109 +--
 contribute/validate-release-candidate.md |   7 ++
 2 files changed, 95 insertions(+), 21 deletions(-)



(pulsar-site) 01/02: Improve release process docs

2024-03-28 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-site.git

commit 71e9dc729904cc102a5a6588a1cc565a76b679d1
Author: Lari Hotari 
AuthorDate: Thu Mar 28 17:10:49 2024 +0200

Improve release process docs
---
 contribute/release-process.md | 109 ++
 1 file changed, 88 insertions(+), 21 deletions(-)

diff --git a/contribute/release-process.md b/contribute/release-process.md
index cff4d7704c6f..54f3911017db 100644
--- a/contribute/release-process.md
+++ b/contribute/release-process.md
@@ -63,6 +63,16 @@ If you run into problems with GPG signing set this
 export GPG_TTY=$(tty)
 ```
 
+For some commands, a template is copied to the clipboard using `pbcopy`. 
+This is already available on MacOS. For Linux, create a shell alias:
+
+```shell
+# Linux only
+# install xsel if it is missing
+sudo apt install xsel
+# create alias pbcopy for copying stdin to clipboard
+alias pbcopy="xsel --clipboard --input"
+```
 ## Create a release candidate
 
 ### Create the release branch
@@ -89,7 +99,7 @@ Alternatively, you can use a git workspace to create a new, 
clean directory on y
 
 ```shell
 git worktree add ../pulsar-release-$VERSION_BRANCH $VERSION_BRANCH
-cd pulsar-release-$VERSION_BRANCH
+cd ../pulsar-release-$VERSION_BRANCH
 export PULSAR_PATH=$(pwd)
 ```
 
@@ -98,6 +108,11 @@ if you get an error that the branch is already checked out, 
go to that directory
 git checkout --detach HEAD
 ```
 
+After the release, you can cleanup the worktree in the main repository 
directory
+```shell
+git worktree remove ../pulsar-release-$VERSION_BRANCH
+```
+
 If you created a new branch, update the [CI - OWASP Dependency 
Check](https://github.com/apache/pulsar/blob/master/.github/workflows/ci-owasp-dependency-check.yaml)
 workflow so that it will run on the new branch.
 
 Note that you should also stop the workflow for previous Pulsar versions that 
are EOL.
@@ -185,6 +200,16 @@ git push $UPSTREAM_REMOTE $VERSION_BRANCH
 git push $UPSTREAM_REMOTE v$VERSION_RC
 ```
 
+If there's a need to restart the release with more commits, you can delete the 
tag.
+
+```shell
+# only if you restart the release before it has been published for voting. 
Don't run this after that!
+# delete local tag
+git tag -d v$VERSION_RC
+# delete tag in remote
+git push $UPSTREAM_REMOTE :v$VERSION_RC
+```
+
 For patch releases, the tag is like `2.3.1`.
 
 ### Build release artifacts
@@ -253,7 +278,6 @@ KEY_ID=$(gpg --list-keys --with-colons 
$apache_u...@apache.org | egrep "^pub" |
 echo "default-key $KEY_ID" >> ~/.gnupg/gpg.conf
 ```
 
-
 ```shell
 # make sure to run svn mkdir commmand in a different dir(NOT IN $PULSAR_PATH).
 mkdir ~/pulsar-svn-release-$VERSION_RC
@@ -308,7 +332,17 @@ Log in to the ASF Nexus repository at 
https://repository.apache.org
 
 Click on "Staging Repositories" on the left sidebar and then select the 
current Pulsar staging repo. This should be called something like 
`orgapachepulsar-XYZ`.
 
-Use the "Close" button to close the repository. This operation will take few 
minutes. Once complete click "Refresh" and now a link to the staging repository 
should be available, something like 
https://repository.apache.org/content/repositories/orgapachepulsar-XYZ
+Add a version string such as "Apache Pulsar 3.0.4-candidate-1" to the 
clipboard with this command:
+
+```shell
+printf "Apache Pulsar $VERSION_RC" |pbcopy
+```
+
+Use the "Close" button to close the repository.
+
+Enter the version string in the description field before clicking "Confirm".
+
+This operation will take few minutes. Once complete click "Refresh" and now a 
link to the staging repository should be available, something like 
https://repository.apache.org/content/repositories/orgapachepulsar-XYZ
 
 ### Stage Docker images
 
@@ -337,31 +371,57 @@ DOCKER_USER= 
DOCKER_PASSWORD= DOCKER_ORG=
-DOCKER_ORG=
-docker login $DOCKER_ORG -u $DOCKER_USER -p 
+DOCKER_USER=
+DOCKER_ORG=
+docker login -u $DOCKER_USER
 mvn install -DUBUNTU_MIRROR=http://azure.archive.ubuntu.com/ubuntu/ \
 -DskipTests \
+-Dmaven.gitcommitid.nativegit=true \
 -Pmain,docker -Pdocker-push \
 -Ddocker.platforms=linux/amd64,linux/arm64 \
 -Ddocker.organization=$DOCKER_ORG \
 -pl docker/pulsar,docker/pulsar-all
 ```
 
-
 ## Vote for the release candidate
 
-Start a voting thread on the dev mailing list. Here is a sample content:
+Start a voting thread on the dev mailing list. 
+
+Here is a way to render the template for the voting email.
+
+Set these shell variables
+```shell
+DOCKER_USER=
+STAGING_REPO="https://repository.apache.org/#stagingRepositories>"
+MY_NAME="Firstname Lastname"
+PREVIOUS_VERSION_WITHOUT_RC="3.0.3"
+```
+
+```shell
+echo "Go to https://hub.docker.com/r/$DOCKER_USER/pulsar/tags to find the 
layer URL for the pulsar image"
+echo "Go to https://hub.docker.com/r/$DOCKER_USER/pulsar-all/tags to find the 
layer URL 

Re: [PR] [improve] [broker] Avoid repeated Read-and-discard when using Key_Shared mode [pulsar]

2024-03-28 Thread via GitHub


poorbarcode commented on code in PR #22245:
URL: https://github.com/apache/pulsar/pull/22245#discussion_r1543416514


##
pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java:
##
@@ -1630,4 +1637,263 @@ public void 
testContinueDispatchMessagesWhenMessageDelayed() throws Exception {
 log.info("Got {} other messages...", sum);
 Assert.assertEquals(sum, delayedMessages + messages);
 }
+
+private AtomicInteger injectReplayReadCounter(String topicName, String 
cursorName) throws Exception {

Review Comment:
   Oh, it is a counter to record replay readings. It increases when the reading 
is not `Normal`.
   
   ```
   if (!String.valueOf(invocation.getArguments()[2]).equals("Normal")) {
   replyReadCounter.incrementAndGet();
   }
   ```



##
pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java:
##
@@ -1630,4 +1637,263 @@ public void 
testContinueDispatchMessagesWhenMessageDelayed() throws Exception {
 log.info("Got {} other messages...", sum);
 Assert.assertEquals(sum, delayedMessages + messages);
 }
+
+private AtomicInteger injectReplayReadCounter(String topicName, String 
cursorName) throws Exception {

Review Comment:
   Oh, it is a counter to record replay readings. It increases when the reading 
is not `Normal`.
   
   ```java
   if (!String.valueOf(invocation.getArguments()[2]).equals("Normal")) {
   replyReadCounter.incrementAndGet();
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [improve] [broker] Avoid repeated Read-and-discard when using Key_Shared mode [pulsar]

2024-03-28 Thread via GitHub


poorbarcode commented on code in PR #22245:
URL: https://github.com/apache/pulsar/pull/22245#discussion_r1543416791


##
pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java:
##
@@ -1630,4 +1637,263 @@ public void 
testContinueDispatchMessagesWhenMessageDelayed() throws Exception {
 log.info("Got {} other messages...", sum);
 Assert.assertEquals(sum, delayedMessages + messages);
 }
+
+private AtomicInteger injectReplayReadCounter(String topicName, String 
cursorName) throws Exception {
+PersistentTopic persistentTopic =
+(PersistentTopic) 
pulsar.getBrokerService().getTopic(topicName, false).join().get();
+ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) 
persistentTopic.getManagedLedger();
+ManagedCursorImpl cursor = (ManagedCursorImpl) 
managedLedger.openCursor(cursorName);
+managedLedger.getCursors().removeCursor(cursor.getName());
+managedLedger.getActiveCursors().removeCursor(cursor.getName());
+ManagedCursorImpl spyCursor = Mockito.spy(cursor);
+managedLedger.getCursors().add(spyCursor, PositionImpl.EARLIEST);
+managedLedger.getActiveCursors().add(spyCursor, PositionImpl.EARLIEST);
+AtomicInteger replyReadCounter = new AtomicInteger();
+Mockito.doAnswer(invocation -> {
+if 
(!String.valueOf(invocation.getArguments()[2]).equals("Normal")) {
+replyReadCounter.incrementAndGet();
+}
+return invocation.callRealMethod();
+}).when(spyCursor).asyncReplayEntries(Mockito.anySet(), Mockito.any(), 
Mockito.any());
+Mockito.doAnswer(invocation -> {
+if 
(!String.valueOf(invocation.getArguments()[2]).equals("Normal")) {
+replyReadCounter.incrementAndGet();
+}
+return invocation.callRealMethod();
+}).when(spyCursor).asyncReplayEntries(Mockito.anySet(), Mockito.any(), 
Mockito.any(), Mockito.anyBoolean());
+admin.topics().createSubscription(topicName, cursorName, 
MessageId.earliest);
+return replyReadCounter;
+}
+
+@Test
+public void testNoRepeatedReadAndDiscard() throws Exception {
+int delayedMessages = 100;
+final String topic = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+final String subName = "my-sub";
+admin.topics().createNonPartitionedTopic(topic);
+AtomicInteger replyReadCounter = injectReplayReadCounter(topic, 
subName);
+
+// Send messages.
+@Cleanup
+Producer producer = 
pulsarClient.newProducer(Schema.INT32).topic(topic).enableBatching(false).create();
+for (int i = 0; i < delayedMessages; i++) {
+MessageId messageId = producer.newMessage()
+.key(String.valueOf(random.nextInt(NUMBER_OF_KEYS)))
+.value(100 + i)
+.send();
+log.info("Published delayed message :{}", messageId);

Review Comment:
   Fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



svn commit: r68189 [1/2] - in /dev/pulsar/pulsar-3.0.4-candidate-1: ./ connectors/

2024-03-28 Thread lhotari
Author: lhotari
Date: Thu Mar 28 17:04:01 2024
New Revision: 68189

Log:
Staging artifacts and signature for Pulsar release 3.0.4-candidate-1

Added:
dev/pulsar/pulsar-3.0.4-candidate-1/apache-pulsar-3.0.4-bin.tar.gz   (with 
props)
dev/pulsar/pulsar-3.0.4-candidate-1/apache-pulsar-3.0.4-bin.tar.gz.asc
dev/pulsar/pulsar-3.0.4-candidate-1/apache-pulsar-3.0.4-bin.tar.gz.sha512
dev/pulsar/pulsar-3.0.4-candidate-1/apache-pulsar-3.0.4-src.tar.gz   (with 
props)
dev/pulsar/pulsar-3.0.4-candidate-1/apache-pulsar-3.0.4-src.tar.gz.asc
dev/pulsar/pulsar-3.0.4-candidate-1/apache-pulsar-3.0.4-src.tar.gz.sha512

dev/pulsar/pulsar-3.0.4-candidate-1/apache-pulsar-offloaders-3.0.4-bin.tar.gz   
(with props)

dev/pulsar/pulsar-3.0.4-candidate-1/apache-pulsar-offloaders-3.0.4-bin.tar.gz.asc

dev/pulsar/pulsar-3.0.4-candidate-1/apache-pulsar-offloaders-3.0.4-bin.tar.gz.sha512
dev/pulsar/pulsar-3.0.4-candidate-1/apache-pulsar-shell-3.0.4-bin.tar.gz   
(with props)
dev/pulsar/pulsar-3.0.4-candidate-1/apache-pulsar-shell-3.0.4-bin.tar.gz.asc

dev/pulsar/pulsar-3.0.4-candidate-1/apache-pulsar-shell-3.0.4-bin.tar.gz.sha512
dev/pulsar/pulsar-3.0.4-candidate-1/apache-pulsar-shell-3.0.4-bin.zip   
(with props)
dev/pulsar/pulsar-3.0.4-candidate-1/apache-pulsar-shell-3.0.4-bin.zip.asc
dev/pulsar/pulsar-3.0.4-candidate-1/apache-pulsar-shell-3.0.4-bin.zip.sha512
dev/pulsar/pulsar-3.0.4-candidate-1/connectors/
dev/pulsar/pulsar-3.0.4-candidate-1/connectors/LICENSE
dev/pulsar/pulsar-3.0.4-candidate-1/connectors/README

dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-aerospike-3.0.4.nar   
(with props)

dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-aerospike-3.0.4.nar.asc

dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-aerospike-3.0.4.nar.sha512
dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-alluxio-3.0.4.nar  
 (with props)

dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-alluxio-3.0.4.nar.asc

dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-alluxio-3.0.4.nar.sha512

dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-batch-data-generator-3.0.4.nar
   (with props)

dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-batch-data-generator-3.0.4.nar.asc

dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-batch-data-generator-3.0.4.nar.sha512
dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-canal-3.0.4.nar   
(with props)
dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-canal-3.0.4.nar.asc

dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-canal-3.0.4.nar.sha512

dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-cassandra-3.0.4.nar   
(with props)

dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-cassandra-3.0.4.nar.asc

dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-cassandra-3.0.4.nar.sha512

dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-data-generator-3.0.4.nar
   (with props)

dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-data-generator-3.0.4.nar.asc

dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-data-generator-3.0.4.nar.sha512

dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-debezium-mongodb-3.0.4.nar
   (with props)

dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-debezium-mongodb-3.0.4.nar.asc

dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-debezium-mongodb-3.0.4.nar.sha512

dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-debezium-mssql-3.0.4.nar
   (with props)

dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-debezium-mssql-3.0.4.nar.asc

dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-debezium-mssql-3.0.4.nar.sha512

dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-debezium-mysql-3.0.4.nar
   (with props)

dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-debezium-mysql-3.0.4.nar.asc

dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-debezium-mysql-3.0.4.nar.sha512

dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-debezium-oracle-3.0.4.nar
   (with props)

dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-debezium-oracle-3.0.4.nar.asc

dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-debezium-oracle-3.0.4.nar.sha512

dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-debezium-postgres-3.0.4.nar
   (with props)

dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-debezium-postgres-3.0.4.nar.asc

dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-debezium-postgres-3.0.4.nar.sha512
dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-dynamodb-3.0.4.nar 
  (with props)

dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-dynamodb-3.0.4.nar.asc

dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-dynamodb-3.0.4.nar.sha512


svn commit: r68189 [2/2] - in /dev/pulsar/pulsar-3.0.4-candidate-1: ./ connectors/

2024-03-28 Thread lhotari
Added: 
dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-jdbc-sqlite-3.0.4.nar
==
Binary file - no diff available.

Propchange: 
dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-jdbc-sqlite-3.0.4.nar
--
svn:mime-type = application/octet-stream

Added: 
dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-jdbc-sqlite-3.0.4.nar.asc
==
--- 
dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-jdbc-sqlite-3.0.4.nar.asc
 (added)
+++ 
dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-jdbc-sqlite-3.0.4.nar.asc
 Thu Mar 28 17:04:01 2024
@@ -0,0 +1,16 @@
+-BEGIN PGP SIGNATURE-
+
+iQIzBAABCgAdFiEExmPS9k2iygnbKNmr0/pn1SLFUlYFAmYFoOgACgkQ0/pn1SLF
+UlaohA/9HgONgfp6E7PQqo3tCdjK2WEkiMlDKrdOlxNiCJDyG1SBCBMjlqoAVmyd
+zgfQlfRI6l1QvipaONboUd1osWU4zVWCS7gBA+SkoITPS9IBTAHKGi6meRdiGQ6v
+2U8yJJ2bdY2UY01Bc+FtHfoIR1NZG3U17GLOMxibVMj0ALd0HoSVV+cCDqu1LmDT
+rUvIZ+beFZZO2lq/WuWcp+SZmLfY+oiXpQTmTFUX/FJOWmUXKBli1OTPAea/94Jz
+C59DI982jEo+q/z6EC6avJqCjnf4rb4vHOmmtipRre9VJZWMi7aLFMCbS3bE3odX
+iqleOUAtcChwgfB13N6CVePFWYdTzQZ+F6fCSfz9GLB7fFWNCG4Y7sLZrCOKSndG
+yBpw/ZEe43F6zJEEc+1Vs6CHGV3IVjy0EtxUOcTYv+GQqWQ5LK3wZh8eyeBxvAIJ
+v8tvrkDSfHR/D3NFwSwjfBakYPwUv0tXCh1kVcvE/1eNfQeTAHyDvR5zkuavygre
+t4EQvWMZNxxdaLim4FVzIMTzvb8ioGwIF4twhoXjaIm1Y6TIJEvoaQIvFStfrqWY
+TvntAMP1jBDPGQXMWUjzYSrw4RRu1gpDgGZjEh8fAV3AGzwEbHpDWjAKZCAq/bwJ
+IQMRKBzjNaFEvpwgWFPvZ21bNSlab+q25M8xfjsFoq9XCknF/WQ=
+=llw4
+-END PGP SIGNATURE-

Added: 
dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-jdbc-sqlite-3.0.4.nar.sha512
==
--- 
dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-jdbc-sqlite-3.0.4.nar.sha512
 (added)
+++ 
dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-jdbc-sqlite-3.0.4.nar.sha512
 Thu Mar 28 17:04:01 2024
@@ -0,0 +1 @@
+67048ff6031ddd5086116aa4341663270e9e07ffb89baa38dfcf9342681188ee8d94d5f593b3f5b3c33b33a972d254f99efd6539729ef392fe3964d7f26888b6
  ./connectors/pulsar-io-jdbc-sqlite-3.0.4.nar

Added: dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-kafka-3.0.4.nar
==
Binary file - no diff available.

Propchange: 
dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-kafka-3.0.4.nar
--
svn:mime-type = application/octet-stream

Added: 
dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-kafka-3.0.4.nar.asc
==
--- 
dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-kafka-3.0.4.nar.asc 
(added)
+++ 
dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-kafka-3.0.4.nar.asc 
Thu Mar 28 17:04:01 2024
@@ -0,0 +1,16 @@
+-BEGIN PGP SIGNATURE-
+
+iQIzBAABCgAdFiEExmPS9k2iygnbKNmr0/pn1SLFUlYFAmYFoOMACgkQ0/pn1SLF
+UlawYw//bhR2kbZ6ozMsRYmwA7PWHKANy594GhEMlt7QVJTKGAQ1J7bohsnr2xR1
+2ZCPbuJ6AY4FAVS5fFdlRELX0XkVAKubsRuazSilcGPKTlXBZ+GxoZCCSbDXjAdx
++XhQLW6Pk3tqa81iO5ERJv8WPa/OBdeN76qDTJ0I5P4wtj/xOi6vXZITLFkYO3qO
+08b/dR3itBfQ9oozeCAuEzt6rxCu1MZGyAb3/u7m7dk1ErJ7Jf7MIP2l94hNN1q6
+eyft+hjVpyNmoLGtqzzD9ThwnKIg8k3VSh59UWOCLvde/VRGxALd7qH2EzwdLcYo
+emA/YLCm1VilpBCdooznvDWue7mJxn/0Y4h5Fxkdv+O16pSB/CnFGrcDCmM8V88V
+InsEXLgmtdzCW+ystT5SxVYNJ6jkpBuGtmVy0G5Y0vhrGoykFNfvF/zaz4aEQTYD
+cokTxBcPPh8Ec6vU0iHCzcYyX7cEr6YDDDNo86yqs34iPdhOSNiHngZRMxo25iaB
+BOAb00wN4t38AowgETd2x4uTf6AHrqCmwLSzQUALx0TJSa1lwrHSAa+hu0YTrnEb
+Kreotjvzpv+2Yw7RhpX4joy1gBVu7qJgWMl9FKqmkvvignALSk4fNPb1cMkAueeO
+p45CjFd86vLSUA1HuqkiyDAbmVcshIy/2WNm98nPiRd/F9Oq9Ao=
+=ysvU
+-END PGP SIGNATURE-

Added: 
dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-kafka-3.0.4.nar.sha512
==
--- 
dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-kafka-3.0.4.nar.sha512 
(added)
+++ 
dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-kafka-3.0.4.nar.sha512 
Thu Mar 28 17:04:01 2024
@@ -0,0 +1 @@
+786fa9c075325397fb6d0888554a00e13c1a2be913fd57b50ddf9d5f7c26dde8016dd57f2ce86ef66a7cd662558f5e35d565538b39b2fd4a9ab893fc3e162b5a
  ./connectors/pulsar-io-kafka-3.0.4.nar

Added: 
dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-kafka-connect-adaptor-3.0.4.nar
==
Binary file - no diff available.

Propchange: 
dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-kafka-connect-adaptor-3.0.4.nar
--
svn:mime-type = application/octet-stream

Added: 
dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-kafka-connect-adaptor-3.0.4.nar.asc

Re: [PR] [improve] PIP-342: OTel client metrics support [pulsar]

2024-03-28 Thread via GitHub


merlimat merged PR #22179:
URL: https://github.com/apache/pulsar/pull/22179


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



(pulsar) branch branch-2.10 updated: [fix][broker] Fix issue of field 'topic' is not set when handle GetSchema request (#22377)

2024-03-28 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
 new c44cacde66e [fix][broker] Fix issue of field 'topic' is not set when 
handle GetSchema request (#22377)
c44cacde66e is described below

commit c44cacde66e22b9e7a8c539267b0aeca6d6d426e
Author: Cong Zhao 
AuthorDate: Thu Mar 28 23:14:19 2024 +0800

[fix][broker] Fix issue of field 'topic' is not set when handle GetSchema 
request (#22377)

(cherry picked from commit d8903da3d5ea5bab207d119186f2be6fa1147f60)

# Conflicts:
#   pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
---
 .../apache/pulsar/broker/service/ServerCnx.java|  5 ++--
 .../java/org/apache/pulsar/schema/SchemaTest.java  | 35 +-
 2 files changed, 37 insertions(+), 3 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 08d84f3e4fa..aafcb5eeb48 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -2126,9 +2126,10 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
 schemaVersion = 
schemaService.versionFromBytes(commandGetSchema.getSchemaVersion());
 }
 
+final String topic = commandGetSchema.getTopic();
 String schemaName;
 try {
-schemaName = 
TopicName.get(commandGetSchema.getTopic()).getSchemaName();
+schemaName = TopicName.get(topic).getSchemaName();
 } catch (Throwable t) {
 commandSender.sendGetSchemaErrorResponse(requestId, 
ServerError.InvalidTopicName, t.getMessage());
 return;
@@ -2137,7 +2138,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
 schemaService.getSchema(schemaName, 
schemaVersion).thenAccept(schemaAndMetadata -> {
 if (schemaAndMetadata == null) {
 commandSender.sendGetSchemaErrorResponse(requestId, 
ServerError.TopicNotFound,
-String.format("Topic not found or no-schema %s", 
commandGetSchema.getTopic()));
+String.format("Topic not found or no-schema %s", 
topic));
 } else {
 commandSender.sendGetSchemaResponse(requestId,
 SchemaInfoUtil.newSchemaInfo(schemaName, 
schemaAndMetadata.schema), schemaAndMetadata.version);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
index c7cbc9b92c5..e6def654fee 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
@@ -44,6 +44,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import javax.ws.rs.client.InvocationCallback;
 import javax.ws.rs.client.WebTarget;
@@ -73,6 +74,8 @@ import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.api.schema.SchemaDefinition;
+import org.apache.pulsar.client.impl.ConsumerImpl;
+import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
 import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
 import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
 import org.apache.pulsar.client.impl.schema.generic.GenericJsonRecord;
@@ -101,6 +104,7 @@ public class SchemaTest extends MockedPulsarServiceBaseTest 
{
 @BeforeMethod
 @Override
 public void setup() throws Exception {
+isTcpLookup = true;
 super.internalSetup();
 
 // Setup namespaces
@@ -109,6 +113,7 @@ public class SchemaTest extends MockedPulsarServiceBaseTest 
{
 .allowedClusters(Collections.singleton(CLUSTER_NAME))
 .build();
 admin.tenants().createTenant(PUBLIC_TENANT, tenantInfo);
+admin.namespaces().createNamespace(PUBLIC_TENANT + "/my-ns");
 }
 
 @AfterMethod(alwaysRun = true)
@@ -117,6 +122,34 @@ public class SchemaTest extends 
MockedPulsarServiceBaseTest {
 super.internalCleanup();
 }
 
+@Test
+public void testGetSchemaWithPatternTopic() throws Exception {
+final String topicPrefix = "persistent://public/my-ns/test-getSchema";
+
+int topicNums = 10;
+for (int i = 0; i < topicNums; i++) {
+String topic = topicPrefix + "-" + 

(pulsar) branch branch-2.11 updated: [fix][broker] Fix issue of field 'topic' is not set when handle GetSchema request (#22377)

2024-03-28 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
 new ac22f29ccbe [fix][broker] Fix issue of field 'topic' is not set when 
handle GetSchema request (#22377)
ac22f29ccbe is described below

commit ac22f29ccbeb26c9d481ee578d8bf3b232c33bc1
Author: Cong Zhao 
AuthorDate: Thu Mar 28 23:14:19 2024 +0800

[fix][broker] Fix issue of field 'topic' is not set when handle GetSchema 
request (#22377)

(cherry picked from commit d8903da3d5ea5bab207d119186f2be6fa1147f60)

# Conflicts:
#   pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
---
 .../apache/pulsar/broker/service/ServerCnx.java|  5 ++--
 .../java/org/apache/pulsar/schema/SchemaTest.java  | 35 +-
 2 files changed, 37 insertions(+), 3 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 1f7662b4cff..d6b59e0b596 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -2210,9 +2210,10 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
 schemaVersion = 
schemaService.versionFromBytes(commandGetSchema.getSchemaVersion());
 }
 
+final String topic = commandGetSchema.getTopic();
 String schemaName;
 try {
-schemaName = 
TopicName.get(commandGetSchema.getTopic()).getSchemaName();
+schemaName = TopicName.get(topic).getSchemaName();
 } catch (Throwable t) {
 commandSender.sendGetSchemaErrorResponse(requestId, 
ServerError.InvalidTopicName, t.getMessage());
 return;
@@ -2221,7 +,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
 schemaService.getSchema(schemaName, 
schemaVersion).thenAccept(schemaAndMetadata -> {
 if (schemaAndMetadata == null) {
 commandSender.sendGetSchemaErrorResponse(requestId, 
ServerError.TopicNotFound,
-String.format("Topic not found or no-schema %s", 
commandGetSchema.getTopic()));
+String.format("Topic not found or no-schema %s", 
topic));
 } else {
 commandSender.sendGetSchemaResponse(requestId,
 SchemaInfoUtil.newSchemaInfo(schemaName, 
schemaAndMetadata.schema), schemaAndMetadata.version);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
index d9254718cb6..648bee38549 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
@@ -46,6 +46,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import lombok.Cleanup;
 import lombok.EqualsAndHashCode;
@@ -69,6 +70,8 @@ import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.api.schema.SchemaDefinition;
+import org.apache.pulsar.client.impl.ConsumerImpl;
+import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
 import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
 import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
 import org.apache.pulsar.client.impl.schema.generic.GenericJsonRecord;
@@ -97,6 +100,7 @@ public class SchemaTest extends MockedPulsarServiceBaseTest {
 @BeforeMethod
 @Override
 public void setup() throws Exception {
+isTcpLookup = true;
 super.internalSetup();
 
 // Setup namespaces
@@ -105,6 +109,7 @@ public class SchemaTest extends MockedPulsarServiceBaseTest 
{
 .allowedClusters(Collections.singleton(CLUSTER_NAME))
 .build();
 admin.tenants().createTenant(PUBLIC_TENANT, tenantInfo);
+admin.namespaces().createNamespace(PUBLIC_TENANT + "/my-ns");
 }
 
 @AfterMethod(alwaysRun = true)
@@ -113,6 +118,34 @@ public class SchemaTest extends 
MockedPulsarServiceBaseTest {
 super.internalCleanup();
 }
 
+@Test
+public void testGetSchemaWithPatternTopic() throws Exception {
+final String topicPrefix = "persistent://public/my-ns/test-getSchema";
+
+int topicNums = 10;
+for (int i = 0; i < topicNums; i++) {
+String topic = topicPrefix + "-" + i;
+

(pulsar) branch branch-3.1 updated: [fix][broker] Fix issue of field 'topic' is not set when handle GetSchema request (#22377)

2024-03-28 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
 new 585fc54f339 [fix][broker] Fix issue of field 'topic' is not set when 
handle GetSchema request (#22377)
585fc54f339 is described below

commit 585fc54f339b78ce1a5edf19a7093355284da0a4
Author: Cong Zhao 
AuthorDate: Thu Mar 28 23:14:19 2024 +0800

[fix][broker] Fix issue of field 'topic' is not set when handle GetSchema 
request (#22377)

(cherry picked from commit d8903da3d5ea5bab207d119186f2be6fa1147f60)
---
 .../apache/pulsar/broker/service/ServerCnx.java|  5 ++--
 .../java/org/apache/pulsar/schema/SchemaTest.java  | 33 ++
 2 files changed, 36 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 5057b7b045a..29ba7cb866e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -2389,9 +2389,10 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
 schemaVersion = 
schemaService.versionFromBytes(commandGetSchema.getSchemaVersion());
 }
 
+final String topic = commandGetSchema.getTopic();
 String schemaName;
 try {
-schemaName = 
TopicName.get(commandGetSchema.getTopic()).getSchemaName();
+schemaName = TopicName.get(topic).getSchemaName();
 } catch (Throwable t) {
 commandSender.sendGetSchemaErrorResponse(requestId, 
ServerError.InvalidTopicName, t.getMessage());
 return;
@@ -2400,7 +2401,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
 schemaService.getSchema(schemaName, 
schemaVersion).thenAccept(schemaAndMetadata -> {
 if (schemaAndMetadata == null) {
 commandSender.sendGetSchemaErrorResponse(requestId, 
ServerError.TopicNotFound,
-String.format("Topic not found or no-schema %s", 
commandGetSchema.getTopic()));
+String.format("Topic not found or no-schema %s", 
topic));
 } else {
 commandSender.sendGetSchemaResponse(requestId,
 SchemaInfoUtil.newSchemaInfo(schemaName, 
schemaAndMetadata.schema), schemaAndMetadata.version);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
index 7eae6462545..e10b45868bf 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
@@ -46,6 +46,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import lombok.Cleanup;
 import lombok.EqualsAndHashCode;
@@ -69,6 +70,8 @@ import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.api.schema.SchemaDefinition;
+import org.apache.pulsar.client.impl.ConsumerImpl;
+import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
 import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
 import org.apache.pulsar.client.impl.schema.ProtobufSchema;
 import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
@@ -98,6 +101,7 @@ public class SchemaTest extends MockedPulsarServiceBaseTest {
 @BeforeMethod
 @Override
 public void setup() throws Exception {
+isTcpLookup = true;
 super.internalSetup();
 
 // Setup namespaces
@@ -106,6 +110,7 @@ public class SchemaTest extends MockedPulsarServiceBaseTest 
{
 .allowedClusters(Collections.singleton(CLUSTER_NAME))
 .build();
 admin.tenants().createTenant(PUBLIC_TENANT, tenantInfo);
+admin.namespaces().createNamespace(PUBLIC_TENANT + "/my-ns");
 }
 
 @AfterMethod(alwaysRun = true)
@@ -130,6 +135,34 @@ public class SchemaTest extends 
MockedPulsarServiceBaseTest {
 
pulsarClient.newProducer(org.apache.pulsar.client.api.Schema.AUTO_PRODUCE_BYTES()).topic(topic).create();
 }
 
+@Test
+public void testGetSchemaWithPatternTopic() throws Exception {
+final String topicPrefix = "persistent://public/my-ns/test-getSchema";
+
+int topicNums = 10;
+for (int i = 0; i < topicNums; i++) {
+String topic = topicPrefix + "-" + i;
+admin.topics().createNonPartitionedTopic(topic);

Re: [I] Flaky-test: ShadowManagedLedgerImplTest.testShadowWrites [pulsar]

2024-03-28 Thread via GitHub


Technoboy- commented on issue #22345:
URL: https://github.com/apache/pulsar/issues/22345#issuecomment-2025622773

   The only way seems to increase the timeout for the Awaitility


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [feat][ci] Add Trivy container scan Github workflow [pulsar]

2024-03-28 Thread via GitHub


merlimat commented on code in PR #22063:
URL: https://github.com/apache/pulsar/pull/22063#discussion_r1543253362


##
.github/workflows/ci-trivy-container-scan.yaml:
##
@@ -0,0 +1,67 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+name: CI - Trivy Container Scan
+on:
+  schedule:
+- cron: '0 8 * * *' # Every day at 8am UTC
+  workflow_dispatch:
+inputs:
+  severity:
+description: "Severities to include (comma-separated or 'ALL' to 
include all)"
+required: false
+default: 'CRITICAL,HIGH'
+
+jobs:
+  container_scan:
+if: ${{ github.repository == 'apache/pulsar' }}
+name: Trivy Docker image vulnerability scan
+runs-on: ubuntu-latest
+strategy:
+  fail-fast: false
+  matrix:
+docker-image:
+  - 'apachepulsar/pulsar'
+  - 'apachepulsar/pulsar-all'
+docker-tag:
+  - '3.2.1'

Review Comment:
   Maybe we should have `latest` here. Unfortunately, we don't have a `nightly` 
tag to test the image from master branch.



##
.github/workflows/ci-trivy-container-scan.yaml:
##
@@ -0,0 +1,67 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+name: CI - Trivy Container Scan
+on:
+  schedule:
+- cron: '0 8 * * *' # Every day at 8am UTC
+  workflow_dispatch:
+inputs:
+  severity:
+description: "Severities to include (comma-separated or 'ALL' to 
include all)"
+required: false
+default: 'CRITICAL,HIGH'
+
+jobs:
+  container_scan:
+if: ${{ github.repository == 'apache/pulsar' }}
+name: Trivy Docker image vulnerability scan
+runs-on: ubuntu-latest
+strategy:
+  fail-fast: false
+  matrix:
+docker-image:
+  - 'apachepulsar/pulsar'
+  - 'apachepulsar/pulsar-all'

Review Comment:
   `pulsar-all` should get dropped in 3.3, so we can already drop it here



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [feat][ci] Add Trivy container scan Github workflow [pulsar]

2024-03-28 Thread via GitHub


onobc commented on PR #22063:
URL: https://github.com/apache/pulsar/pull/22063#issuecomment-2025594055

   The workflow has been simplified - PTAL @merlimat @lhotari 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [feat][broker][branch-2.11] Implementation of PIP-323: Complete Backlog Quota Telemetry [pulsar]

2024-03-28 Thread via GitHub


liangyepianzhou merged PR #22374:
URL: https://github.com/apache/pulsar/pull/22374


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] Replication/topic level [pulsar]

2024-03-28 Thread via GitHub


liangyepianzhou opened a new pull request, #22378:
URL: https://github.com/apache/pulsar/pull/22378

   PIP: https://github.com/apache/pulsar/pull/21648
   
   ### Motivation
   
   Because the configurations of clusters at the namespace level are unclear, 
some flexible topic policies can not work as expected, e.g. geo-replication at 
the topic level. 
   
   ### Modifications
   
   Use `replication-clusters`, `allowed-clusters` to replace a single 
`replication-clusters` originally in the namespace policy.
   
   ### Verifying this change
   
   - [ ] Make sure that the change passes the CI checks.
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(10MB)*
 - *Extended integration test for recovery after broker failure*
   
   ### Does this pull request potentially affect one of the following parts:
   
   
   
   *If the box was checked, please highlight the changes*
   
   - [ ] Dependencies (add or upgrade a dependency)
   - [ ] The public API
   - [ ] The schema
   - [ ] The default values of configurations
   - [ ] The threading model
   - [ ] The binary protocol
   - [ ] The REST endpoints
   - [ ] The admin CLI options
   - [ ] The metrics
   - [ ] Anything that affects deployment
   
   ### Documentation
   
   
   
   - [ ] `doc` 
   - [ ] `doc-required` 
   - [x] `doc-not-needed` 
   - [ ] `doc-complete` 
   
   ### Matching PR in forked repository
   
   PR in forked repository: 
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



(pulsar) annotated tag v3.0.4-candidate-1 deleted (was fac66a82c4f)

2024-03-28 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a change to annotated tag v3.0.4-candidate-1
in repository https://gitbox.apache.org/repos/asf/pulsar.git


*** WARNING: tag v3.0.4-candidate-1 was deleted! ***

   tag was  fac66a82c4f

The revisions that were on this annotated tag are still contained in
other references; therefore, this change does not discard any commits
from the repository.



(pulsar) annotated tag v3.0.4-candidate-1 updated (5a1fa0c5c67 -> 9dc160153eb)

2024-03-28 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a change to annotated tag v3.0.4-candidate-1
in repository https://gitbox.apache.org/repos/asf/pulsar.git


*** WARNING: tag v3.0.4-candidate-1 was modified! ***

from 5a1fa0c5c67 (commit)
  to 9dc160153eb (tag)
 tagging 5a1fa0c5c6709bdb7b003ce12d00abf52da293e1 (commit)
 replaces v3.0.3
  by Lari Hotari
  on Thu Mar 28 17:28:22 2024 +0200

- Log -
Release 3.0.4-candidate-1
-BEGIN PGP SIGNATURE-

iQJHBAABCgAxFiEExmPS9k2iygnbKNmr0/pn1SLFUlYFAmYFjJYTHGxob3RhcmlA
YXBhY2hlLm9yZwAKCRDT+mfVIsVSVnf4D/0UHn5R0thMnQEcaRzf35ruuFetf3ZK
bfFaJJ8qCOrdWpsZ8y6woLMBrolRXohWNylPWzu+bYfLIUm3bAnCAVlFEhHI8kHG
ArKRbb9MhSrHuw1eKi7Qgr8/4jiEeq4lzMMJ4a69PBPcBdGE82LUih2PSRNutZrC
cr4kVGlbdEfYcWghnG8j+g5KcYqL4F7U1MmgefE46VoUqGpSceWzRHpC8a/LbLKN
BgyKufCnRLSxZ9fYHDUbS/DsI7mLs31uvgOnrj6HtWNoxNilV3RRiW9rTZ5Wux//
qr+YKOURn8LRZ3D+/p9Of+fFKriXqjZDUGTREkksOlmHTPrG9Cq+a0/cHsO7Kp5N
JvdBD3w8XPjHd3b/38ihvLJrL/PkFDmKn/lmRIv1aqg2sQZeRNL7usPLDCkRiq2f
cXpIHQ/ND6OCtBVHoLvlppEgVtWc3VwqeLOKMm3dtQsV20bJEoV+xip1iaah+MDY
ToQLOKrYp35NloQEPbEXix+CzSKMl8u4PU/XvMRuqfmAicvSrdV7Iao9ptkq/I/4
o9t2LPAzfNBeEuQxHdLKt6F6dguCES93TPe4JT0WB0RKAFUL/cEsthElShDgLYXi
gGL0XzBnyydJHearsO5Z8gHpLeY2SE3/l9IGel+WAdRCAsLbXzy0O2tu6ipp6Rak
Qq3K+V/wv4VfNw==
=W/au
-END PGP SIGNATURE-
---


No new revisions were added by this update.

Summary of changes:



(pulsar) branch branch-3.2 updated: [fix][broker] Fix issue of field 'topic' is not set when handle GetSchema request (#22377)

2024-03-28 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new 4847648e78d [fix][broker] Fix issue of field 'topic' is not set when 
handle GetSchema request (#22377)
4847648e78d is described below

commit 4847648e78dbf2deb8c7a6cee54e37ad6a6323e7
Author: Cong Zhao 
AuthorDate: Thu Mar 28 23:14:19 2024 +0800

[fix][broker] Fix issue of field 'topic' is not set when handle GetSchema 
request (#22377)

(cherry picked from commit d8903da3d5ea5bab207d119186f2be6fa1147f60)
---
 .../apache/pulsar/broker/service/ServerCnx.java|  5 ++--
 .../java/org/apache/pulsar/schema/SchemaTest.java  | 33 ++
 2 files changed, 36 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 78c1c39cfe5..569c5a2fb1e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -2496,9 +2496,10 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
 schemaVersion = 
schemaService.versionFromBytes(commandGetSchema.getSchemaVersion());
 }
 
+final String topic = commandGetSchema.getTopic();
 String schemaName;
 try {
-schemaName = 
TopicName.get(commandGetSchema.getTopic()).getSchemaName();
+schemaName = TopicName.get(topic).getSchemaName();
 } catch (Throwable t) {
 commandSender.sendGetSchemaErrorResponse(requestId, 
ServerError.InvalidTopicName, t.getMessage());
 return;
@@ -2507,7 +2508,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
 schemaService.getSchema(schemaName, 
schemaVersion).thenAccept(schemaAndMetadata -> {
 if (schemaAndMetadata == null) {
 commandSender.sendGetSchemaErrorResponse(requestId, 
ServerError.TopicNotFound,
-String.format("Topic not found or no-schema %s", 
commandGetSchema.getTopic()));
+String.format("Topic not found or no-schema %s", 
topic));
 } else {
 commandSender.sendGetSchemaResponse(requestId,
 SchemaInfoUtil.newSchemaInfo(schemaName, 
schemaAndMetadata.schema), schemaAndMetadata.version);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
index d4ef041f6de..aa47c378fc3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
@@ -46,6 +46,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import lombok.Cleanup;
 import lombok.EqualsAndHashCode;
@@ -69,6 +70,8 @@ import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.api.schema.SchemaDefinition;
+import org.apache.pulsar.client.impl.ConsumerImpl;
+import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
 import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
 import org.apache.pulsar.client.impl.schema.ProtobufSchema;
 import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
@@ -98,6 +101,7 @@ public class SchemaTest extends MockedPulsarServiceBaseTest {
 @BeforeMethod
 @Override
 public void setup() throws Exception {
+isTcpLookup = true;
 super.internalSetup();
 
 // Setup namespaces
@@ -106,6 +110,7 @@ public class SchemaTest extends MockedPulsarServiceBaseTest 
{
 .allowedClusters(Collections.singleton(CLUSTER_NAME))
 .build();
 admin.tenants().createTenant(PUBLIC_TENANT, tenantInfo);
+admin.namespaces().createNamespace(PUBLIC_TENANT + "/my-ns");
 }
 
 @AfterMethod(alwaysRun = true)
@@ -130,6 +135,34 @@ public class SchemaTest extends 
MockedPulsarServiceBaseTest {
 
pulsarClient.newProducer(org.apache.pulsar.client.api.Schema.AUTO_PRODUCE_BYTES()).topic(topic).create();
 }
 
+@Test
+public void testGetSchemaWithPatternTopic() throws Exception {
+final String topicPrefix = "persistent://public/my-ns/test-getSchema";
+
+int topicNums = 10;
+for (int i = 0; i < topicNums; i++) {
+String topic = topicPrefix + "-" + i;
+admin.topics().createNonPartitionedTopic(topic);

(pulsar) branch branch-3.0 updated: [fix][broker] Fix issue of field 'topic' is not set when handle GetSchema request (#22377)

2024-03-28 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new 5a1fa0c5c67 [fix][broker] Fix issue of field 'topic' is not set when 
handle GetSchema request (#22377)
5a1fa0c5c67 is described below

commit 5a1fa0c5c6709bdb7b003ce12d00abf52da293e1
Author: Cong Zhao 
AuthorDate: Thu Mar 28 23:14:19 2024 +0800

[fix][broker] Fix issue of field 'topic' is not set when handle GetSchema 
request (#22377)

(cherry picked from commit d8903da3d5ea5bab207d119186f2be6fa1147f60)
---
 .../apache/pulsar/broker/service/ServerCnx.java|  5 ++--
 .../java/org/apache/pulsar/schema/SchemaTest.java  | 33 ++
 2 files changed, 36 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 30432aaef73..2b2cbc5fac2 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -2389,9 +2389,10 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
 schemaVersion = 
schemaService.versionFromBytes(commandGetSchema.getSchemaVersion());
 }
 
+final String topic = commandGetSchema.getTopic();
 String schemaName;
 try {
-schemaName = 
TopicName.get(commandGetSchema.getTopic()).getSchemaName();
+schemaName = TopicName.get(topic).getSchemaName();
 } catch (Throwable t) {
 commandSender.sendGetSchemaErrorResponse(requestId, 
ServerError.InvalidTopicName, t.getMessage());
 return;
@@ -2400,7 +2401,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
 schemaService.getSchema(schemaName, 
schemaVersion).thenAccept(schemaAndMetadata -> {
 if (schemaAndMetadata == null) {
 commandSender.sendGetSchemaErrorResponse(requestId, 
ServerError.TopicNotFound,
-String.format("Topic not found or no-schema %s", 
commandGetSchema.getTopic()));
+String.format("Topic not found or no-schema %s", 
topic));
 } else {
 commandSender.sendGetSchemaResponse(requestId,
 SchemaInfoUtil.newSchemaInfo(schemaName, 
schemaAndMetadata.schema), schemaAndMetadata.version);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
index 7eae6462545..e10b45868bf 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
@@ -46,6 +46,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import lombok.Cleanup;
 import lombok.EqualsAndHashCode;
@@ -69,6 +70,8 @@ import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.api.schema.SchemaDefinition;
+import org.apache.pulsar.client.impl.ConsumerImpl;
+import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
 import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
 import org.apache.pulsar.client.impl.schema.ProtobufSchema;
 import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
@@ -98,6 +101,7 @@ public class SchemaTest extends MockedPulsarServiceBaseTest {
 @BeforeMethod
 @Override
 public void setup() throws Exception {
+isTcpLookup = true;
 super.internalSetup();
 
 // Setup namespaces
@@ -106,6 +110,7 @@ public class SchemaTest extends MockedPulsarServiceBaseTest 
{
 .allowedClusters(Collections.singleton(CLUSTER_NAME))
 .build();
 admin.tenants().createTenant(PUBLIC_TENANT, tenantInfo);
+admin.namespaces().createNamespace(PUBLIC_TENANT + "/my-ns");
 }
 
 @AfterMethod(alwaysRun = true)
@@ -130,6 +135,34 @@ public class SchemaTest extends 
MockedPulsarServiceBaseTest {
 
pulsarClient.newProducer(org.apache.pulsar.client.api.Schema.AUTO_PRODUCE_BYTES()).topic(topic).create();
 }
 
+@Test
+public void testGetSchemaWithPatternTopic() throws Exception {
+final String topicPrefix = "persistent://public/my-ns/test-getSchema";
+
+int topicNums = 10;
+for (int i = 0; i < topicNums; i++) {
+String topic = topicPrefix + "-" + i;
+admin.topics().createNonPartitionedTopic(topic);

Re: [PR] [fix][broker] Fix issue of field 'topic' is not set when handle GetSchema request [pulsar]

2024-03-28 Thread via GitHub


lhotari merged PR #22377:
URL: https://github.com/apache/pulsar/pull/22377


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



(pulsar) branch master updated: [fix][broker] Fix issue of field 'topic' is not set when handle GetSchema request (#22377)

2024-03-28 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new d8903da3d5e [fix][broker] Fix issue of field 'topic' is not set when 
handle GetSchema request (#22377)
d8903da3d5e is described below

commit d8903da3d5ea5bab207d119186f2be6fa1147f60
Author: Cong Zhao 
AuthorDate: Thu Mar 28 23:14:19 2024 +0800

[fix][broker] Fix issue of field 'topic' is not set when handle GetSchema 
request (#22377)
---
 .../apache/pulsar/broker/service/ServerCnx.java|  5 ++--
 .../java/org/apache/pulsar/schema/SchemaTest.java  | 33 ++
 2 files changed, 36 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 4f82f416ed2..716f3a1a04c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -2504,9 +2504,10 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
 schemaVersion = 
schemaService.versionFromBytes(commandGetSchema.getSchemaVersion());
 }
 
+final String topic = commandGetSchema.getTopic();
 String schemaName;
 try {
-schemaName = 
TopicName.get(commandGetSchema.getTopic()).getSchemaName();
+schemaName = TopicName.get(topic).getSchemaName();
 } catch (Throwable t) {
 commandSender.sendGetSchemaErrorResponse(requestId, 
ServerError.InvalidTopicName, t.getMessage());
 return;
@@ -2515,7 +2516,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
 schemaService.getSchema(schemaName, 
schemaVersion).thenAccept(schemaAndMetadata -> {
 if (schemaAndMetadata == null) {
 commandSender.sendGetSchemaErrorResponse(requestId, 
ServerError.TopicNotFound,
-String.format("Topic not found or no-schema %s", 
commandGetSchema.getTopic()));
+String.format("Topic not found or no-schema %s", 
topic));
 } else {
 commandSender.sendGetSchemaResponse(requestId,
 SchemaInfoUtil.newSchemaInfo(schemaName, 
schemaAndMetadata.schema), schemaAndMetadata.version);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
index d4ef041f6de..aa47c378fc3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
@@ -46,6 +46,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import lombok.Cleanup;
 import lombok.EqualsAndHashCode;
@@ -69,6 +70,8 @@ import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.api.schema.SchemaDefinition;
+import org.apache.pulsar.client.impl.ConsumerImpl;
+import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
 import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
 import org.apache.pulsar.client.impl.schema.ProtobufSchema;
 import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
@@ -98,6 +101,7 @@ public class SchemaTest extends MockedPulsarServiceBaseTest {
 @BeforeMethod
 @Override
 public void setup() throws Exception {
+isTcpLookup = true;
 super.internalSetup();
 
 // Setup namespaces
@@ -106,6 +110,7 @@ public class SchemaTest extends MockedPulsarServiceBaseTest 
{
 .allowedClusters(Collections.singleton(CLUSTER_NAME))
 .build();
 admin.tenants().createTenant(PUBLIC_TENANT, tenantInfo);
+admin.namespaces().createNamespace(PUBLIC_TENANT + "/my-ns");
 }
 
 @AfterMethod(alwaysRun = true)
@@ -130,6 +135,34 @@ public class SchemaTest extends 
MockedPulsarServiceBaseTest {
 
pulsarClient.newProducer(org.apache.pulsar.client.api.Schema.AUTO_PRODUCE_BYTES()).topic(topic).create();
 }
 
+@Test
+public void testGetSchemaWithPatternTopic() throws Exception {
+final String topicPrefix = "persistent://public/my-ns/test-getSchema";
+
+int topicNums = 10;
+for (int i = 0; i < topicNums; i++) {
+String topic = topicPrefix + "-" + i;
+admin.topics().createNonPartitionedTopic(topic);
+}
+
+Pattern pattern = Pattern.compile(topicPrefix + "-.*");
+   

Re: [PR] [improve][misc] Upgrade log4j2 to 2.23.1 [pulsar]

2024-03-28 Thread via GitHub


nodece commented on PR #22327:
URL: https://github.com/apache/pulsar/pull/22327#issuecomment-2025412874

   I tried to use the following config to generate the log4j2 plugin config, 
but not working.
   ```xml
   
 org.apache.maven.plugins
 maven-shade-plugin
 ${maven-shade-plugin}
 
   
 org.apache.logging.log4j
 
log4j-transform-maven-shade-plugin-extensions
 0.1.0
   
 
 
 
   shade-jar-with-dependencies
   package
   
 shade
   
   
 
   
   
   
 
   true
 
   
 
   
 
 
   
   ```
   Or
   ```

   org.apache.maven.plugins
   maven-compiler-plugin
   
 UTF-8
 true
 true
 
 false
 
   
 org.projectlombok
 lombok
 ${lombok.version}
   
   
 org.apache.logging.log4j
 log4j-core
 ${log4j2.version}
   
 
 
   -parameters
   
   -Xlint:all
   -Xlint:-options
   -Xlint:-serial
   -Xlint:-classfile
   -Xlint:-processing
   -Xpkginfo:always
 
   
 
   ```
   If you have a good idea, please let me know.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [fix][io] use create for bulk op if target cluster is ElasticSearch [pulsar]

2024-03-28 Thread via GitHub


shibd commented on code in PR #21377:
URL: https://github.com/apache/pulsar/pull/21377#discussion_r1543059139


##
pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfig.java:
##
@@ -340,6 +348,11 @@ public enum IdHashingAlgorithm {
 SHA512
 }
 
+public enum IndexType {
+INDEX,
+DATA_STREAM

Review Comment:
   Where is it used?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [improve][broker] Improve Gzip compression, allow excluding specific paths or disabling it [pulsar]

2024-03-28 Thread via GitHub


asafm commented on code in PR #22370:
URL: https://github.com/apache/pulsar/pull/22370#discussion_r1542921618


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java:
##
@@ -294,8 +291,10 @@ public void start() throws PulsarServerException {
 ContextHandlerCollection contexts = new ContextHandlerCollection();
 contexts.setHandlers(handlers.toArray(new 
Handler[handlers.size()]));
 
+Handler handlerForContexts = 
GzipHandlerUtil.wrapWithGzipHandler(contexts,

Review Comment:
   Honestly, I find the entire handlers registration in `start()` super 
confusing.
   
   Maybe we can just do it in a simpler manner first like:
   ```
   import org.eclipse.jetty.server.Handler;
   import org.eclipse.jetty.server.Server;
   import org.eclipse.jetty.server.handler.DefaultHandler;
   import org.eclipse.jetty.server.handler.HandlerCollection;
   import org.eclipse.jetty.server.handler.RequestLogHandler;
   import org.eclipse.jetty.server.handler.StatisticsHandler;
   import org.eclipse.jetty.util.log.Log;
   import org.eclipse.jetty.util.log.Logger;
   
   public class JettyServerConfiguration {
   
   private static final Logger LOG = 
Log.getLogger(JettyServerConfiguration.class);
   
   public static void main(String[] args) {
   Server server = new Server(8080); // Example port
   
   // Request Logging
   RequestLogHandler requestLogHandler = new RequestLogHandler();
   
requestLogHandler.setRequestLog(JettyRequestLogFactory.createRequestLogger());
   
   // Metrics 
   StatisticsHandler stats = new StatisticsHandler();
   
   // Handler Ordering
   HandlerCollection handlers = new HandlerCollection();
   handlers.setHandlers(new Handler[] { requestLogHandler, stats, new 
DefaultHandler() });  
   
   // Metrics Registration (with handling)
   try {
   JettyStatisticsCollector jettyStatisticsCollector = new 
JettyStatisticsCollector(stats);
   jettyStatisticsCollector.register();
   } catch (IllegalArgumentException e) {
   LOG.warn("JettyStatisticsCollector likely already registered", 
e);  
   }
   
   server.setHandler(handlers);
   
   try {
   server.start();
   server.join();
   } catch (Exception e) {
   LOG.error("Failed to start server", e);
   }
   }
   }
   
   ```
   
   then maybe we have the handler array be: stats, gzip, log, default
   
   



##
pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/PulsarAdminBuilder.java:
##
@@ -327,4 +327,11 @@ PulsarAdminBuilder authentication(String 
authPluginClassName, Maphttp://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.web;
+
+import java.util.List;
+import org.eclipse.jetty.server.Handler;
+import org.eclipse.jetty.server.handler.gzip.GzipHandler;
+
+public class GzipHandlerUtil {
+public static Handler wrapWithGzipHandler(Handler innerHandler, 
List gzipCompressionExcludedPaths) {
+Handler wrappedHandler;
+if (isGzipCompressionCompletelyDisabled(gzipCompressionExcludedPaths)) 
{

Review Comment:
   Just thinking out loud - can't we just define gzip on the handlers we see 
fit? We know Pulsar best no? Too much configuration option can be confusing. 
They can control the gzip by using headers, no?



##
pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java:
##
@@ -54,13 +55,23 @@ private PulsarAdminBuilderImpl(ClientConfigurationData 
conf) {
 
 @Override
 public PulsarAdminBuilder clone() {
-return new PulsarAdminBuilderImpl(conf.clone());
+PulsarAdminBuilderImpl pulsarAdminBuilder = new 
PulsarAdminBuilderImpl(conf.clone());
+pulsarAdminBuilder.clientBuilderClassLoader = clientBuilderClassLoader;
+pulsarAdminBuilder.gzipCompressionEnabled = gzipCompressionEnabled;
+return pulsarAdminBuilder;
 }
 
 @Override
 public PulsarAdminBuilder loadConf(Map config) {
 conf = ConfigurationDataUtils.loadData(config, conf, 
ClientConfigurationData.class);
 setAuthenticationFromPropsIfAvailable(conf);
+if (config.containsKey("gzipCompressionEnabled")) {
+if (config.get("gzipCompressionEnabled") instanceof Boolean) {

Review Comment:
   I would personally extract it once `config.get("gzipCompressionEnabled")`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on 

(pulsar-site) branch main updated: Update release process with use of variables to ease copy-pasting

2024-03-28 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-site.git


The following commit(s) were added to refs/heads/main by this push:
 new a1f23abf632b Update release process with use of variables to ease 
copy-pasting
a1f23abf632b is described below

commit a1f23abf632b911a5c549d7185f6dcc1a1e92dc8
Author: Lari Hotari 
AuthorDate: Thu Mar 28 14:48:29 2024 +0200

Update release process with use of variables to ease copy-pasting
---
 contribute/create-gpg-keys.md |  10 
 contribute/release-process.md | 112 ++
 2 files changed, 80 insertions(+), 42 deletions(-)

diff --git a/contribute/create-gpg-keys.md b/contribute/create-gpg-keys.md
index 03e514e6942a..377139afec80 100644
--- a/contribute/create-gpg-keys.md
+++ b/contribute/create-gpg-keys.md
@@ -118,6 +118,16 @@ gpg --send-key --keyserver=keys.openpgp.org $KEY_ID
 gpg --send-key --keyserver=keyserver.ubuntu.com $KEY_ID
 ```
 
+## Make your the Apache key the default key for GPG
+
+This is required for signing the release artifacts
+
+```shell
+APACHEID=your_asf_id
+KEY_ID=$(gpg --list-keys --with-colons $apach...@apache.org | egrep "^pub" | 
awk -F: '{print $5}')
+echo "default-key $KEY_ID" >> ~/.gnupg/gpg.conf
+```
+
 ## Appending the key to KEYS files
 
 The GPG key needs to be appended to `KEYS` file for the release candidates.
diff --git a/contribute/release-process.md b/contribute/release-process.md
index 4780e77a664d..cff4d7704c6f 100644
--- a/contribute/release-process.md
+++ b/contribute/release-process.md
@@ -40,6 +40,29 @@ Before you start the next release steps, make sure you have 
installed these soft
 
 Also, you need to **clean up the bookkeeper's local compiled** to make sure 
the bookkeeper dependency is fetched from the Maven repository, details to see 
[this mailing list 
thread](https://lists.apache.org/thread/gsbh95b2d9xtcg5fmtxpm9k9q6w68gd2).
 
+
+## Set environment variables to be used across the commands
+
+Set version
+```shell
+export VERSION_RC=3.0.4-candidate-1
+export VERSION_WITHOUT_RC=${VERSION_RC%-candidate-*}
+export VERSION_BRANCH=branch-3.0
+export UPSTREAM_REMOTE=origin
+```
+
+Set your ASF user id
+```shell
+export APACHE_USER=
+```
+
+In addition, you will need to set `PULSAR_PATH` to point to the cleanly 
checked out working directory for the release branch.
+
+If you run into problems with GPG signing set this
+```
+export GPG_TTY=$(tty)
+```
+
 ## Create a release candidate
 
 ### Create the release branch
@@ -58,14 +81,21 @@ It is recommended to create a fresh clone of the repository 
to avoid any local f
 ```shell
 git clone g...@github.com:apache/pulsar.git
 cd pulsar
-PULSAR_PATH=$(pwd)
-git checkout -b branch-2.X origin/master
+export PULSAR_PATH=$(pwd)
+git checkout -b $VERSION_BRANCH origin/master
 ```
 
 Alternatively, you can use a git workspace to create a new, clean directory on 
your machine without needing to re-download the project.
 
 ```shell
-git worktree add ../pulsar.branch-2.X branch-2.X
+git worktree add ../pulsar-release-$VERSION_BRANCH $VERSION_BRANCH
+cd pulsar-release-$VERSION_BRANCH
+export PULSAR_PATH=$(pwd)
+```
+
+if you get an error that the branch is already checked out, go to that 
directory detach it from the branch. After this the above command should succeed
+```shell 
+git checkout --detach HEAD
 ```
 
 If you created a new branch, update the [CI - OWASP Dependency 
Check](https://github.com/apache/pulsar/blob/master/.github/workflows/ci-owasp-dependency-check.yaml)
 workflow so that it will run on the new branch.
@@ -126,6 +156,8 @@ It will speed up cherry-picking since you commit ids are 
there and there's also
 A cherry-pick should be done in this order with `git cherry-pick -x COMMIT_ID`.
 It's possible that some dependent commits are necessary to be cherry-picked 
when you encounter a lot of merge conflicts in a case where they aren't 
expected.
 
+
+
 ### Update project version and tag
 
 During the release process, you are going to initially create "candidate" 
tags, that after verification and approval will get promoted to the "real" 
final tag.
@@ -135,26 +167,22 @@ In this process, the maven version of the project will 
always be the final one.
 ```shell
 # Bump to the release version
 cd $PULSAR_PATH
-./src/set-project-version.sh 2.X.0
+./src/set-project-version.sh $VERSION_WITHOUT_RC
 
 # Some version may not update the right parent version of 
`protobuf-shaded/pom.xml`, please double check it.
 
 # Commit
-git commit -m 'Release 2.X.0' -a
+git commit -m "Release $VERSION_WITHOUT_RC" -a
 
 # Create a "candidate" tag
-# If you don't sign your commits already, use the following
-export GPG_TTY=$(tty)
-git tag -u $u...@apache.org v2.X.0-candidate-1 -m 'Release v2.X.0-candidate-1'
-# If you already sign your commits using your apache.org email, use the 
following
-git tag -s v2.X.0-candidate-1 -m 'Release 

svn commit: r68185 - /dev/pulsar/pulsar-3.0.4-candidate-1/

2024-03-28 Thread lhotari
Author: lhotari
Date: Thu Mar 28 13:06:31 2024
New Revision: 68185

Log:
Add directory for pulsar 3.0.4-candidate-1 release

Added:
dev/pulsar/pulsar-3.0.4-candidate-1/



(pulsar) annotated tag v3.0.4-candidate-1 updated (2e13fba3ee6 -> fac66a82c4f)

2024-03-28 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a change to annotated tag v3.0.4-candidate-1
in repository https://gitbox.apache.org/repos/asf/pulsar.git


*** WARNING: tag v3.0.4-candidate-1 was modified! ***

from 2e13fba3ee6 (commit)
  to fac66a82c4f (tag)
 tagging 2e13fba3ee6b4eba5de4d36b0f00200184bd1145 (commit)
 replaces v3.0.3
  by Lari Hotari
  on Thu Mar 28 14:56:47 2024 +0200

- Log -
Release 3.0.4-candidate-1
-BEGIN PGP SIGNATURE-

iQJHBAABCgAxFiEExmPS9k2iygnbKNmr0/pn1SLFUlYFAmYFaQ8THGxob3RhcmlA
YXBhY2hlLm9yZwAKCRDT+mfVIsVSVgsxD/0Y9YorBhJ2DGhZlx9+ywRA/nBkfI5b
IXg9PqaqQbGfCuvjl1/JBUuKahUH29R9y06ZPImNMLDOZRYprYqSCnelGUyX+MOu
B+sDdsKUO1fY4Fz+DBzJfEhskk3YhStx4zHwTcm6epz7RlI5m1iNGv3TsJ8RbQus
NgmtlQVjH1xicJLLl9c7ukUx8NiZXysAMggAELvsGunNMvBXtqyAdtr7BqzGB9tT
p2/cVhAA1P4aizmWJ7FKrTa1H3mtEsYhtcovqTngXAjPbexjp2preogsQpRAN0Xl
0/9Yrr/4xWMcfSKNCACG56XQB5ZBvAXw86Nf5DavdXrI678cBGiX+lU7rlFgMLQk
hlyvJR4MB/+yzYF/EF1l+jN0trZTZQS4pbld9+rKuiD8SDZF3kjA94f6qgAxtFnO
XY+wjgYxUPYkZHBaAKpKvS9g5uFWltkitleVdI7+hcZT6CCWew4JyZnCMANaR4at
vckzG7KVJOvm617Vd+3YjHftN8L8fNP+xkBMdb0wC/YXqmUcGCCROJRRPUhK0zhD
/MmmMrVdHuwDkOis0bhe+p8rK3SHX8jsCLmkv0u3OcLxfp9xbS62hXNlYUjblza2
0oN3AHAlMFHQotgAZO40q+An1pP9X5fCpKTpeIMPxm5YbpPWqKZDzC1u/6pdHskC
XGz86Ucjq2NfmA==
=D0Ou
-END PGP SIGNATURE-
---


No new revisions were added by this update.

Summary of changes:



(pulsar) branch branch-3.0 updated (8f174463550 -> 2e13fba3ee6)

2024-03-28 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a change to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


from 8f174463550 [improve][test][branch-3.0] Improve 
ManagedLedgerTest.testGetNumberOfEntriesInStorage
 add 2e13fba3ee6 Release 3.0.4

No new revisions were added by this update.

Summary of changes:
 bouncy-castle/bc/pom.xml  | 2 +-
 bouncy-castle/bcfips-include-test/pom.xml | 2 +-
 bouncy-castle/bcfips/pom.xml  | 2 +-
 bouncy-castle/pom.xml | 2 +-
 buildtools/pom.xml| 4 ++--
 distribution/io/pom.xml   | 2 +-
 distribution/offloaders/pom.xml   | 2 +-
 distribution/pom.xml  | 2 +-
 distribution/server/pom.xml   | 2 +-
 distribution/shell/pom.xml| 2 +-
 docker/pom.xml| 2 +-
 docker/pulsar-all/pom.xml | 2 +-
 docker/pulsar/pom.xml | 2 +-
 jclouds-shaded/pom.xml| 2 +-
 managed-ledger/pom.xml| 2 +-
 pom.xml   | 4 ++--
 pulsar-broker-auth-athenz/pom.xml | 2 +-
 pulsar-broker-auth-oidc/pom.xml   | 2 +-
 pulsar-broker-auth-sasl/pom.xml   | 2 +-
 pulsar-broker-common/pom.xml  | 2 +-
 pulsar-broker/pom.xml | 2 +-
 pulsar-client-1x-base/pom.xml | 2 +-
 pulsar-client-1x-base/pulsar-client-1x/pom.xml| 2 +-
 pulsar-client-1x-base/pulsar-client-2x-shaded/pom.xml | 2 +-
 pulsar-client-admin-api/pom.xml   | 2 +-
 pulsar-client-admin-shaded/pom.xml| 2 +-
 pulsar-client-admin/pom.xml   | 2 +-
 pulsar-client-all/pom.xml | 2 +-
 pulsar-client-api/pom.xml | 2 +-
 pulsar-client-auth-athenz/pom.xml | 2 +-
 pulsar-client-auth-sasl/pom.xml   | 2 +-
 pulsar-client-messagecrypto-bc/pom.xml| 2 +-
 pulsar-client-shaded/pom.xml  | 2 +-
 pulsar-client-tools-api/pom.xml   | 2 +-
 pulsar-client-tools-customcommand-example/pom.xml | 2 +-
 pulsar-client-tools-test/pom.xml  | 2 +-
 pulsar-client-tools/pom.xml   | 2 +-
 pulsar-client/pom.xml | 2 +-
 pulsar-common/pom.xml | 2 +-
 pulsar-config-validation/pom.xml  | 2 +-
 pulsar-functions/api-java/pom.xml | 2 +-
 pulsar-functions/instance/pom.xml | 2 +-
 pulsar-functions/java-examples-builtin/pom.xml| 2 +-
 pulsar-functions/java-examples/pom.xml| 2 +-
 pulsar-functions/localrun-shaded/pom.xml  | 2 +-
 pulsar-functions/localrun/pom.xml | 2 +-
 pulsar-functions/pom.xml  | 2 +-
 pulsar-functions/proto/pom.xml| 2 +-
 pulsar-functions/runtime-all/pom.xml  | 2 +-
 pulsar-functions/runtime/pom.xml  | 2 +-
 pulsar-functions/secrets/pom.xml  | 2 +-
 pulsar-functions/utils/pom.xml| 2 +-
 pulsar-functions/worker/pom.xml   | 2 +-
 pulsar-io/aerospike/pom.xml   | 2 +-
 pulsar-io/alluxio/pom.xml | 2 +-
 pulsar-io/aws/pom.xml | 2 +-
 pulsar-io/batch-data-generator/pom.xml| 2 +-
 pulsar-io/batch-discovery-triggerers/pom.xml  | 2 +-
 pulsar-io/canal/pom.xml   | 2 +-
 pulsar-io/cassandra/pom.xml   | 2 +-
 pulsar-io/common/pom.xml  | 2 +-
 pulsar-io/core/pom.xml| 2 +-
 pulsar-io/data-generator/pom.xml  | 2 +-
 pulsar-io/debezium/core/pom.xml   | 2 +-
 pulsar-io/debezium/mongodb/pom.xml| 2 +-
 pulsar-io/debezium/mssql/pom.xml  | 2 +-
 pulsar-io/debezium/mysql/pom.xml  | 2 +-
 pulsar-io/debezium/oracle/pom.xml | 2 +-
 pulsar-io/debezium/pom.xml| 2 +-
 pulsar-io/debezium/postgres/pom.xml   | 2 +-
 pulsar-io/docs/pom.xml| 2 +-
 pulsar-io/dynamodb/pom.xml| 2 +-
 pulsar-io/elastic-search/pom.xml  | 2 +-
 pulsar-io/file/pom.xml| 2 +-
 

Re: [PR] [fix][broker] Fix issue of field 'topic' is not set when handle GetSchema request [pulsar]

2024-03-28 Thread via GitHub


Technoboy- closed pull request #22377: [fix][broker] Fix issue of field 'topic' 
is not set when handle GetSchema request
URL: https://github.com/apache/pulsar/pull/22377


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [I] [Bug] Multi-role authorization does not work properly on granting topic level permissions based on ordering of roles in token [pulsar]

2024-03-28 Thread via GitHub


Technoboy- commented on issue #22343:
URL: https://github.com/apache/pulsar/issues/22343#issuecomment-2024950672

   @dhsy6z  Could you help confirm if the below test match your reproduced 
steps? I can't reproduce it.
   you can put this method in 
org.apache.pulsar.client.api.MultiRolesTokenAuthorizationProviderTest.
   and then run it with green bar
   ```
   @Test
   public void testMultiRole() throws Exception {
   String tenant = "tenant1";
   @Cleanup
   PulsarAdmin admin = newPulsarAdmin(superUserToken);
   admin.tenants().createTenant(tenant, TenantInfo.builder()
   .adminRoles(Sets.newHashSet("Group_Test-user"))
   
.allowedClusters(Sets.newHashSet(configClusterName)).build());
   String namespace = tenant + "/namespace1";
   admin.namespaces().createNamespace(namespace);
   String topic = namespace + "/" + "test-topic";
   admin.topics().createNonPartitionedTopicAsync(topic);
   
   //
   Map claims = new HashMap<>();
   Set roles = new HashSet<>();
   roles.add("Group_Test-admin");
   roles.add("Group_Test-user");
   claims.put("roles", roles);
   final String token = Jwts.builder()
   .setClaims(claims)
   .signWith(secretKey)
   .compact();
   @Cleanup
   PulsarAdmin adminTest = newPulsarAdmin(token);
   adminTest.namespaces().getTopics(namespace);
   admin.topics().grantPermission(topic, "Group_Test-user", 
Sets.newHashSet(AuthAction.consume, AuthAction.produce));
   }
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [fix][bug] Fix issue of Field 'topic' is not set when handle GetSchema request [pulsar]

2024-03-28 Thread via GitHub


coderzc opened a new pull request, #22377:
URL: https://github.com/apache/pulsar/pull/22377

   ### Motivation
   
   When the consumer uses AUTO_CONSUME to subscribe to PatternTopic, we get the 
`Field 'topic' is not set` error. The root cause is that we incorrectly used 
the `commandGetSchema` variable, which is a shared variable in the thread, but 
we're passing it on to another thread, this is unsafe since next request will 
reset this command data, this case only occurs when the topic does not exist or 
the scheme does not exist when request `GetSchema`. This issue Introduced from 
https://github.com/apache/pulsar/pull/20932
   
   ```java
   java.util.concurrent.CompletionException: 
org.apache.pulsar.client.api.PulsarClientException: 
{"errorMsg":"java.lang.IllegalStateException: Field 'topic' is not 
set","reqId":3644652631296588088, "remote":"localhost/127.0.0.1:62361", 
"local":"/127.0.0.1:62369"}
at 
java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:368) 
~[?:?]
at 
java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:377)
 ~[?:?]
at 
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1152)
 ~[?:?]
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) 
~[?:?]
at 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147) 
~[?:?]
at 
org.apache.pulsar.client.impl.ClientCnx.handleGetSchemaResponse(ClientCnx.java:970)
 ~[classes/:?]
at 
org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:361)
 ~[classes/:?]
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
 ~[netty-transport-4.1.105.Final.jar:4.1.105.Final]
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
 ~[netty-transport-4.1.105.Final.jar:4.1.105.Final]
at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
 ~[netty-transport-4.1.105.Final.jar:4.1.105.Final]
at 
io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
 ~[netty-codec-4.1.105.Final.jar:4.1.105.Final]
at 
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)
 ~[netty-codec-4.1.105.Final.jar:4.1.105.Final]
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
 ~[netty-transport-4.1.105.Final.jar:4.1.105.Final]
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
 ~[netty-transport-4.1.105.Final.jar:4.1.105.Final]
at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
 ~[netty-transport-4.1.105.Final.jar:4.1.105.Final]
at 
io.netty.handler.flush.FlushConsolidationHandler.channelRead(FlushConsolidationHandler.java:152)
 ~[netty-handler-4.1.105.Final.jar:4.1.105.Final]
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
 ~[netty-transport-4.1.105.Final.jar:4.1.105.Final]
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
 ~[netty-transport-4.1.105.Final.jar:4.1.105.Final]
at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
 ~[netty-transport-4.1.105.Final.jar:4.1.105.Final]
at 
io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
 ~[netty-transport-4.1.105.Final.jar:4.1.105.Final]
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
 ~[netty-transport-4.1.105.Final.jar:4.1.105.Final]
at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
 ~[netty-transport-4.1.105.Final.jar:4.1.105.Final]
at 
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
 ~[netty-transport-4.1.105.Final.jar:4.1.105.Final]
at 
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
 ~[netty-transport-4.1.105.Final.jar:4.1.105.Final]
at 
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) 
~[netty-transport-4.1.105.Final.jar:4.1.105.Final]
at 
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
 ~[netty-transport-4.1.105.Final.jar:4.1.105.Final]
at 
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650) 
~[netty-transport-4.1.105.Final.jar:4.1.105.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) 
~[netty-transport-4.1.105.Final.jar:4.1.105.Final]

Re: [I] [Doc] Add a proper way to submit website issues and improvement proposals [pulsar]

2024-03-28 Thread via GitHub


asafm commented on issue #22277:
URL: https://github.com/apache/pulsar/issues/22277#issuecomment-2024937889

   @lhotari You are correct. I just wanted the process for website issues to be 
aligned with the process in general in Pulsar for triaging issues of any sort.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [improve][misc] Upgrade log4j2 to 2.23.1 [pulsar]

2024-03-28 Thread via GitHub


asafm commented on PR #22327:
URL: https://github.com/apache/pulsar/pull/22327#issuecomment-2024934741

   Ok guys, there was a lot of information thrown around here, and for me most 
of it was new.
   I'll try to summarize it all here:
   
   1. Log4j2 supports plugins. allowing you to extend Log4j. In our example, it 
allows anyone to add a new type of Appender.
   2. Prometheus Simple client has a dependency which we included in Pulsar:
   ```
  
 io.prometheus
 simpleclient_log4j2
   
   ```
  The dependency includes a Log4j2 plugin, which adds a new `Prometheus` 
type Appender. As @lhotari noted it is:
   ```
   @Plugin(name = "Prometheus", category = "Core", elementType = "appender")
   public final class InstrumentedAppender extends AbstractAppender {
   ```
   
  This Appender receives the logs (log events) and exposes metrics about 
them - how many logs per type, etc. In effect, it instruments Log4j2.
   
   3. The `Prometheus` type appender is used as you noted in the log4j2.yaml 
under Appenders
   ```
   Prometheus:
 name: Prometheus
   ```
   and then referenced to "forward" the logs to that appender so they can be 
counted.
   ```
   # Default root logger configuration
   Root:
 level: "${sys:pulsar.log.root.level}"
 additivity: true
 AppenderRef:
   - ref: "${sys:pulsar.log.appender}"
 level: "${sys:pulsar.log.level}"
   - ref: Prometheus
 level: info
   ```
   
   4. Log4j previously allowed to configure which packages to scan for 
existence of the classes you need to extend if you wish to add a plugin. As you 
@nodece noted in the link you provided, you don't really need it. You just need 
the plugin to have the `@Plugin` annotation, which it does. This mean we can 
safely remove the `packages` line from log4j2.yaml.
   
   You do need to verify the Prometheus log4j metrics are still exposed as 
double check.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [fix][broker] Fix PersistentSubscription duplicate implementation interface Subscription [pulsar]

2024-03-28 Thread via GitHub


sherlock-lin commented on PR #22359:
URL: https://github.com/apache/pulsar/pull/22359#issuecomment-2024915435

   > > This is my first pr, if the merge in can give me more confidence, I'll 
try to avoid mentioning low value pr taking up resources until the technical 
debt clears up.
   > 
   > @sherlock-lin Congrats for the first PR and welcome to join the Pulsar 
contributors! For the purpose of getting familiar to how PRs are merged and how 
the Pulsar CI works, this PR was a great way to learn that.
   > 
   > You might notice in future PRs that we have quite a few flaky tests in 
Pulsar CI and usually there will be multiple flakes and the build will have to 
be retried multiple times. For this PR, you hadn't yet tried out the ["Personal 
CI"](https://pulsar.apache.org/contribute/personal-ci/) option for running PR 
builds in your own fork to get build feedback. That's useful to learn since you 
can retry failing builds as you like in your own fork and prepare the PR for 
final review. The guide is missing some details since you will have to enable 
GitHub Actions in your Pulsar fork. Contributions to the instructions are also 
welcome if you notice any gaps.
   > 
   > I'm sorry about the criticism about this PR. I got stuck on the detail 
that this PR isn't a bug fix, but a code cleanup PR and ended up explaining 
more details why I'm not eager to encourage to do more code cleanup PRs before 
we have changed the Pulsar maintenance strategy ([#22359 
(comment)](https://github.com/apache/pulsar/pull/22359#issuecomment-2022311328)).
   > 
   > We have plenty of valuable contribution opportunies in the Apache Pulsar 
project. Please join [the Apache Pulsar 
Slack](https://pulsar.apache.org/community/#section-discussions) and it's #dev 
channel. When you are looking for contribution opportunies, I'll be happy to 
suggest valuable opportunies for you and help with any barriers for 
contributions.
   > 
   > /cc @Technoboy- @codelipenghui
   
   Thanks @lhotari  for your patient guidance!  in this PR submission I have 
irregularities thanks for pointing out. 
   
   in this process I have learned a lot about contributing code knowledge, I 
will learn more about submitting pr specification knowledge to improve 
efficiency!
   
   Also thanks to @Technoboy- @codecov-commenter @liangyuanpeng  and other 
partners for their guidance and help. I'm honored to be able to join Pulsar 
contributors with everybody to make Pulsar better!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



(pulsar) branch branch-3.2 updated: [improve][test][branch-3.2] Improve ManagedLedgerTest.testGetNumberOfEntriesInStorage

2024-03-28 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new 6255b1282e8 [improve][test][branch-3.2] Improve 
ManagedLedgerTest.testGetNumberOfEntriesInStorage
6255b1282e8 is described below

commit 6255b1282e8488ca3fc5fddcf320480af9b20255
Author: Lari Hotari 
AuthorDate: Thu Mar 28 12:36:44 2024 +0200

[improve][test][branch-3.2] Improve 
ManagedLedgerTest.testGetNumberOfEntriesInStorage

- improve the test case based on suggestion from @poorbarcode
---
 .../java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java  | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 6782867eb50..0d9d6c0e573 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -2641,10 +2641,10 @@ public class ManagedLedgerTest extends 
MockedBookKeeperTestCase {
 Awaitility.await().untilAsserted(() -> {
assertEquals(managedLedger.getState(), 
ManagedLedgerImpl.State.ClosedLedger);
 });
-managedLedger.rollCurrentLedgerIfFull();
+managedLedger.createLedgerAfterClosed();
 Awaitility.await().untilAsserted(() -> {
-assertEquals(managedLedger.getLedgersInfo().size(), 2);
-assertEquals(managedLedger.getState(), 
ManagedLedgerImpl.State.ClosedLedger);
+assertEquals(managedLedger.getLedgersInfo().size(), 3);
+assertEquals(managedLedger.getState(), 
ManagedLedgerImpl.State.LedgerOpened);
 });
 assertEquals(5, 
managedLedger.getLedgersInfoAsList().get(0).getEntries());
 assertEquals(5, 
managedLedger.getLedgersInfoAsList().get(1).getEntries());



(pulsar) branch branch-3.0 updated: [improve][test][branch-3.0] Improve ManagedLedgerTest.testGetNumberOfEntriesInStorage

2024-03-28 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new 8f174463550 [improve][test][branch-3.0] Improve 
ManagedLedgerTest.testGetNumberOfEntriesInStorage
8f174463550 is described below

commit 8f17446355095d9d24d781446bd9d60ca4a275ef
Author: Lari Hotari 
AuthorDate: Thu Mar 28 12:36:44 2024 +0200

[improve][test][branch-3.0] Improve 
ManagedLedgerTest.testGetNumberOfEntriesInStorage

- improve the test case based on suggestion from @poorbarcode
---
 .../java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java  | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index d20bbf0d7f5..6989d5edf6c 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -2640,10 +2640,10 @@ public class ManagedLedgerTest extends 
MockedBookKeeperTestCase {
 Awaitility.await().untilAsserted(() -> {
assertEquals(managedLedger.getState(), 
ManagedLedgerImpl.State.ClosedLedger);
 });
-managedLedger.rollCurrentLedgerIfFull();
+managedLedger.createLedgerAfterClosed();
 Awaitility.await().untilAsserted(() -> {
-assertEquals(managedLedger.getLedgersInfo().size(), 2);
-assertEquals(managedLedger.getState(), 
ManagedLedgerImpl.State.ClosedLedger);
+assertEquals(managedLedger.getLedgersInfo().size(), 3);
+assertEquals(managedLedger.getState(), 
ManagedLedgerImpl.State.LedgerOpened);
 });
 assertEquals(5, 
managedLedger.getLedgersInfoAsList().get(0).getEntries());
 assertEquals(5, 
managedLedger.getLedgersInfoAsList().get(1).getEntries());



[PR] [fix][io] use create for bulk op if target cluster is ElasticSearch [pulsar]

2024-03-28 Thread via GitHub


465 opened a new pull request, #21377:
URL: https://github.com/apache/pulsar/pull/21377

   ### Motivation
   
   (recreated PR, original: https://github.com/apache/pulsar/pull/20870)
   
   on ElasticSearch, the bulk API must be called with `create` operation if the 
target index is data stream.
   the current plugin cannot insert documents to elasticsearch data stream if 
the bulk is enabled, throwing `illegal_argument_exception`
   
   this PR address this issue.
   
   ### Modifications
   
   - adds `INDEX_TYPE` config
   - use `CREATE` op if `INDEX_TYPE` is `DATA_STREAM`
   
   ### Verifying this change
   
   - [ ] Make sure that the change passes the CI checks.
   
   ### Does this pull request potentially affect one of the following parts:
   
   
   
   *If the box was checked, please highlight the changes*
   
   - [ ] Dependencies (add or upgrade a dependency)
   - [ ] The public API
   - [ ] The schema
   - [ ] The default values of configurations
   - [ ] The threading model
   - [ ] The binary protocol
   - [ ] The REST endpoints
   - [ ] The admin CLI options
   - [ ] The metrics
   - [ ] Anything that affects deployment
   
   ### Documentation
   
   
   
   - [ ] `doc` 
   - [x] `doc-required` 
   - [ ] `doc-not-needed` 
   - [ ] `doc-complete` 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [fix][io] use create for bulk op if target cluster is ElasticSearch [pulsar]

2024-03-28 Thread via GitHub


shibd closed pull request #21377: [fix][io] use create for bulk op if target 
cluster is ElasticSearch
URL: https://github.com/apache/pulsar/pull/21377


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



(pulsar) 02/02: [improve][misc] Pin Netty version in pulsar-io/alluxio (#21728)

2024-03-28 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit d3f0a4b3280142fe990280eff0f2c1c00379ba38
Author: Lari Hotari 
AuthorDate: Fri Dec 15 07:36:24 2023 +0200

[improve][misc] Pin Netty version in pulsar-io/alluxio (#21728)

(cherry picked from commit b777136e57151c322416468f0be65841edc8be8a)
---
 pulsar-io/alluxio/pom.xml | 49 ---
 1 file changed, 25 insertions(+), 24 deletions(-)

diff --git a/pulsar-io/alluxio/pom.xml b/pulsar-io/alluxio/pom.xml
index 76fbcc7a5af..6a73cca523c 100644
--- a/pulsar-io/alluxio/pom.xml
+++ b/pulsar-io/alluxio/pom.xml
@@ -32,6 +32,7 @@
 2.9.3
 4.1.11
 1.37.0
+4.1.100.Final
 
 
 pulsar-io-alluxio
@@ -56,12 +57,6 @@
 org.alluxio
 alluxio-core-client-fs
 ${alluxio.version}
-
-
-grpc-netty
-io.grpc
-
-
 
 
 
@@ -74,10 +69,6 @@
 org.glassfish
 javax.el
 
-
-grpc-netty
-io.grpc
-
 
 
 
@@ -90,22 +81,32 @@
 com.google.guava
 guava
 
-
-
-
-io.grpc
-grpc-netty
-${grpc.version}
-
-
-
-io.dropwizard.metrics
-metrics-jvm
-${metrics.version}
-
-
 
 
+
+
+
+io.netty
+netty-bom
+${netty.version}
+pom
+import
+
+
+io.grpc
+grpc-bom
+${grpc.version}
+pom
+import
+
+
+io.dropwizard.metrics
+metrics-jvm
+${metrics.version}
+
+
+
+
 
 
 



(pulsar) 01/02: [fix][build] Upgrade alluxio version to 2.9.3 to fix CVE-2023-38889 (#21715)

2024-03-28 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 31f0ae44b7f34eceb4f06b42601f8eaba3f0c16e
Author: Jiwei Guo 
AuthorDate: Thu Dec 14 15:24:33 2023 +0800

[fix][build] Upgrade alluxio version to 2.9.3 to fix CVE-2023-38889 (#21715)

(cherry picked from commit 33313c02089e97ee6989bcae4da73cb6434b8fd5)
---
 pulsar-io/alluxio/pom.xml  | 2 +-
 .../main/java/org/apache/pulsar/io/alluxio/sink/AlluxioSink.java   | 7 ---
 .../java/org/apache/pulsar/io/alluxio/sink/AlluxioSinkTest.java| 6 +++---
 3 files changed, 8 insertions(+), 7 deletions(-)

diff --git a/pulsar-io/alluxio/pom.xml b/pulsar-io/alluxio/pom.xml
index d08e38b5e0f..76fbcc7a5af 100644
--- a/pulsar-io/alluxio/pom.xml
+++ b/pulsar-io/alluxio/pom.xml
@@ -29,7 +29,7 @@
 
 
 
-2.7.3
+2.9.3
 4.1.11
 1.37.0
 
diff --git 
a/pulsar-io/alluxio/src/main/java/org/apache/pulsar/io/alluxio/sink/AlluxioSink.java
 
b/pulsar-io/alluxio/src/main/java/org/apache/pulsar/io/alluxio/sink/AlluxioSink.java
index 413f05e0e17..3b72dc9666b 100644
--- 
a/pulsar-io/alluxio/src/main/java/org/apache/pulsar/io/alluxio/sink/AlluxioSink.java
+++ 
b/pulsar-io/alluxio/src/main/java/org/apache/pulsar/io/alluxio/sink/AlluxioSink.java
@@ -22,12 +22,13 @@ import alluxio.AlluxioURI;
 import alluxio.client.WriteType;
 import alluxio.client.file.FileOutStream;
 import alluxio.client.file.FileSystem;
+import alluxio.conf.Configuration;
 import alluxio.conf.InstancedConfiguration;
 import alluxio.conf.PropertyKey;
 import alluxio.exception.AlluxioException;
 import alluxio.grpc.CreateFilePOptions;
 import alluxio.grpc.WritePType;
-import alluxio.util.FileSystemOptions;
+import alluxio.util.FileSystemOptionsUtils;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -78,7 +79,7 @@ public class AlluxioSink implements Sink {
 private AlluxioSinkConfig alluxioSinkConfig;
 private AlluxioState alluxioState;
 
-private InstancedConfiguration configuration = 
InstancedConfiguration.defaults();
+private InstancedConfiguration configuration = 
Configuration.modifiableGlobal();
 
 private ObjectMapper objectMapper = new ObjectMapper();
 
@@ -205,7 +206,7 @@ public class AlluxioSink implements Sink {
 
 private void createTmpFile() throws AlluxioException, IOException {
 CreateFilePOptions.Builder optionsBuilder =
-
FileSystemOptions.createFileDefaults(configuration).toBuilder();
+
FileSystemOptionsUtils.createFileDefaults(configuration).toBuilder();
 UUID id = UUID.randomUUID();
 String fileExtension = alluxioSinkConfig.getFileExtension();
 tmpFilePath = tmpFileDirPath + "/" + id.toString() + "_tmp" + 
fileExtension;
diff --git 
a/pulsar-io/alluxio/src/test/java/org/apache/pulsar/io/alluxio/sink/AlluxioSinkTest.java
 
b/pulsar-io/alluxio/src/test/java/org/apache/pulsar/io/alluxio/sink/AlluxioSinkTest.java
index 9325a2255ab..bf40581aae1 100644
--- 
a/pulsar-io/alluxio/src/test/java/org/apache/pulsar/io/alluxio/sink/AlluxioSinkTest.java
+++ 
b/pulsar-io/alluxio/src/test/java/org/apache/pulsar/io/alluxio/sink/AlluxioSinkTest.java
@@ -22,8 +22,8 @@ import alluxio.AlluxioURI;
 import alluxio.client.WriteType;
 import alluxio.client.file.FileSystem;
 import alluxio.client.file.URIStatus;
+import alluxio.conf.Configuration;
 import alluxio.conf.PropertyKey;
-import alluxio.conf.ServerConfiguration;
 import alluxio.master.LocalAlluxioCluster;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.io.FilenameUtils;
@@ -237,8 +237,8 @@ public class AlluxioSinkTest {
 private LocalAlluxioCluster setupSingleMasterCluster() throws Exception {
 // Setup and start the local alluxio cluster
 LocalAlluxioCluster cluster = new LocalAlluxioCluster();
-cluster.initConfiguration(getTestName(getClass().getSimpleName(), 
LocalAlluxioCluster.DEFAULT_TEST_NAME));
-ServerConfiguration.set(PropertyKey.USER_FILE_WRITE_TYPE_DEFAULT, 
WriteType.MUST_CACHE);
+cluster.initConfiguration(getTestName(getClass().getSimpleName(), 
"test"));
+Configuration.set(PropertyKey.USER_FILE_WRITE_TYPE_DEFAULT, 
WriteType.MUST_CACHE);
 cluster.start();
 return cluster;
 }



(pulsar) branch branch-3.0 updated (c74eec7f087 -> d3f0a4b3280)

2024-03-28 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a change to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


from c74eec7f087 [fix][test] Fix flaky test 
BrokerServiceAutoSubscriptionCreationTest (#22190)
 new 31f0ae44b7f [fix][build] Upgrade alluxio version to 2.9.3 to fix 
CVE-2023-38889 (#21715)
 new d3f0a4b3280 [improve][misc] Pin Netty version in pulsar-io/alluxio 
(#21728)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 pulsar-io/alluxio/pom.xml  | 51 +++---
 .../apache/pulsar/io/alluxio/sink/AlluxioSink.java |  7 +--
 .../pulsar/io/alluxio/sink/AlluxioSinkTest.java|  6 +--
 3 files changed, 33 insertions(+), 31 deletions(-)



(pulsar) branch branch-3.0 updated: [fix][test] Fix flaky test BrokerServiceAutoSubscriptionCreationTest (#22190)

2024-03-28 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new c74eec7f087 [fix][test] Fix flaky test 
BrokerServiceAutoSubscriptionCreationTest (#22190)
c74eec7f087 is described below

commit c74eec7f087b90797a0e4ddfc527d92c68363519
Author: Jiwei Guo 
AuthorDate: Mon Mar 4 21:42:22 2024 +0800

[fix][test] Fix flaky test BrokerServiceAutoSubscriptionCreationTest 
(#22190)

(cherry picked from commit 8c7c9788119197ac78bb90dfb030b1883139c026)
---
 .../service/BrokerServiceAutoSubscriptionCreationTest.java  | 13 +
 1 file changed, 9 insertions(+), 4 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoSubscriptionCreationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoSubscriptionCreationTest.java
index 3f2e182874e..f1128e389ca 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoSubscriptionCreationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoSubscriptionCreationTest.java
@@ -28,6 +28,7 @@ import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.AutoSubscriptionCreationOverride;
+import org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.AfterMethod;
@@ -159,15 +160,19 @@ public class BrokerServiceAutoSubscriptionCreationTest 
extends BrokerTestBase {
 throws PulsarAdminException, PulsarClientException {
 pulsar.getConfiguration().setAllowAutoTopicCreation(false);
 pulsar.getConfiguration().setAllowAutoSubscriptionCreation(true);
-
admin.brokers().updateDynamicConfiguration("allowAutoSubscriptionCreation", 
"false");
+String allowAutoSubscriptionCreation = "allowAutoSubscriptionCreation";
+
admin.brokers().updateDynamicConfiguration(allowAutoSubscriptionCreation, 
"false");
 String topicString = "persistent://prop/ns-abc/non-partitioned-topic" 
+ UUID.randomUUID();
 String subscriptionName = "non-partitioned-topic-sub";
 admin.topics().createNonPartitionedTopic(topicString);
 Assert.assertThrows(PulsarClientException.class,
 ()-> 
pulsarClient.newConsumer().topic(topicString).subscriptionName(subscriptionName).subscribe());
-
admin.brokers().updateDynamicConfiguration("allowAutoSubscriptionCreation", 
"true");
-
pulsarClient.newConsumer().topic(topicString).subscriptionName(subscriptionName).subscribe();
-
assertTrue(admin.topics().getSubscriptions(topicString).contains(subscriptionName));
+
admin.brokers().updateDynamicConfiguration(allowAutoSubscriptionCreation, 
"true");
+Awaitility.await().untilAsserted(() -> {
+
Assert.assertEquals(admin.brokers().getAllDynamicConfigurations().get(allowAutoSubscriptionCreation),
 "true");
+
pulsarClient.newConsumer().topic(topicString).subscriptionName(subscriptionName).subscribe();
+
assertTrue(admin.topics().getSubscriptions(topicString).contains(subscriptionName));
+});
 }
 
 }



Re: [PR] [improve] [broker] Avoid repeated Read-and-discard when using Key_Shared mode [pulsar]

2024-03-28 Thread via GitHub


codelipenghui commented on code in PR #22245:
URL: https://github.com/apache/pulsar/pull/22245#discussion_r1542603523


##
pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java:
##
@@ -1630,4 +1637,263 @@ public void 
testContinueDispatchMessagesWhenMessageDelayed() throws Exception {
 log.info("Got {} other messages...", sum);
 Assert.assertEquals(sum, delayedMessages + messages);
 }
+
+private AtomicInteger injectReplayReadCounter(String topicName, String 
cursorName) throws Exception {
+PersistentTopic persistentTopic =
+(PersistentTopic) 
pulsar.getBrokerService().getTopic(topicName, false).join().get();
+ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) 
persistentTopic.getManagedLedger();
+ManagedCursorImpl cursor = (ManagedCursorImpl) 
managedLedger.openCursor(cursorName);
+managedLedger.getCursors().removeCursor(cursor.getName());
+managedLedger.getActiveCursors().removeCursor(cursor.getName());
+ManagedCursorImpl spyCursor = Mockito.spy(cursor);
+managedLedger.getCursors().add(spyCursor, PositionImpl.EARLIEST);
+managedLedger.getActiveCursors().add(spyCursor, PositionImpl.EARLIEST);
+AtomicInteger replyReadCounter = new AtomicInteger();
+Mockito.doAnswer(invocation -> {
+if 
(!String.valueOf(invocation.getArguments()[2]).equals("Normal")) {
+replyReadCounter.incrementAndGet();
+}
+return invocation.callRealMethod();
+}).when(spyCursor).asyncReplayEntries(Mockito.anySet(), Mockito.any(), 
Mockito.any());
+Mockito.doAnswer(invocation -> {
+if 
(!String.valueOf(invocation.getArguments()[2]).equals("Normal")) {
+replyReadCounter.incrementAndGet();
+}
+return invocation.callRealMethod();
+}).when(spyCursor).asyncReplayEntries(Mockito.anySet(), Mockito.any(), 
Mockito.any(), Mockito.anyBoolean());
+admin.topics().createSubscription(topicName, cursorName, 
MessageId.earliest);
+return replyReadCounter;
+}
+
+@Test
+public void testNoRepeatedReadAndDiscard() throws Exception {
+int delayedMessages = 100;
+final String topic = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+final String subName = "my-sub";
+admin.topics().createNonPartitionedTopic(topic);
+AtomicInteger replyReadCounter = injectReplayReadCounter(topic, 
subName);
+
+// Send messages.
+@Cleanup
+Producer producer = 
pulsarClient.newProducer(Schema.INT32).topic(topic).enableBatching(false).create();
+for (int i = 0; i < delayedMessages; i++) {
+MessageId messageId = producer.newMessage()
+.key(String.valueOf(random.nextInt(NUMBER_OF_KEYS)))
+.value(100 + i)
+.send();
+log.info("Published delayed message :{}", messageId);
+}
+producer.close();
+
+// Make ack holes.
+Consumer consumer1 = pulsarClient.newConsumer(Schema.INT32)
+.topic(topic)
+.subscriptionName(subName)
+.receiverQueueSize(10)
+.subscriptionType(SubscriptionType.Key_Shared)
+.subscribe();
+Consumer consumer2 = pulsarClient.newConsumer(Schema.INT32)
+.topic(topic)
+.subscriptionName(subName)
+.receiverQueueSize(10)
+.subscriptionType(SubscriptionType.Key_Shared)
+.subscribe();
+List msgList1 = new ArrayList<>();
+List msgList2 = new ArrayList<>();
+for (int i = 0; i < 10; i++) {
+Message msg1 = consumer1.receive(1, TimeUnit.SECONDS);
+if (msg1 != null) {
+msgList1.add(msg1);
+}
+Message msg2 = consumer2.receive(1, TimeUnit.SECONDS);
+if (msg2 != null) {
+msgList2.add(msg2);
+}
+}
+Consumer redeliverConsumer = null;
+if (!msgList1.isEmpty()) {
+msgList1.forEach(msg -> consumer1.acknowledgeAsync(msg));
+redeliverConsumer = consumer2;
+} else {
+msgList2.forEach(msg -> consumer2.acknowledgeAsync(msg));
+redeliverConsumer = consumer1;
+}

Review Comment:
   Actually, it can't ensure the ack hole is introduced. The second consumer 
might not able to get any messages until the first consumer acked all the 
messages.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [D] Threads config for Pulsar Consumers [pulsar]

2024-03-28 Thread via GitHub


GitHub user danielnesaraj added a comment to the discussion: Threads config for 
Pulsar Consumers

Thank you for the comprehensive response @lhotari ! This helps.

GitHub link: 
https://github.com/apache/pulsar/discussions/22375#discussioncomment-8938836


This is an automatically sent email for commits@pulsar.apache.org.
To unsubscribe, please send an email to: commits-unsubscr...@pulsar.apache.org



Re: [PR] [improve] [broker] Avoid repeated Read-and-discard when using Key_Shared mode [pulsar]

2024-03-28 Thread via GitHub


codelipenghui commented on code in PR #22245:
URL: https://github.com/apache/pulsar/pull/22245#discussion_r1542586864


##
pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java:
##
@@ -1630,4 +1637,263 @@ public void 
testContinueDispatchMessagesWhenMessageDelayed() throws Exception {
 log.info("Got {} other messages...", sum);
 Assert.assertEquals(sum, delayedMessages + messages);
 }
+
+private AtomicInteger injectReplayReadCounter(String topicName, String 
cursorName) throws Exception {

Review Comment:
   Should it be `injectNormalReadCounter`?



##
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java:
##
@@ -437,6 +436,82 @@ protected synchronized NavigableSet 
getMessagesToReplayNow(int max
 }
 }
 
+private int getAvailablePermits(Consumer c) {
+int availablePermits = Math.max(c.getAvailablePermits(), 0);
+if (c.getMaxUnackedMessages() > 0) {
+// Avoid negative number
+int remainUnAckedMessages = Math.max(c.getMaxUnackedMessages() - 
c.getUnackedMessages(), 0);
+availablePermits = Math.min(availablePermits, 
remainUnAckedMessages);
+}
+return availablePermits;
+}
+
+@Override
+protected NavigableSet 
filterOutEntriesWillBeDiscarded(NavigableSet src) {

Review Comment:
   It's better to reuse the `getRestrictedMaxEntriesForConsumer` method to 
avoid the inconsistent behaviors between filter out before read entries and 
filter out after read entries.



##
pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java:
##
@@ -1630,4 +1637,263 @@ public void 
testContinueDispatchMessagesWhenMessageDelayed() throws Exception {
 log.info("Got {} other messages...", sum);
 Assert.assertEquals(sum, delayedMessages + messages);
 }
+
+private AtomicInteger injectReplayReadCounter(String topicName, String 
cursorName) throws Exception {
+PersistentTopic persistentTopic =
+(PersistentTopic) 
pulsar.getBrokerService().getTopic(topicName, false).join().get();
+ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) 
persistentTopic.getManagedLedger();
+ManagedCursorImpl cursor = (ManagedCursorImpl) 
managedLedger.openCursor(cursorName);
+managedLedger.getCursors().removeCursor(cursor.getName());
+managedLedger.getActiveCursors().removeCursor(cursor.getName());
+ManagedCursorImpl spyCursor = Mockito.spy(cursor);
+managedLedger.getCursors().add(spyCursor, PositionImpl.EARLIEST);
+managedLedger.getActiveCursors().add(spyCursor, PositionImpl.EARLIEST);
+AtomicInteger replyReadCounter = new AtomicInteger();
+Mockito.doAnswer(invocation -> {
+if 
(!String.valueOf(invocation.getArguments()[2]).equals("Normal")) {
+replyReadCounter.incrementAndGet();
+}
+return invocation.callRealMethod();
+}).when(spyCursor).asyncReplayEntries(Mockito.anySet(), Mockito.any(), 
Mockito.any());
+Mockito.doAnswer(invocation -> {
+if 
(!String.valueOf(invocation.getArguments()[2]).equals("Normal")) {
+replyReadCounter.incrementAndGet();
+}
+return invocation.callRealMethod();
+}).when(spyCursor).asyncReplayEntries(Mockito.anySet(), Mockito.any(), 
Mockito.any(), Mockito.anyBoolean());
+admin.topics().createSubscription(topicName, cursorName, 
MessageId.earliest);
+return replyReadCounter;
+}
+
+@Test
+public void testNoRepeatedReadAndDiscard() throws Exception {
+int delayedMessages = 100;
+final String topic = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+final String subName = "my-sub";
+admin.topics().createNonPartitionedTopic(topic);
+AtomicInteger replyReadCounter = injectReplayReadCounter(topic, 
subName);
+
+// Send messages.
+@Cleanup
+Producer producer = 
pulsarClient.newProducer(Schema.INT32).topic(topic).enableBatching(false).create();
+for (int i = 0; i < delayedMessages; i++) {
+MessageId messageId = producer.newMessage()
+.key(String.valueOf(random.nextInt(NUMBER_OF_KEYS)))
+.value(100 + i)
+.send();
+log.info("Published delayed message :{}", messageId);

Review Comment:
   Why named delayed messages. It actually published normal messages.



##
pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java:
##
@@ -1630,4 +1637,263 @@ public void 
testContinueDispatchMessagesWhenMessageDelayed() throws Exception {
 log.info("Got {} other messages...", sum);
 Assert.assertEquals(sum, delayedMessages + messages);
 }
+
+private 

(pulsar) branch branch-3.2 updated: [fix][test][branch-3.2] Fix broken ManagedLedgerTest.testGetNumberOfEntriesInStorage

2024-03-28 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new 9927b55b177 [fix][test][branch-3.2] Fix broken 
ManagedLedgerTest.testGetNumberOfEntriesInStorage
9927b55b177 is described below

commit 9927b55b1778aaa53bcfa77aef74af6dc6d11610
Author: Lari Hotari 
AuthorDate: Thu Mar 28 12:14:23 2024 +0200

[fix][test][branch-3.2] Fix broken 
ManagedLedgerTest.testGetNumberOfEntriesInStorage

- change https://github.com/apache/pulsar/pull/22034 is missing from 
branch-3.2
---
 .../java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java  | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 264a2907969..6782867eb50 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -2639,12 +2639,12 @@ public class ManagedLedgerTest extends 
MockedBookKeeperTestCase {
 
 // trigger ledger rollover and wait for the new ledger created
 Awaitility.await().untilAsserted(() -> {
-   assertEquals("LedgerOpened", 
WhiteboxImpl.getInternalState(managedLedger, "state").toString());
+   assertEquals(managedLedger.getState(), 
ManagedLedgerImpl.State.ClosedLedger);
 });
 managedLedger.rollCurrentLedgerIfFull();
 Awaitility.await().untilAsserted(() -> {
-assertEquals(managedLedger.getLedgersInfo().size(), 3);
-assertEquals(managedLedger.getState(), 
ManagedLedgerImpl.State.LedgerOpened);
+assertEquals(managedLedger.getLedgersInfo().size(), 2);
+assertEquals(managedLedger.getState(), 
ManagedLedgerImpl.State.ClosedLedger);
 });
 assertEquals(5, 
managedLedger.getLedgersInfoAsList().get(0).getEntries());
 assertEquals(5, 
managedLedger.getLedgersInfoAsList().get(1).getEntries());



[I] golang adds a batch fetch message API [pulsar-client-go]

2024-03-28 Thread via GitHub


wangrenyi opened a new issue, #1201:
URL: https://github.com/apache/pulsar-client-go/issues/1201


Consumer consumer = 
pulsarClient.newConsumer().topic("my-topic").subscriptionName("my-subscription")

.batchReceivePolicy(BatchReceivePolicy.builder().maxNumMessages(10).maxNumBytes(1024
 * 1024)
.timeout(200, 
TimeUnit.MILLISECONDS).build())
.subscribe();
   
while (true) {
Messages messages = consumer.batchReceive();
for (Message msg : messages) {
// TODO
}
consumer.acknowledge(messages);
}
   
   
   When will golang provide a batch fetch message, batch submit message API 
with the same functionality as the java client


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



(pulsar) branch branch-3.0 updated: [fix][test][branch-3.0] Fix broken ManagedLedgerTest.testGetNumberOfEntriesInStorage

2024-03-28 Thread lhotari
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new e3531e808c1 [fix][test][branch-3.0] Fix broken 
ManagedLedgerTest.testGetNumberOfEntriesInStorage
e3531e808c1 is described below

commit e3531e808c172ff588e36499c41746835d06904a
Author: Lari Hotari 
AuthorDate: Thu Mar 28 12:14:23 2024 +0200

[fix][test][branch-3.0] Fix broken 
ManagedLedgerTest.testGetNumberOfEntriesInStorage

- change https://github.com/apache/pulsar/pull/22034 is missing from 
branch-3.0
---
 .../java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java  | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 10bfb699780..d20bbf0d7f5 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -2638,12 +2638,12 @@ public class ManagedLedgerTest extends 
MockedBookKeeperTestCase {
 
 // trigger ledger rollover and wait for the new ledger created
 Awaitility.await().untilAsserted(() -> {
-   assertEquals("LedgerOpened", 
WhiteboxImpl.getInternalState(managedLedger, "state").toString());
+   assertEquals(managedLedger.getState(), 
ManagedLedgerImpl.State.ClosedLedger);
 });
 managedLedger.rollCurrentLedgerIfFull();
 Awaitility.await().untilAsserted(() -> {
-assertEquals(managedLedger.getLedgersInfo().size(), 3);
-assertEquals(managedLedger.getState(), 
ManagedLedgerImpl.State.LedgerOpened);
+assertEquals(managedLedger.getLedgersInfo().size(), 2);
+assertEquals(managedLedger.getState(), 
ManagedLedgerImpl.State.ClosedLedger);
 });
 assertEquals(5, 
managedLedger.getLedgersInfoAsList().get(0).getEntries());
 assertEquals(5, 
managedLedger.getLedgersInfoAsList().get(1).getEntries());



Re: [I] Enable Pulsar authentication for Client authentication using private_key_jwt method [pulsar]

2024-03-28 Thread via GitHub


lhotari commented on issue #22371:
URL: https://github.com/apache/pulsar/issues/22371#issuecomment-2024829126

   > Hello @lhotari,
   > 
   > sorry, maybe I was not clear enough. What I was talking about was the Java 
client we are using. We could use the option with the JWT you have mentioned, 
but that would mean that every time the JWT expires we would have to update it 
in our systems and production as well. That also normally means a restart of 
the application that uses this JWT token. 
   > 
   > We use a OAuth2 authentication mechanism, but we do not have the 
`client_id` and `client_secret` that is being used in OAuth2 Java client 
implementation. In our case we use the _private_key_jwt_ method that instead of 
the `client_id` and `client_secret`, uses `client_assertion_type` which is 
`urn:ietf:params:oauth:client-assertion-type:jwt-bearer` and `client_assertion` 
that contains information for client authentication. It must be digitally 
signed using a private key.
   
   Thanks @WZHMIJJ, this is a great clarification.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [D] Threads config for Pulsar Consumers [pulsar]

2024-03-28 Thread via GitHub


GitHub user lhotari added a comment to the discussion: Threads config for 
Pulsar Consumers

> I am looking into the thread config of the pulsar client. I found this: 
> https://stackoverflow.com/questions/56954771/apache-pulsar-iothreads-listenerthreads-and-message-ordering

Yes, the answer continues to apply to Pulsar client.

> Then there is only 1 thread in which all 3 consumers are going to consume 
> messages from their respective topics, right?
Assuming I have the requisite CPU cores allocated, would I then see performance 
improvement in doing the following:

Yes, this makes sense. It's worth validating your assumption by testing.

Thanks for sharing a good question where you already provided a helpful 
response too.

Regarding performance, adding threads isn't the only consideration. This 
diagram from Wikipedia's HTTP pipelining article illustrates the issue with 
one-by-one handling of messages:
![image](https://github.com/apache/pulsar/assets/66864/dccc7063-82e6-4758-bb0c-1d5b58939b72)

For many use cases, it's possible to significantly reduce the number of 
consumers and increase throughput by using **pipelining**. In Pulsar, this 
requires using the async API. Pulsar Functions support pipelining too when the 
return type is a CompletableFuture, but the feature isn't well documented.

For key-ordered processing with pipelining, the Pulsar Reactive Client provides 
a solution based on Project Reactor's groupBy operator. This is one of the 
sweet spots of the Reactive Client, but there's not much documentation about 
it. 

I made [a conference presentation about the initial ideas in SpringOne 
2021](https://springone.io/2021/sessions/reactive-applications-with-apache-pulsar-and-spring-boot).
 The presentation is slightly outdated since Spring Pulsar has come out after 
that. However, the code examples have been updated to use Spring Pulsar and 
Pulsar Reactive client since then. The code examples for pipelining having been 
updated. 

For pipelining, Pulsar Reactive client has built-in support in the 
ReactiveMessagePipeline. 
([implementation](https://github.com/apache/pulsar-client-reactive/blob/b8b42df975c4bb2cd275936402d6439087695554/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultReactiveMessagePipeline.java#L125-L128),
 [single testcase which isn't a good usage 
example](https://github.com/apache/pulsar-client-reactive/blob/b8b42df975c4bb2cd275936402d6439087695554/pulsar-client-reactive-api/src/test/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipelineTests.java#L321-L329)),
 however, the documentation isn't great for this feature that unlocks 
key-ordered processing with a configurable concurrency/parallelism level. 
The javadoc contains some limited docs: 
https://github.com/apache/pulsar-client-reactive/blob/b8b42df975c4bb2cd275936402d6439087695554/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipelineBuilder.java#L122-L142
Since Project Reactor has great support for retries, it's very simple to 
implement reliable integration pipelines leveraging these features. I hope I 
could have time to do an updated conference talk with updates to docs and 
examples to explain all of this.






GitHub link: 
https://github.com/apache/pulsar/discussions/22375#discussioncomment-8938633


This is an automatically sent email for commits@pulsar.apache.org.
To unsubscribe, please send an email to: commits-unsubscr...@pulsar.apache.org



Re: [PR] [fix][txn]Handle exceptions in the transaction pending ack init [pulsar]

2024-03-28 Thread via GitHub


liangyepianzhou commented on code in PR #21274:
URL: https://github.com/apache/pulsar/pull/21274#discussion_r1542581449


##
pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java:
##
@@ -100,6 +104,79 @@ protected void cleanup() {
 super.internalCleanup();
 }
 
+/**
+ * Test consumer can be built successfully with retryable exception
+ * and get correct error with no-retryable exception.
+ * @throws Exception
+ */
+@Test(timeOut = 6)
+public void testBuildConsumerEncounterPendingAckInitFailure() throws 
Exception {
+// 1. Prepare and make sure the consumer can be built successfully.
+String topic = NAMESPACE1 + 
"/testUnloadSubscriptionWhenFailedInitPendingAck";
+@Cleanup
+Consumer consumer1 = pulsarClient.newConsumer()
+.subscriptionName("subName1")
+.topic(topic)
+.subscribe();
+// 2. Mock a transactionPendingAckStoreProvider to test building 
consumer
+// failing at 
transactionPendingAckStoreProvider::checkInitializedBefore.
+Field transactionPendingAckStoreProviderField = PulsarService.class
+.getDeclaredField("transactionPendingAckStoreProvider");
+transactionPendingAckStoreProviderField.setAccessible(true);
+TransactionPendingAckStoreProvider pendingAckStoreProvider =
+(TransactionPendingAckStoreProvider) 
transactionPendingAckStoreProviderField
+.get(pulsarServiceList.get(0));
+TransactionPendingAckStoreProvider mockProvider = 
mock(pendingAckStoreProvider.getClass());
+// 3. Test retryable exception when checkInitializedBefore:
+// The consumer will be built successfully after one time retry.
+when(mockProvider.checkInitializedBefore(any()))
+// First, the method checkInitializedBefore will fail with a 
retryable exception.
+.thenReturn(FutureUtil.failedFuture(new 
ManagedLedgerException("mock fail initialize")))
+// Then, the method will be executed successfully.
+.thenReturn(CompletableFuture.completedFuture(false));
+transactionPendingAckStoreProviderField.set(pulsarServiceList.get(0), 
mockProvider);
+@Cleanup
+Consumer consumer2 = pulsarClient.newConsumer()
+.subscriptionName("subName2")
+.topic(topic)
+.subscribe();
+
+// 4. Test retryable exception when newPendingAckStore:
+// The consumer will be built successfully after one time retry.
+when(mockProvider.checkInitializedBefore(any()))
+.thenReturn(CompletableFuture.completedFuture(true));
+
+when(mockProvider.newPendingAckStore(any()))
+// First, the method newPendingAckStore will fail with a 
retryable exception.
+.thenReturn(FutureUtil.failedFuture(new 
ManagedLedgerException("mock fail new store")))
+// Then, the method will be executed successfully.
+.thenCallRealMethod();
+transactionPendingAckStoreProviderField.set(pulsarServiceList.get(0), 
mockProvider);
+@Cleanup
+Consumer consumer3 = pulsarClient.newConsumer()
+.subscriptionName("subName3")
+.topic(topic)
+.subscribe();
+
+// 5. Test no-retryable exception:
+// The consumer building will be failed without retrying.

Review Comment:
   Good suggestion! Thanks



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [fix][txn]Handle exceptions in the transaction pending ack init [pulsar]

2024-03-28 Thread via GitHub


BewareMyPower commented on code in PR #21274:
URL: https://github.com/apache/pulsar/pull/21274#discussion_r1542552357


##
pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java:
##
@@ -100,6 +104,79 @@ protected void cleanup() {
 super.internalCleanup();
 }
 
+/**
+ * Test consumer can be built successfully with retryable exception
+ * and get correct error with no-retryable exception.
+ * @throws Exception
+ */
+@Test(timeOut = 6)
+public void testBuildConsumerEncounterPendingAckInitFailure() throws 
Exception {
+// 1. Prepare and make sure the consumer can be built successfully.
+String topic = NAMESPACE1 + 
"/testUnloadSubscriptionWhenFailedInitPendingAck";
+@Cleanup
+Consumer consumer1 = pulsarClient.newConsumer()
+.subscriptionName("subName1")
+.topic(topic)
+.subscribe();
+// 2. Mock a transactionPendingAckStoreProvider to test building 
consumer
+// failing at 
transactionPendingAckStoreProvider::checkInitializedBefore.
+Field transactionPendingAckStoreProviderField = PulsarService.class
+.getDeclaredField("transactionPendingAckStoreProvider");
+transactionPendingAckStoreProviderField.setAccessible(true);
+TransactionPendingAckStoreProvider pendingAckStoreProvider =
+(TransactionPendingAckStoreProvider) 
transactionPendingAckStoreProviderField
+.get(pulsarServiceList.get(0));
+TransactionPendingAckStoreProvider mockProvider = 
mock(pendingAckStoreProvider.getClass());
+// 3. Test retryable exception when checkInitializedBefore:
+// The consumer will be built successfully after one time retry.
+when(mockProvider.checkInitializedBefore(any()))
+// First, the method checkInitializedBefore will fail with a 
retryable exception.
+.thenReturn(FutureUtil.failedFuture(new 
ManagedLedgerException("mock fail initialize")))
+// Then, the method will be executed successfully.
+.thenReturn(CompletableFuture.completedFuture(false));
+transactionPendingAckStoreProviderField.set(pulsarServiceList.get(0), 
mockProvider);
+@Cleanup
+Consumer consumer2 = pulsarClient.newConsumer()
+.subscriptionName("subName2")
+.topic(topic)
+.subscribe();
+
+// 4. Test retryable exception when newPendingAckStore:
+// The consumer will be built successfully after one time retry.
+when(mockProvider.checkInitializedBefore(any()))
+.thenReturn(CompletableFuture.completedFuture(true));
+
+when(mockProvider.newPendingAckStore(any()))
+// First, the method newPendingAckStore will fail with a 
retryable exception.
+.thenReturn(FutureUtil.failedFuture(new 
ManagedLedgerException("mock fail new store")))
+// Then, the method will be executed successfully.
+.thenCallRealMethod();
+transactionPendingAckStoreProviderField.set(pulsarServiceList.get(0), 
mockProvider);
+@Cleanup
+Consumer consumer3 = pulsarClient.newConsumer()
+.subscriptionName("subName3")
+.topic(topic)
+.subscribe();
+
+// 5. Test no-retryable exception:
+// The consumer building will be failed without retrying.

Review Comment:
   You can open another PR to add such new error 
(`TransactionComponentLoadFailedException`) as your comment in 
`exceptionHandleFuture` says. It's okay here for now. Your comment "retrying" 
here should only refer the server-side retry.
   
   But since this test could take too long (48 seconds in my local env) to wait 
for the client side, it's better to reduce the timeout by creating `consumer4` 
with another `PulsarClient` that configures the operation timeout.
   
   ```java
   @Cleanup PulsarClient pulsarClient = PulsarClient.builder()
   .serviceUrl(pulsarServiceList.get(0).getBrokerServiceUrl())
   .operationTimeout(3, TimeUnit.SECONDS)
   .build();
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [fix][txn]Handle exceptions in the transaction pending ack init [pulsar]

2024-03-28 Thread via GitHub


liangyepianzhou commented on code in PR #21274:
URL: https://github.com/apache/pulsar/pull/21274#discussion_r1542537254


##
pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java:
##
@@ -100,6 +104,79 @@ protected void cleanup() {
 super.internalCleanup();
 }
 
+/**
+ * Test consumer can be built successfully with retryable exception
+ * and get correct error with no-retryable exception.
+ * @throws Exception
+ */
+@Test(timeOut = 6)
+public void testBuildConsumerEncounterPendingAckInitFailure() throws 
Exception {
+// 1. Prepare and make sure the consumer can be built successfully.
+String topic = NAMESPACE1 + 
"/testUnloadSubscriptionWhenFailedInitPendingAck";
+@Cleanup
+Consumer consumer1 = pulsarClient.newConsumer()
+.subscriptionName("subName1")
+.topic(topic)
+.subscribe();
+// 2. Mock a transactionPendingAckStoreProvider to test building 
consumer
+// failing at 
transactionPendingAckStoreProvider::checkInitializedBefore.
+Field transactionPendingAckStoreProviderField = PulsarService.class
+.getDeclaredField("transactionPendingAckStoreProvider");
+transactionPendingAckStoreProviderField.setAccessible(true);
+TransactionPendingAckStoreProvider pendingAckStoreProvider =
+(TransactionPendingAckStoreProvider) 
transactionPendingAckStoreProviderField
+.get(pulsarServiceList.get(0));
+TransactionPendingAckStoreProvider mockProvider = 
mock(pendingAckStoreProvider.getClass());
+// 3. Test retryable exception when checkInitializedBefore:
+// The consumer will be built successfully after one time retry.
+when(mockProvider.checkInitializedBefore(any()))
+// First, the method checkInitializedBefore will fail with a 
retryable exception.
+.thenReturn(FutureUtil.failedFuture(new 
ManagedLedgerException("mock fail initialize")))
+// Then, the method will be executed successfully.
+.thenReturn(CompletableFuture.completedFuture(false));
+transactionPendingAckStoreProviderField.set(pulsarServiceList.get(0), 
mockProvider);
+@Cleanup
+Consumer consumer2 = pulsarClient.newConsumer()
+.subscriptionName("subName2")
+.topic(topic)
+.subscribe();
+
+// 4. Test retryable exception when newPendingAckStore:
+// The consumer will be built successfully after one time retry.
+when(mockProvider.checkInitializedBefore(any()))
+.thenReturn(CompletableFuture.completedFuture(true));
+
+when(mockProvider.newPendingAckStore(any()))
+// First, the method newPendingAckStore will fail with a 
retryable exception.
+.thenReturn(FutureUtil.failedFuture(new 
ManagedLedgerException("mock fail new store")))
+// Then, the method will be executed successfully.
+.thenCallRealMethod();
+transactionPendingAckStoreProviderField.set(pulsarServiceList.get(0), 
mockProvider);
+@Cleanup
+Consumer consumer3 = pulsarClient.newConsumer()
+.subscriptionName("subName3")
+.topic(topic)
+.subscribe();
+
+// 5. Test no-retryable exception:
+// The consumer building will be failed without retrying.

Review Comment:
   Another approach is to add new errors and exceptions to catch this case. 
What do you think about it? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [fix][txn]Handle exceptions in the transaction pending ack init [pulsar]

2024-03-28 Thread via GitHub


liangyepianzhou commented on code in PR #21274:
URL: https://github.com/apache/pulsar/pull/21274#discussion_r1542534570


##
pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java:
##
@@ -100,6 +104,79 @@ protected void cleanup() {
 super.internalCleanup();
 }
 
+/**
+ * Test consumer can be built successfully with retryable exception
+ * and get correct error with no-retryable exception.
+ * @throws Exception
+ */
+@Test(timeOut = 6)
+public void testBuildConsumerEncounterPendingAckInitFailure() throws 
Exception {
+// 1. Prepare and make sure the consumer can be built successfully.
+String topic = NAMESPACE1 + 
"/testUnloadSubscriptionWhenFailedInitPendingAck";
+@Cleanup
+Consumer consumer1 = pulsarClient.newConsumer()
+.subscriptionName("subName1")
+.topic(topic)
+.subscribe();
+// 2. Mock a transactionPendingAckStoreProvider to test building 
consumer
+// failing at 
transactionPendingAckStoreProvider::checkInitializedBefore.
+Field transactionPendingAckStoreProviderField = PulsarService.class
+.getDeclaredField("transactionPendingAckStoreProvider");
+transactionPendingAckStoreProviderField.setAccessible(true);
+TransactionPendingAckStoreProvider pendingAckStoreProvider =
+(TransactionPendingAckStoreProvider) 
transactionPendingAckStoreProviderField
+.get(pulsarServiceList.get(0));
+TransactionPendingAckStoreProvider mockProvider = 
mock(pendingAckStoreProvider.getClass());
+// 3. Test retryable exception when checkInitializedBefore:
+// The consumer will be built successfully after one time retry.
+when(mockProvider.checkInitializedBefore(any()))
+// First, the method checkInitializedBefore will fail with a 
retryable exception.
+.thenReturn(FutureUtil.failedFuture(new 
ManagedLedgerException("mock fail initialize")))
+// Then, the method will be executed successfully.
+.thenReturn(CompletableFuture.completedFuture(false));
+transactionPendingAckStoreProviderField.set(pulsarServiceList.get(0), 
mockProvider);
+@Cleanup
+Consumer consumer2 = pulsarClient.newConsumer()
+.subscriptionName("subName2")
+.topic(topic)
+.subscribe();
+
+// 4. Test retryable exception when newPendingAckStore:
+// The consumer will be built successfully after one time retry.
+when(mockProvider.checkInitializedBefore(any()))
+.thenReturn(CompletableFuture.completedFuture(true));
+
+when(mockProvider.newPendingAckStore(any()))
+// First, the method newPendingAckStore will fail with a 
retryable exception.
+.thenReturn(FutureUtil.failedFuture(new 
ManagedLedgerException("mock fail new store")))
+// Then, the method will be executed successfully.
+.thenCallRealMethod();
+transactionPendingAckStoreProviderField.set(pulsarServiceList.get(0), 
mockProvider);
+@Cleanup
+Consumer consumer3 = pulsarClient.newConsumer()
+.subscriptionName("subName3")
+.topic(topic)
+.subscribe();
+
+// 5. Test no-retryable exception:
+// The consumer building will be failed without retrying.

Review Comment:
   >The consumer still retries because the error code from the broker side is 
UnknownError.
   Yes, it is correct. According to the discussion, manual intervention is 
necessary when an unexpected/unretryable exception happens. So  `UnknownError` 
may be acceptable. 



##
pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java:
##
@@ -100,6 +104,79 @@ protected void cleanup() {
 super.internalCleanup();
 }
 
+/**
+ * Test consumer can be built successfully with retryable exception
+ * and get correct error with no-retryable exception.
+ * @throws Exception
+ */
+@Test(timeOut = 6)
+public void testBuildConsumerEncounterPendingAckInitFailure() throws 
Exception {
+// 1. Prepare and make sure the consumer can be built successfully.
+String topic = NAMESPACE1 + 
"/testUnloadSubscriptionWhenFailedInitPendingAck";
+@Cleanup
+Consumer consumer1 = pulsarClient.newConsumer()
+.subscriptionName("subName1")
+.topic(topic)
+.subscribe();
+// 2. Mock a transactionPendingAckStoreProvider to test building 
consumer
+// failing at 
transactionPendingAckStoreProvider::checkInitializedBefore.
+Field transactionPendingAckStoreProviderField = PulsarService.class
+

  1   2   >