Re: [I] Golang instance of functions missing many fields of the `FunctionDetails` [pulsar]
Technoboy- closed issue #22349: Golang instance of functions missing many fields of the `FunctionDetails` URL: https://github.com/apache/pulsar/issues/22349 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
(pulsar) branch master updated: [improve][fn] Pass FunctionDetails to Go instance (#22350)
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git

The following commit(s) were added to refs/heads/master by this push:
     new 7315aeb6258 [improve][fn] Pass FunctionDetails to Go instance (#22350)
7315aeb6258 is described below

commit 7315aeb6258b7adc9d874268d50acb95ffc0cf2b
Author: jiangpengcheng
AuthorDate: Fri Mar 29 13:47:29 2024 +0800

    [improve][fn] Pass FunctionDetails to Go instance (#22350)
---
 pulsar-function-go/conf/conf.go                    |   2 +
 pulsar-function-go/pf/instanceConf.go              |  11 ++
 pulsar-function-go/pf/instanceConf_test.go         | 207 +
 .../functions/instance/go/GoInstanceConfig.java    |   2 +
 .../pulsar/functions/runtime/RuntimeUtils.java     |   6 +
 .../runtime/kubernetes/KubernetesRuntimeTest.java  |   8 +
 6 files changed, 236 insertions(+)

diff --git a/pulsar-function-go/conf/conf.go b/pulsar-function-go/conf/conf.go
index 03513648fac..1442a0f865f 100644
--- a/pulsar-function-go/conf/conf.go
+++ b/pulsar-function-go/conf/conf.go
@@ -91,6 +91,8 @@ type Conf struct {
 	UserConfig string `json:"userConfig" yaml:"userConfig"`
 	//metrics config
 	MetricsPort int `json:"metricsPort" yaml:"metricsPort"`
+	// FunctionDetails
+	FunctionDetails string `json:"functionDetails" yaml:"functionDetails"`
 }
 
 var (
diff --git a/pulsar-function-go/pf/instanceConf.go b/pulsar-function-go/pf/instanceConf.go
index 4cb60dd258a..844a2bc9b89 100644
--- a/pulsar-function-go/pf/instanceConf.go
+++ b/pulsar-function-go/pf/instanceConf.go
@@ -25,7 +25,9 @@ import (
 	"time"
 
 	"github.com/apache/pulsar/pulsar-function-go/conf"
+	log "github.com/apache/pulsar/pulsar-function-go/logutil"
 	pb "github.com/apache/pulsar/pulsar-function-go/pb"
+	"google.golang.org/protobuf/encoding/protojson"
 )
 
 // This is the config passed to the Golang Instance. Contains all the information
@@ -122,6 +124,15 @@ func newInstanceConfWithConf(cfg *conf.Conf) *instanceConf {
 		tlsAllowInsecure:        cfg.TLSAllowInsecureConnection,
 		tlsHostnameVerification: cfg.TLSHostnameVerificationEnable,
 	}
+	// parse the raw function details and ignore the unmarshal error(fallback to original way)
+	if cfg.FunctionDetails != "" {
+		functionDetails := pb.FunctionDetails{}
+		if err := protojson.Unmarshal([]byte(cfg.FunctionDetails), &functionDetails); err != nil {
+			log.Errorf("Failed to unmarshal function details: %v", err)
+		} else {
+			instanceConf.funcDetails = functionDetails
+		}
+	}
 
 	if instanceConf.funcDetails.ProcessingGuarantees == pb.ProcessingGuarantees_EFFECTIVELY_ONCE {
 		panic("Go instance current not support EFFECTIVELY_ONCE processing guarantees.")
diff --git a/pulsar-function-go/pf/instanceConf_test.go b/pulsar-function-go/pf/instanceConf_test.go
index 02aef913ebc..cc5f46e2fe1 100644
--- a/pulsar-function-go/pf/instanceConf_test.go
+++ b/pulsar-function-go/pf/instanceConf_test.go
@@ -20,6 +20,7 @@
 package pf
 
 import (
+	"fmt"
 	"testing"
 
 	cfg "github.com/apache/pulsar/pulsar-function-go/conf"
@@ -113,3 +114,209 @@ func TestInstanceConf_Fail(t *testing.T) {
 		newInstanceConfWithConf(&cfg.Conf{ProcessingGuarantees: 3})
 	}, "Should have a panic")
 }
+
+func TestInstanceConf_WithDetails(t *testing.T) {
+	cfg := &cfg.Conf{
+		FunctionDetails: `{"tenant":"public","namespace":"default","name":"test-function","className":"process",
+"logTopic":"test-logs","userConfig":"{\"key1\":\"value1\"}","runtime":"GO","autoAck":true,"parallelism":1,
+"source":{"configs":"{\"username\":\"admin\"}","typeClassName":"string","timeoutMs":"15000",
+"subscriptionName":"test-subscription","inputSpecs":{"input":{"schemaType":"avro","receiverQueueSize":{"value":1000},
+"schemaProperties":{"schema_prop1":"schema1"},"consumerProperties":{"consumer_prop1":"consumer1"},"cryptoSpec":
+{"cryptoKeyReaderClassName":"key-reader","producerCryptoFailureAction":"SEND","consumerCryptoFailureAction":"CONSUME"}}}
+,"negativeAckRedeliveryDelayMs":"15000"},"sink":{"configs":"{\"password\":\"admin\"}","topic":"test-output",
+"typeClassName":"string","schemaType":"avro","producerSpec":{"maxPendingMessages":2000,"useThreadLocalProducers":true,
+"cryptoSpec":{"cryptoKeyReaderClassName":"key-reader","producerCryptoFailureAction":"DISCARD"},
+"batchBuilder":"DEFAULT"}},"resources":{"cpu":2.0,"ram":"1024","disk":"1024"},"packageUrl":"/path/to/package",
+"retryDetails":{"maxMessageRetries":3,"deadLetterTopic":"test-dead-letter-topic"},"secretsMap":
+"{\"secret1\":\"secret-value1\"}","runtimeFlags":"flags","componentType":"FUNCTION","customRuntimeOptions":"options",
Re: [PR] [improve][fn] Pass FunctionDetails to Go instance [pulsar]
Technoboy- merged PR #22350: URL: https://github.com/apache/pulsar/pull/22350
Re: [PR] [fix][txn]Handle exceptions in the transaction pending ack init [pulsar]
BewareMyPower commented on PR #21274: URL: https://github.com/apache/pulsar/pull/21274#issuecomment-2026681713 @codelipenghui PTAL again
[PR] [fix][broker] Replication rate limiter counts batch messages [pulsar]
nodece opened a new pull request, #22380:
URL: https://github.com/apache/pulsar/pull/22380

### Motivation

A batch message contains multiple messages, but the replication rate limiter counts it as only 1.

### Modifications

If `msg.getMessageBuilder().hasNumMessagesInBatch()` returns true, use the batch's message count; otherwise, use 1.

### Documentation

- [ ] `doc`
- [ ] `doc-required`
- [x] `doc-not-needed`
- [ ] `doc-complete`
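The counting rule in the PR description above can be sketched as follows. This is a minimal illustration, not the code from the PR: `ReplicatedEntry` and `ReplicationRateSketch` are hypothetical names, and an `OptionalInt` stands in for the `hasNumMessagesInBatch()` check on the real message metadata.

```java
import java.util.List;
import java.util.OptionalInt;

/** Hypothetical stand-in for a replicated entry; not a Pulsar class. */
final class ReplicatedEntry {
    // Present only when the entry is a batch that carries its message count.
    final OptionalInt numMessagesInBatch;

    ReplicatedEntry(OptionalInt numMessagesInBatch) {
        this.numMessagesInBatch = numMessagesInBatch;
    }
}

final class ReplicationRateSketch {
    /** Number of messages one entry should contribute to the rate limiter. */
    static int messagesIn(ReplicatedEntry entry) {
        // A batch reports its own size; any other entry counts as one message.
        return entry.numMessagesInBatch.orElse(1);
    }

    /** Total messages across entries, i.e. the permits to request from the limiter. */
    static long totalMessages(List<ReplicatedEntry> entries) {
        long total = 0;
        for (ReplicatedEntry entry : entries) {
            total += messagesIn(entry);
        }
        return total;
    }
}
```

With one batch of five messages and one non-batched entry, `totalMessages` yields 6 where a per-entry count would yield 2.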
(pulsar) branch branch-2.11 updated: [improve] [broker] Avoid repeated Read-and-discard when using Key_Shared mode (#22245)
yubiao pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new 03493ef03c0 [improve] [broker] Avoid repeated Read-and-discard when using Key_Shared mode (#22245)
03493ef03c0 is described below

commit 03493ef03c07fd3a30f2b867baac42776c94d614
Author: fengyubiao
AuthorDate: Fri Mar 29 12:06:26 2024 +0800

    [improve] [broker] Avoid repeated Read-and-discard when using Key_Shared mode (#22245)

    (cherry picked from commit e34ea626a65da4c8e1578010f857aa961a7b5c55)
---
 .../persistent/MessageRedeliveryController.java    |   8 +
 .../PersistentDispatcherMultipleConsumers.java     |  47 +++-
 ...istentStickyKeyDispatcherMultipleConsumers.java | 107 -
 .../client/api/KeySharedSubscriptionTest.java      | 266 +
 .../pulsar/client/api/ProducerConsumerBase.java    |  66 +
 5 files changed, 473 insertions(+), 21 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
index 934628b05a9..5ed6afb3f07 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
@@ -95,6 +95,14 @@ public class MessageRedeliveryController {
         }
     }
 
+    public Long getHash(long ledgerId, long entryId) {
+        LongPair value = hashesToBeBlocked.get(ledgerId, entryId);
+        if (value == null) {
+            return null;
+        }
+        return value.first;
+    }
+
     public void removeAllUpTo(long markDeleteLedgerId, long markDeleteEntryId) {
         if (!allowOutOfOrderDelivery) {
             List keysToRemove = new ArrayList<>();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index d69b1dbaf91..9ff7b426bdc 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -299,24 +299,25 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
             }
 
             NavigableSet messagesToReplayNow = getMessagesToReplayNow(messagesToRead);
-
-            if (!messagesToReplayNow.isEmpty()) {
+            NavigableSet messagesToReplayFiltered = filterOutEntriesWillBeDiscarded(messagesToReplayNow);
+            if (!messagesToReplayFiltered.isEmpty()) {
                 if (log.isDebugEnabled()) {
-                    log.debug("[{}] Schedule replay of {} messages for {} consumers", name, messagesToReplayNow.size(),
-                            consumerList.size());
+                    log.debug("[{}] Schedule replay of {} messages for {} consumers", name,
+                            messagesToReplayFiltered.size(), consumerList.size());
                 }
 
                 havePendingReplayRead = true;
                 minReplayedPosition = messagesToReplayNow.first();
                 Set deletedMessages = topic.isDelayedDeliveryEnabled()
-                        ? asyncReplayEntriesInOrder(messagesToReplayNow) : asyncReplayEntries(messagesToReplayNow);
+                        ? asyncReplayEntriesInOrder(messagesToReplayFiltered)
+                        : asyncReplayEntries(messagesToReplayFiltered);
                 // clear already acked positions from replay bucket
 
                 deletedMessages.forEach(position -> redeliveryMessages.remove(((PositionImpl) position).getLedgerId(),
                         ((PositionImpl) position).getEntryId()));
                 // if all the entries are acked-entries and cleared up from redeliveryMessages, try to read
                 // next entries as readCompletedEntries-callback was never called
-                if ((messagesToReplayNow.size() - deletedMessages.size()) == 0) {
+                if ((messagesToReplayFiltered.size() - deletedMessages.size()) == 0) {
                     havePendingReplayRead = false;
                     readMoreEntriesAsync();
                 }
@@ -325,7 +326,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
                 log.debug("[{}] Dispatcher read is blocked due to unackMessages {} reached to max {}", name,
                         totalUnackedMessages, topic.getMaxUnackedMessagesOnSubscription());
             }
-        } else if (!havePendingRead) {
(pulsar) branch branch-3.0 updated: [improve] [broker] Avoid repeated Read-and-discard when using Key_Shared mode (#22245)
yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 5b37e8434d9 [improve] [broker] Avoid repeated Read-and-discard when using Key_Shared mode (#22245)
5b37e8434d9 is described below

commit 5b37e8434d93c135b7c36c76a8177e2b51db5556
Author: fengyubiao
AuthorDate: Fri Mar 29 12:06:26 2024 +0800

    [improve] [broker] Avoid repeated Read-and-discard when using Key_Shared mode (#22245)

    (cherry picked from commit e34ea626a65da4c8e1578010f857aa961a7b5c55)
---
 .../persistent/MessageRedeliveryController.java    |   8 +
 .../PersistentDispatcherMultipleConsumers.java     |  47 +++-
 ...istentStickyKeyDispatcherMultipleConsumers.java | 104 +++-
 .../client/api/KeySharedSubscriptionTest.java      | 266 +
 .../pulsar/client/api/ProducerConsumerBase.java    |  66 +
 5 files changed, 470 insertions(+), 21 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
index 5bf3f5506fa..63803177242 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
@@ -95,6 +95,14 @@ public class MessageRedeliveryController {
         }
     }
 
+    public Long getHash(long ledgerId, long entryId) {
+        LongPair value = hashesToBeBlocked.get(ledgerId, entryId);
+        if (value == null) {
+            return null;
+        }
+        return value.first;
+    }
+
     public void removeAllUpTo(long markDeleteLedgerId, long markDeleteEntryId) {
         if (!allowOutOfOrderDelivery) {
             List keysToRemove = new ArrayList<>();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index c164abf905d..9d0dba798ad 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -309,24 +309,25 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
             }
 
             NavigableSet messagesToReplayNow = getMessagesToReplayNow(messagesToRead);
-
-            if (!messagesToReplayNow.isEmpty()) {
+            NavigableSet messagesToReplayFiltered = filterOutEntriesWillBeDiscarded(messagesToReplayNow);
+            if (!messagesToReplayFiltered.isEmpty()) {
                 if (log.isDebugEnabled()) {
-                    log.debug("[{}] Schedule replay of {} messages for {} consumers", name, messagesToReplayNow.size(),
-                            consumerList.size());
+                    log.debug("[{}] Schedule replay of {} messages for {} consumers", name,
+                            messagesToReplayFiltered.size(), consumerList.size());
                 }
 
                 havePendingReplayRead = true;
                 minReplayedPosition = messagesToReplayNow.first();
                 Set deletedMessages = topic.isDelayedDeliveryEnabled()
-                        ? asyncReplayEntriesInOrder(messagesToReplayNow) : asyncReplayEntries(messagesToReplayNow);
+                        ? asyncReplayEntriesInOrder(messagesToReplayFiltered)
+                        : asyncReplayEntries(messagesToReplayFiltered);
                 // clear already acked positions from replay bucket
 
                 deletedMessages.forEach(position -> redeliveryMessages.remove(((PositionImpl) position).getLedgerId(),
                         ((PositionImpl) position).getEntryId()));
                 // if all the entries are acked-entries and cleared up from redeliveryMessages, try to read
                 // next entries as readCompletedEntries-callback was never called
-                if ((messagesToReplayNow.size() - deletedMessages.size()) == 0) {
+                if ((messagesToReplayFiltered.size() - deletedMessages.size()) == 0) {
                     havePendingReplayRead = false;
                     readMoreEntriesAsync();
                 }
@@ -335,7 +336,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
                 log.debug("[{}] Dispatcher read is blocked due to unackMessages {} reached to max {}", name,
                         totalUnackedMessages, topic.getMaxUnackedMessagesOnSubscription());
             }
-        } else if (!havePendingRead) {
+
Re: [PR] [fix][sec] implicit narrowing conversion in compound assignment [pulsar]
liangyepianzhou merged PR #22074: URL: https://github.com/apache/pulsar/pull/22074
(pulsar) branch master updated: [fix][sec] implicit narrowing conversion in compound assignment (#22074)
xiangying pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git

The following commit(s) were added to refs/heads/master by this push:
     new 0701d7eedce [fix][sec] implicit narrowing conversion in compound assignment (#22074)
0701d7eedce is described below

commit 0701d7eedcef6aae750b5067139caf8e73434818
Author: Xiangying Meng <55571188+liangyepianz...@users.noreply.github.com>
AuthorDate: Fri Mar 29 12:43:11 2024 +0800

    [fix][sec] implicit narrowing conversion in compound assignment (#22074)
---
 .../org/apache/pulsar/common/policies/data/SubscriptionStats.java     | 2 +-
 .../pulsar/common/policies/data/stats/SubscriptionStatsImpl.java      | 2 +-
 .../org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java | 4 ++--
 3 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
index 9ff94a2952e..d4850adaa6f 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
@@ -46,7 +46,7 @@ public interface SubscriptionStats {
     double getMessageAckRate();
 
     /** Chunked message dispatch rate. */
-    int getChunkedMessageRate();
+    double getChunkedMessageRate();
 
     /** Number of entries in the subscription backlog. */
     long getMsgBacklog();
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java
index bed8aabf27d..a8ea0060629 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java
@@ -54,7 +54,7 @@ public class SubscriptionStatsImpl implements SubscriptionStats {
     public double messageAckRate;
 
     /** Chunked message dispatch rate. */
-    public int chunkedMessageRate;
+    public double chunkedMessageRate;
 
     /** Number of entries in the subscription backlog. */
     public long msgBacklog;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java
index 60ade64e688..8c273236945 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java
@@ -66,7 +66,7 @@ public class LocalBrokerData implements LoadManagerReport {
     // The stats given in the most recent invocation of update.
     private Map lastStats;
 
-    private int numTopics;
+    private long numTopics;
     private int numBundles;
     private int numConsumers;
     private int numProducers;
@@ -202,7 +202,7 @@ public class LocalBrokerData implements LoadManagerReport {
         msgRateOut = 0;
         msgThroughputIn = 0;
         msgThroughputOut = 0;
-        int totalNumTopics = 0;
+        long totalNumTopics = 0;
         int totalNumBundles = 0;
         int totalNumConsumers = 0;
         int totalNumProducers = 0;
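For readers unfamiliar with the issue named in the commit title: in Java, a compound assignment `a += b` behaves like `a = (T) (a + b)`, where `T` is the type of `a`, so a wider right-hand side is narrowed without any compiler warning. The diff above only shows the declarations being widened, not the assignments themselves, so the accumulators in this sketch are hypothetical and `NarrowingSketch` is an illustrative name.

```java
final class NarrowingSketch {
    /** Sums double rates into an int accumulator; every += hides an (int) cast. */
    static int sumIntoInt(double[] rates) {
        int total = 0;
        for (double rate : rates) {
            // Compiles without warning: same as total = (int) (total + rate).
            total += rate;
        }
        return total;
    }

    /** Same loop with a double accumulator, mirroring the int -> double change. */
    static double sumIntoDouble(double[] rates) {
        double total = 0;
        for (double rate : rates) {
            total += rate;
        }
        return total;
    }

    /** Adds a long to an int counter; the high 32 bits are silently dropped. */
    static int addLongToInt(int counter, long delta) {
        // Same as counter = (int) (counter + delta).
        counter += delta;
        return counter;
    }
}
```

Called with `{0.5, 0.5, 0.5}`, `sumIntoInt` truncates each partial sum and ends at 0, while `sumIntoDouble` ends at 1.5. Widening the accumulator type, as the commit does, removes the hidden cast.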
(pulsar) branch master updated: [improve] [broker] Avoid repeated Read-and-discard when using Key_Shared mode (#22245)
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git

The following commit(s) were added to refs/heads/master by this push:
     new e34ea626a65 [improve] [broker] Avoid repeated Read-and-discard when using Key_Shared mode (#22245)
e34ea626a65 is described below

commit e34ea626a65da4c8e1578010f857aa961a7b5c55
Author: fengyubiao
AuthorDate: Fri Mar 29 12:06:26 2024 +0800

    [improve] [broker] Avoid repeated Read-and-discard when using Key_Shared mode (#22245)
---
 .../persistent/MessageRedeliveryController.java    |   8 +
 .../PersistentDispatcherMultipleConsumers.java     |  47 +++-
 ...istentStickyKeyDispatcherMultipleConsumers.java | 104 +++-
 .../client/api/KeySharedSubscriptionTest.java      | 266 +
 .../pulsar/client/api/ProducerConsumerBase.java    |  66 +
 .../api/SubscriptionPauseOnAckStatPersistTest.java |  78 +-
 6 files changed, 477 insertions(+), 92 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
index 5bf3f5506fa..63803177242 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
@@ -95,6 +95,14 @@ public class MessageRedeliveryController {
         }
     }
 
+    public Long getHash(long ledgerId, long entryId) {
+        LongPair value = hashesToBeBlocked.get(ledgerId, entryId);
+        if (value == null) {
+            return null;
+        }
+        return value.first;
+    }
+
     public void removeAllUpTo(long markDeleteLedgerId, long markDeleteEntryId) {
         if (!allowOutOfOrderDelivery) {
             List keysToRemove = new ArrayList<>();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 039104fe022..b441400dae1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -334,24 +334,25 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
             }
 
             NavigableSet messagesToReplayNow = getMessagesToReplayNow(messagesToRead);
-
-            if (!messagesToReplayNow.isEmpty()) {
+            NavigableSet messagesToReplayFiltered = filterOutEntriesWillBeDiscarded(messagesToReplayNow);
+            if (!messagesToReplayFiltered.isEmpty()) {
                 if (log.isDebugEnabled()) {
-                    log.debug("[{}] Schedule replay of {} messages for {} consumers", name, messagesToReplayNow.size(),
-                            consumerList.size());
+                    log.debug("[{}] Schedule replay of {} messages for {} consumers", name,
+                            messagesToReplayFiltered.size(), consumerList.size());
                 }
 
                 havePendingReplayRead = true;
                 minReplayedPosition = messagesToReplayNow.first();
                 Set deletedMessages = topic.isDelayedDeliveryEnabled()
-                        ? asyncReplayEntriesInOrder(messagesToReplayNow) : asyncReplayEntries(messagesToReplayNow);
+                        ? asyncReplayEntriesInOrder(messagesToReplayFiltered)
+                        : asyncReplayEntries(messagesToReplayFiltered);
                 // clear already acked positions from replay bucket
 
                 deletedMessages.forEach(position -> redeliveryMessages.remove(((PositionImpl) position).getLedgerId(),
                         ((PositionImpl) position).getEntryId()));
                 // if all the entries are acked-entries and cleared up from redeliveryMessages, try to read
                 // next entries as readCompletedEntries-callback was never called
-                if ((messagesToReplayNow.size() - deletedMessages.size()) == 0) {
+                if ((messagesToReplayFiltered.size() - deletedMessages.size()) == 0) {
                     havePendingReplayRead = false;
                     readMoreEntriesAsync();
                 }
@@ -360,7 +361,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
                 log.debug("[{}] Dispatcher read is blocked due to unackMessages {} reached to max {}", name,
                         totalUnackedMessages, topic.getMaxUnackedMessagesOnSubscription());
             }
-        } else if (!havePendingRead) {
+        } else if
Re: [PR] [improve] [broker] Avoid repeated Read-and-discard when using Key_Shared mode [pulsar]
codelipenghui merged PR #22245: URL: https://github.com/apache/pulsar/pull/22245
Re: [PR] [fix][broker] Skip topic.close during unloading if the topic future fails with ownership check, and fix isBundleOwnedByAnyBroker to use ns.checkOwnershipPresentAsync for ExtensibleLoadBalance
heesung-sn commented on code in PR #22379:
URL: https://github.com/apache/pulsar/pull/22379#discussion_r1543996536

## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java: ##
@@ -2244,9 +2244,18 @@ private CompletableFuture unloadServiceUnit(NamespaceBundle serviceUnit
                 closeFutures.add(topicFuture
                         .thenCompose(t -> t.isPresent() ? t.get().close(
                                 disconnectClients, closeWithoutWaitingClientDisconnect)
-                                : CompletableFuture.completedFuture(null)));
+                                : CompletableFuture.completedFuture(null))
+                        .exceptionally(e -> {
+                            if (e.getCause() instanceof BrokerServiceException.ServiceUnitNotReadyException
+                                    && e.getMessage().contains("Please redo the lookup")) {
+                                log.warn("[{}] Topic ownership check failed. Skipping it", topicName);
+                                return null;

Review Comment:
You also need to update configs like in this PR. Otherwise, the monitor job will clean the stuck unloading state.
Re: [PR] [fix][broker] Skip topic.close during unloading if the topic future fails with ownership check, and fix isBundleOwnedByAnyBroker to use ns.checkOwnershipPresentAsync for ExtensibleLoadBalance
dragosvictor commented on code in PR #22379:
URL: https://github.com/apache/pulsar/pull/22379#discussion_r1543992617

## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java: ##
@@ -2244,9 +2244,18 @@ private CompletableFuture unloadServiceUnit(NamespaceBundle serviceUnit
                 closeFutures.add(topicFuture
                         .thenCompose(t -> t.isPresent() ? t.get().close(
                                 disconnectClients, closeWithoutWaitingClientDisconnect)
-                                : CompletableFuture.completedFuture(null)));
+                                : CompletableFuture.completedFuture(null))
+                        .exceptionally(e -> {
+                            if (e.getCause() instanceof BrokerServiceException.ServiceUnitNotReadyException
+                                    && e.getMessage().contains("Please redo the lookup")) {
+                                log.warn("[{}] Topic ownership check failed. Skipping it", topicName);
+                                return null;

Review Comment:
Can confirm @Demogorgon314's finding. The test doesn't always execute the respective codepath.
Re: [PR] [fix][broker] Skip topic.close during unloading if the topic future fails with ownership check, and fix isBundleOwnedByAnyBroker to use ns.checkOwnershipPresentAsync for ExtensibleLoadBalance
dragosvictor commented on code in PR #22379:
URL: https://github.com/apache/pulsar/pull/22379#discussion_r1543991161

## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java: ##
@@ -2244,9 +2244,18 @@ private CompletableFuture unloadServiceUnit(NamespaceBundle serviceUnit
                 closeFutures.add(topicFuture
                         .thenCompose(t -> t.isPresent() ? t.get().close(
                                 disconnectClients, closeWithoutWaitingClientDisconnect)
-                                : CompletableFuture.completedFuture(null)));
+                                : CompletableFuture.completedFuture(null))
+                        .exceptionally(e -> {
+                            if (e.getCause() instanceof BrokerServiceException.ServiceUnitNotReadyException
+                                    && e.getMessage().contains("Please redo the lookup")) {
+                                log.warn("[{}] Topic ownership check failed. Skipping it", topicName);
+                                return null;

Review Comment:
You're right, thanks for clarifying!
Re: [PR] [fix][broker] Skip topic.close during unloading if the topic future fails with ownership check, and fix isBundleOwnedByAnyBroker to use ns.checkOwnershipPresentAsync for ExtensibleLoadBalance
heesung-sn commented on code in PR #22379:
URL: https://github.com/apache/pulsar/pull/22379#discussion_r1543978471

## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java: ##
@@ -2244,9 +2244,18 @@ private CompletableFuture unloadServiceUnit(NamespaceBundle serviceUnit
                 closeFutures.add(topicFuture
                         .thenCompose(t -> t.isPresent() ? t.get().close(
                                 disconnectClients, closeWithoutWaitingClientDisconnect)
-                                : CompletableFuture.completedFuture(null)));
+                                : CompletableFuture.completedFuture(null))
+                        .exceptionally(e -> {
+                            if (e.getCause() instanceof BrokerServiceException.ServiceUnitNotReadyException
+                                    && e.getMessage().contains("Please redo the lookup")) {
+                                log.warn("[{}] Topic ownership check failed. Skipping it", topicName);
+                                return null;

Review Comment:
Interesting. I see the test fails if I remove this on my laptop.
https://github.com/apache/pulsar/assets/103456639/ea53d7a6-4261-4654-93cb-0eef0b59e2fc
Re: [PR] [fix][broker] Skip topic.close during unloading if the topic future fails with ownership check, and fix isBundleOwnedByAnyBroker to use ns.checkOwnershipPresentAsync for ExtensibleLoadBalance
Demogorgon314 commented on code in PR #22379:
URL: https://github.com/apache/pulsar/pull/22379#discussion_r1543971934

## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java: ##
@@ -2244,9 +2244,18 @@ private CompletableFuture unloadServiceUnit(NamespaceBundle serviceUnit
                 closeFutures.add(topicFuture
                         .thenCompose(t -> t.isPresent() ? t.get().close(
                                 disconnectClients, closeWithoutWaitingClientDisconnect)
-                                : CompletableFuture.completedFuture(null)));
+                                : CompletableFuture.completedFuture(null))
+                        .exceptionally(e -> {
+                            if (e.getCause() instanceof BrokerServiceException.ServiceUnitNotReadyException
+                                    && e.getMessage().contains("Please redo the lookup")) {
+                                log.warn("[{}] Topic ownership check failed. Skipping it", topicName);
+                                return null;

Review Comment:
Do we have a test to cover this change? I noticed that even after removing this change, the `testUnloadUponTopicLookupFailure` test still passes.
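The pattern under review in this thread, swallowing one specific failure of a topic future while letting every other failure propagate, can be sketched in isolation. This is an illustration under assumptions, not the PR's code: `UnloadSketch`, `NotReadyException`, and `closeOrSkip` are hypothetical names, and the `thenCompose` step merely stands in for `topic.close(...)`. Unlike the quoted hunk, the sketch also unwraps defensively in case the handler receives the raw exception instead of a `CompletionException`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class UnloadSketch {
    /** Hypothetical stand-in for BrokerServiceException.ServiceUnitNotReadyException. */
    static final class NotReadyException extends RuntimeException {
        NotReadyException(String message) {
            super(message);
        }
    }

    /** Closes the topic if loading succeeded; skips it if the ownership check failed. */
    static CompletableFuture<Void> closeOrSkip(CompletableFuture<String> topicFuture) {
        return topicFuture
                // Stand-in for t.get().close(...): completes normally with null.
                .thenCompose(topic -> CompletableFuture.<Void>completedFuture(null))
                .exceptionally(e -> {
                    // A failure passed through a dependent stage arrives wrapped.
                    Throwable cause = e instanceof CompletionException ? e.getCause() : e;
                    if (cause instanceof NotReadyException
                            && cause.getMessage() != null
                            && cause.getMessage().contains("Please redo the lookup")) {
                        return null; // ownership check failed: nothing to close
                    }
                    // Any other failure must still fail the unload.
                    throw new CompletionException(cause);
                });
    }
}
```

A future failed with the ownership error completes normally here, whereas one failed with, say, an `IllegalStateException` stays failed. A test for the change would need to force the first case, which is the coverage gap raised in the thread.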
Re: [PR] [fix][ml] No rollover inactive ledgers when metadata service invalid [pulsar]
Technoboy- closed pull request #22284: [fix][ml] No rollover inactive ledgers when metadata service invalid URL: https://github.com/apache/pulsar/pull/22284
Re: [PR] [improve] [broker] Avoid repeated Read-and-discard when using Key_Shared mode [pulsar]
poorbarcode commented on code in PR #22245:
URL: https://github.com/apache/pulsar/pull/22245#discussion_r1543965530

## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java: ##
@@ -472,18 +475,18 @@ protected NavigableSet filterOutEntriesWillBeDiscarded(NavigableSe
                 // Maybe using HashRangeExclusiveStickyKeyConsumerSelector.
                 continue;
             }
-            if (getAvailablePermits(c) == 0) {
+            groupedPositions.computeIfAbsent(c, k -> new ArrayList<>()).add(pos);
+        }
+        // Filter positions by the Recently Joined Position rule.
+        for (Map.Entry<Consumer, List<PositionImpl>> item : groupedPositions.entrySet()) {
+            int availablePermits = getAvailablePermits(item.getKey());
+            if (availablePermits == 0) {
                 continue;
             }
-            PositionImpl currentMaxReadPosition = recentlyJoinedConsumers.get(c);
-            if (currentMaxReadPosition == null) {
-                res.add(pos);
-            } else if (pos.compareTo(firstMaxReadPos) < 0 && pos.compareTo(currentMaxReadPosition) < 0) {
-                res.add(pos);
-            } else {
-                // The subsequent positions will also be larger than "firstMaxReadPos", why need to check continuous.
-                // Because the subsequent positions may be delivered to a consumer which does not in the
-                // "recentlyJoinedConsumers".
+            int posCountToRead = getRestrictedMaxEntriesForConsumer(item.getKey(), item.getValue(), availablePermits,
+                    ReadType.Replay, null);
+            for (int i = 0; i < posCountToRead; i++) {
+                res.add(item.getValue().get(i));

Review Comment:
Done
(pulsar) branch master updated: [improve][io]: Add validation for JDBC sink not supporting primitive schema (#22376)
This is an automated email from the ASF dual-hosted git repository. baodi pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new a3bf4e8a42c [improve][io]: Add validation for JDBC sink not supporting primitive schema (#22376) a3bf4e8a42c is described below commit a3bf4e8a42c84a0ee5b4c45b50d48daed0b3de0c Author: Baodi Shi AuthorDate: Fri Mar 29 08:33:27 2024 +0800 [improve][io]: Add validation for JDBC sink not supporting primitive schema (#22376) --- pulsar-io/jdbc/core/pom.xml| 7 + .../pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java | 5 .../pulsar/io/jdbc/BaseJdbcAutoSchemaSinkTest.java | 25 + .../apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java | 31 +++--- 4 files changed, 59 insertions(+), 9 deletions(-) diff --git a/pulsar-io/jdbc/core/pom.xml b/pulsar-io/jdbc/core/pom.xml index 586307e8b86..0232a699680 100644 --- a/pulsar-io/jdbc/core/pom.xml +++ b/pulsar-io/jdbc/core/pom.xml @@ -71,6 +71,13 @@ provided + + ${project.groupId} + pulsar-client-original + ${project.version} + test + + \ No newline at end of file diff --git a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java index 36c36740919..3655688c0f3 100644 --- a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java +++ b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java @@ -34,6 +34,7 @@ import org.apache.pulsar.client.api.schema.GenericObject; import org.apache.pulsar.client.api.schema.GenericRecord; import org.apache.pulsar.client.api.schema.KeyValueSchema; import org.apache.pulsar.common.schema.KeyValue; +import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.functions.api.Record; import org.apache.pulsar.io.jdbc.JdbcUtils.ColumnId; @@ -137,6 +138,10 @@ public abstract class 
BaseJdbcAutoSchemaSink extends JdbcAbstractSink data.get(k); } else { +SchemaType schemaType = message.getSchema().getSchemaInfo().getType(); +if (schemaType.isPrimitive()) { +throw new UnsupportedOperationException("Primitive schema is not supported: " + schemaType); +} recordValueGetter = (key) -> ((GenericRecord) record).getField(key); } String action = message.getProperties().get(ACTION_PROPERTY); diff --git a/pulsar-io/jdbc/core/src/test/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSinkTest.java b/pulsar-io/jdbc/core/src/test/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSinkTest.java index b15eb832242..c088dd3c42c 100644 --- a/pulsar-io/jdbc/core/src/test/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSinkTest.java +++ b/pulsar-io/jdbc/core/src/test/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSinkTest.java @@ -22,6 +22,10 @@ import java.util.function.Function; import org.apache.avro.Schema; import org.apache.avro.SchemaBuilder; import org.apache.avro.util.Utf8; +import org.apache.pulsar.client.api.schema.GenericObject; +import org.apache.pulsar.client.api.schema.GenericRecord; +import org.apache.pulsar.client.impl.schema.AutoConsumeSchema; +import org.apache.pulsar.functions.api.Record; import org.testng.Assert; import org.testng.annotations.Test; @@ -143,5 +147,26 @@ public class BaseJdbcAutoSchemaSinkTest { return consumer.apply(record).endRecord().getFields().get(0).schema(); } +@Test(expectedExceptions = UnsupportedOperationException.class, +expectedExceptionsMessageRegExp = "Primitive schema is not supported.*") +@SuppressWarnings("unchecked") +public void testNotSupportPrimitiveSchema() { +BaseJdbcAutoSchemaSink baseJdbcAutoSchemaSink = new BaseJdbcAutoSchemaSink() {}; +AutoConsumeSchema autoConsumeSchema = new AutoConsumeSchema(); + autoConsumeSchema.setSchema(org.apache.pulsar.client.api.Schema.STRING); +Record record = new Record() { +@Override +public org.apache.pulsar.client.api.Schema getSchema() { +return autoConsumeSchema; +} + 
+@Override +public GenericRecord getValue() { +return null; +} +}; +baseJdbcAutoSchemaSink.createMutation((Record) record); +} + } \ No newline at end of file diff --git a/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java b/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java index d9ed4cbd442..ca01615bef1 100644 --- a/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java +++
Re: [PR] [improve][io]: Add validation for JDBC sink not supporting primitive schema [pulsar]
shibd merged PR #22376: URL: https://github.com/apache/pulsar/pull/22376
Re: [D] Apache Beam support for Pulsar [pulsar]
GitHub user vcastell-tibco edited a comment on the discussion: Apache Beam support for Pulsar

I see that a Pulsar connector was made available as of the Beam 2.38.0 release, but I don't see Pulsar listed as an official connector on the page below. Is the Pulsar IO connector official or not? If it is official, can someone please update the page, since it gives the impression that a Pulsar IO connector is not available.

https://beam.apache.org/documentation/io/connectors/

GitHub link: https://github.com/apache/pulsar/discussions/18453#discussioncomment-8902102

This is an automatically sent email for commits@pulsar.apache.org. To unsubscribe, please send an email to: commits-unsubscr...@pulsar.apache.org
Re: [PR] [improve] [broker] Avoid repeated Read-and-discard when using Key_Shared mode [pulsar]
codelipenghui commented on code in PR #22245:
URL: https://github.com/apache/pulsar/pull/22245#discussion_r1543874681

## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java: ##
@@ -472,18 +475,18 @@ protected NavigableSet filterOutEntriesWillBeDiscarded(NavigableSe
                 // Maybe using HashRangeExclusiveStickyKeyConsumerSelector.
                 continue;
             }
-            if (getAvailablePermits(c) == 0) {
+            groupedPositions.computeIfAbsent(c, k -> new ArrayList<>()).add(pos);
+        }
+        // Filter positions by the Recently Joined Position rule.
+        for (Map.Entry<Consumer, List<PositionImpl>> item : groupedPositions.entrySet()) {
+            int availablePermits = getAvailablePermits(item.getKey());
+            if (availablePermits == 0) {
                 continue;
             }
-            PositionImpl currentMaxReadPosition = recentlyJoinedConsumers.get(c);
-            if (currentMaxReadPosition == null) {
-                res.add(pos);
-            } else if (pos.compareTo(firstMaxReadPos) < 0 && pos.compareTo(currentMaxReadPosition) < 0) {
-                res.add(pos);
-            } else {
-                // The subsequent positions will also be larger than "firstMaxReadPos", why need to check continuous.
-                // Because the subsequent positions may be delivered to a consumer which does not in the
-                // "recentlyJoinedConsumers".
+            int posCountToRead = getRestrictedMaxEntriesForConsumer(item.getKey(), item.getValue(), availablePermits,
+                    ReadType.Replay, null);
+            for (int i = 0; i < posCountToRead; i++) {
+                res.add(item.getValue().get(i));

Review Comment:
Is it better to change to
```
res.addAll(item.getValue().subList(0, posCountToRead));
```
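The suggestion in the review comment above can be checked in isolation. The following is a minimal sketch, not code from the PR: the class and method names (`SubListCopyDemo`, `firstWithLoop`, `firstWithSubList`) are hypothetical, and it assumes `0 <= count <= source.size()`, which the surrounding dispatcher code would have to guarantee for `posCountToRead`. Under that assumption the index loop and the `subList` form copy the same elements in the same order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class SubListCopyDemo {
    /** Copies the first count elements with an index loop, as in the diff hunk. */
    static <T> List<T> firstWithLoop(List<T> source, int count) {
        List<T> res = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            res.add(source.get(i));
        }
        return res;
    }

    /** Copies the first count elements through a subList view, as the review suggests. */
    static <T> List<T> firstWithSubList(List<T> source, int count) {
        List<T> res = new ArrayList<>();
        // subList(0, count) is a view over the source; addAll copies its elements.
        res.addAll(source.subList(0, count));
        return res;
    }

    public static void main(String[] args) {
        // Strings stand in for positions; the real code uses PositionImpl.
        List<String> positions = Arrays.asList("1:0", "1:1", "1:2", "1:3");
        System.out.println(firstWithLoop(positions, 2));    // prints [1:0, 1:1]
        System.out.println(firstWithSubList(positions, 2)); // prints [1:0, 1:1]
    }
}
```

One difference worth keeping in mind: if `count` exceeds the list size, `subList` throws before anything is copied, while the loop throws only after adding the elements that do exist.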
(pulsar-site) branch main updated: Adapt the release process for PR 20981 changes
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/pulsar-site.git The following commit(s) were added to refs/heads/main by this push: new cfc1f8ab1e0b Adapt the release process for PR 20981 changes cfc1f8ab1e0b is described below commit cfc1f8ab1e0bd3d44767a9c059bb2613c5c53321 Author: Lari Hotari AuthorDate: Thu Mar 28 22:26:10 2024 +0200 Adapt the release process for PR 20981 changes - see https://github.com/apache/pulsar/pull/20981 for the change --- contribute/release-process.md | 79 +++ 1 file changed, 57 insertions(+), 22 deletions(-) diff --git a/contribute/release-process.md b/contribute/release-process.md index 8413025ec1bd..5fdff1d26a08 100644 --- a/contribute/release-process.md +++ b/contribute/release-process.md @@ -235,7 +235,7 @@ The _apache-pulsar-shell_ artifacts are distributed beginning with release 2.11. ::: -### Inspect the artifacts +### Check licenses First, check that the `LICENSE` and `NOTICE` files cover all included jars for the bin package. You can use script to cross-validate `LICENSE` file with included jars: @@ -245,20 +245,7 @@ src/check-binary-license.sh distribution/server/target/apache-pulsar-*-bin.tar.g ``` In some older branches, the script is called `src/check-binary-license` instead of `src/check-binary-license.sh`. - - Then, run Apache RAT to verify the license headers in the src package: -```shell -cd /tmp -tar -xvzf $PULSAR_PATH/target/apache-pulsar-*-src.tar.gz -cd apache-pulsar-$VERSION_WITHOUT_RC-src -mvn apache-rat:check -``` - -Finally, use instructions in [verifying release candidates](validate-release-candidate.md) page to do some sanity checks on the produced binary distributions. - -### Sign and stage the artifacts on SVN - -The src and bin artifacts need to be signed and uploaded to the dist SVN repository for staging. This step should not run inside the $PULSAR_PATH. 
+### Create and publish the GPG key if you haven't already done this If you haven't already done it, [create and publish the GPG key](create-gpg-keys.md). You will use the key to sign the release artifacts. @@ -274,28 +261,59 @@ default-key This can be automated with this command: ```shell +# KEY_ID is in short format, subset key id visible in gpg -K KEY_ID=$(gpg --list-keys --with-colons $apache_u...@apache.org | egrep "^pub" | awk -F: '{print $5}') echo "default-key $KEY_ID" >> ~/.gnupg/gpg.conf ``` +### Sign and stage the artifacts to local SVN directory + +The src and bin artifacts need to be signed and finally uploaded to the dist SVN repository for staging. This step should not run inside the $PULSAR_PATH. + ```shell # make sure to run svn mkdir commmand in a different dir(NOT IN $PULSAR_PATH). mkdir ~/pulsar-svn-release-$VERSION_RC cd ~/pulsar-svn-release-$VERSION_RC +# create an empty directory in the SVN server svn mkdir --username $APACHE_USER -m "Add directory for pulsar $VERSION_RC release" https://dist.apache.org/repos/dist/dev/pulsar/pulsar-$VERSION_RC +# checkout the empty directory svn co https://dist.apache.org/repos/dist/dev/pulsar/pulsar-$VERSION_RC +# cd into the directory cd pulsar-$VERSION_RC +# stage the release artifacts $PULSAR_PATH/src/stage-release.sh . +# Please check the size of the files in the `pulsar-2.X.0-candidate-1`. +# If you build the artifacts without a clean workspace, the `apache-pulsar-2.X.0-src.tar.gz` files +# may be too large to be unable to upload. +ls -ltra +du -ms * + # Verify the artifacts are correctly signed have correct checksums: ( for i in **/*.(tar.gz|zip|nar); do echo $i; gpg --verify $i.asc $i || exit 1 ; done ) ( for i in **/*.(tar.gz|zip|nar); do echo $i; shasum -a 512 -c $i.sha512 || exit 1 ; done ) -# Please check the size of the files in the `pulsar-2.X.0-candidate-1`. 
-# If you build the artifacts without a clean workspace, the `apache-pulsar-2.X.0-src.tar.gz` files -# may be too large to be unable to upload. +# don't commit and upload yet, there's a separate step for handling that +``` + +### Validate the release files + +Then use instructions in [verifying release candidates](validate-release-candidate.md) page to do some sanity checks on the produced binary distributions. + + Make sure to run Apache RAT to verify the license headers in the src package: +```shell +cd /tmp +tar -xvzf ~/pulsar-svn-release-$VERSION_RC/pulsar-$VERSION_RC/apache-pulsar-*-src.tar.gz +cd apache-pulsar-$VERSION_WITHOUT_RC-src +mvn apache-rat:check +``` + +### Commit and upload the staged files in the local SVN directory to ASF SVN server + +```shell +cd ~/pulsar-svn-release-$VERSION_RC/pulsar-$VERSION_RC svn add * svn ci -m "Staging artifacts and signature for Pulsar release $VERSION_RC" ``` @@ -368,7 +386,25 @@ DOCKER_USER= DOCKER_PASSWORD= DOCKER_ORG= -DOCKER_ORG= docker login -u $DOCKER_USER mvn install
Re: [PR] [fix][broker] Skip topic.close during unloading if the topic future fails with ownership check, and fix isBundleOwnedByAnyBroker to use ns.checkOwnershipPresentAsync for ExtensibleLoadBalance
heesung-sn commented on code in PR #22379:
URL: https://github.com/apache/pulsar/pull/22379#discussion_r1543682634

## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java: ##
@@ -2244,9 +2244,18 @@ private CompletableFuture unloadServiceUnit(NamespaceBundle serviceUnit
             closeFutures.add(topicFuture
                     .thenCompose(t -> t.isPresent() ? t.get().close(
                             disconnectClients, closeWithoutWaitingClientDisconnect)
-                            : CompletableFuture.completedFuture(null)));
+                            : CompletableFuture.completedFuture(null))
+                    .exceptionally(e -> {
+                        if (e.getCause() instanceof BrokerServiceException.ServiceUnitNotReadyException
+                                && e.getMessage().contains("Please redo the lookup")) {
+                            log.warn("[{}] Topic ownership check failed. Skipping it", topicName);
+                            return null;

Review Comment:
This `return null` does not add a null element to `closeFutures`. I think it will add a `CompletableFuture` completed with `null`.
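The point made in the review comment above can be illustrated with plain `java.util.concurrent` classes. This is a minimal sketch under stated assumptions, not the broker code: `ExceptionallyNullDemo` and `buildCloseFutures` are hypothetical names, and `CompletableFuture.allOf` stands in for Pulsar's `FutureUtil.waitForAll`, whose internals are not shown in this thread. It shows that a `null` returned from the `exceptionally` callback becomes the value of a new, normally completed future, so the list never holds a null element.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class ExceptionallyNullDemo {
    /** Builds a list the way the patch does: a failed future is recovered before it is added. */
    static List<CompletableFuture<Void>> buildCloseFutures() {
        List<CompletableFuture<Void>> closeFutures = new ArrayList<>();

        CompletableFuture<Void> failing = new CompletableFuture<>();
        failing.completeExceptionally(new IllegalStateException("Please redo the lookup"));

        // exceptionally(...) returns a new future; the lambda's null is that future's value.
        closeFutures.add(failing.exceptionally(e -> null));
        closeFutures.add(CompletableFuture.completedFuture(null));
        return closeFutures;
    }

    public static void main(String[] args) {
        List<CompletableFuture<Void>> closeFutures = buildCloseFutures();

        // allOf rejects null array elements, so this only works because none are present.
        CompletableFuture<Void> all =
                CompletableFuture.allOf(closeFutures.toArray(new CompletableFuture[0]));
        all.join();

        long nullElements = closeFutures.stream().filter(f -> f == null).count();
        System.out.println("null elements: " + nullElements);                            // prints 0
        System.out.println("completed exceptionally: " + all.isCompletedExceptionally()); // prints false
    }
}
```

The sketch recovers from every exception for brevity; the patch only does so for the ownership-check failure it tests for.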
Re: [I] Add a metric for geo replication for tracking replicated subscriptions snapshot timeouts [pulsar]
lhotari commented on issue #21793:
URL: https://github.com/apache/pulsar/issues/21793#issuecomment-2026074347

> @lhotari I have made a [PR](https://github.com/nikam14/pulsar/pull/3) in forked repo can you take a look.

@nikam14 looks good. Please go ahead and create an apache/pulsar PR. Please fill in the details in the description and name the PR properly too. The [contribution guide](https://pulsar.apache.org/contribute/) contains advice unless the PR template explains it. For metrics, documentation will also need to be added to the pulsar-site repository. You can usually also get help on [Apache Pulsar Slack's](https://pulsar.apache.org/community/#section-discussions) #dev channel for anything related to contributions.
svn commit: r68192 [2/2] - in /dev/pulsar/pulsar-3.2.2-candidate-1: ./ connectors/
Added: dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-jdbc-sqlite-3.2.2.nar == Binary file - no diff available. Propchange: dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-jdbc-sqlite-3.2.2.nar -- svn:mime-type = application/octet-stream Added: dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-jdbc-sqlite-3.2.2.nar.asc == --- dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-jdbc-sqlite-3.2.2.nar.asc (added) +++ dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-jdbc-sqlite-3.2.2.nar.asc Thu Mar 28 20:35:44 2024 @@ -0,0 +1,16 @@ +-BEGIN PGP SIGNATURE- + +iQIzBAABCgAdFiEExmPS9k2iygnbKNmr0/pn1SLFUlYFAmYF0wIACgkQ0/pn1SLF +UlYz9A/9HULfgH6bXaLj91QJlmbemhnMBkzEM4dGFmcATzv+vRmmh8y2vKHlSNxS +vbk5Y8pNJGo3a8Bblay4DjhGhOsATB81nVPEYH22np4DP4gb4AhZ6Vp9JkegaBPV +5PcswVh9uzI2e9006kpedPZZnMy7wNHZOfpgyXsjdPcOajbB+pKFFDGxWmbIsgWB +GDq+p6a5dmcbMlHDx+d6U25QMHRUhPuZvzwjOmZF6t/EVcgsvGy1Y//J10XVKhbT +73M/X8nVZwEZ6BkGtgj0emXKUO8zwMuCU0p9AyS9akK9f4gQ3aWiueyRp5+ytpSa +2TSIgeRkZXG9vI3xBxCnGCFI3Q450yoyP8DYEp2uR2OXMl71YARGPkubkR1MEe3x +0D25G28zUHjPV4wIk/dIg1klg2iDknB6qpUb4CCzFQZoaH4Tymn7j24yjkz96ca0 +gD+VYzoiY2TCnrP1a80oP2Th4YO4UJgv0JIygph5BvBwXmo9Ic4arWL2Fi8BrK1k +rYNIk7DpD+6G6CVfGShFKYvS2Ll5TM6x8cyNlzh9ETlA02Xbz/oK5IF7Tf4LjCYd +T3RH7fcYlrO/z8MNhzvt3C3er78OzZmZbJZAeQBHES4+zXXsVqngi6rqtz24KJmN +tApldCWFJCAq+UI4qobsO+Mp4gnZXNS93TzXIt/56mGJRgF9Huc= +=YbPn +-END PGP SIGNATURE- Added: dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-jdbc-sqlite-3.2.2.nar.sha512 == --- dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-jdbc-sqlite-3.2.2.nar.sha512 (added) +++ dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-jdbc-sqlite-3.2.2.nar.sha512 Thu Mar 28 20:35:44 2024 @@ -0,0 +1 @@ +1b8fbd8b1a3f42d40dbf0216e0c59db6bd3a1e3fa6c223d5dc45272d1d7a61ac33b64535ce6c819c588c9be83724009180151392ad093490b16d799a5d456084 ./connectors/pulsar-io-jdbc-sqlite-3.2.2.nar Added: dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-kafka-3.2.2.nar == Binary file 
- no diff available. Propchange: dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-kafka-3.2.2.nar -- svn:mime-type = application/octet-stream Added: dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-kafka-3.2.2.nar.asc == --- dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-kafka-3.2.2.nar.asc (added) +++ dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-kafka-3.2.2.nar.asc Thu Mar 28 20:35:44 2024 @@ -0,0 +1,16 @@ +-BEGIN PGP SIGNATURE- + +iQIzBAABCgAdFiEExmPS9k2iygnbKNmr0/pn1SLFUlYFAmYF0vwACgkQ0/pn1SLF +UlaBvBAAgxmF+uuKUZlu6OV+fTaB/QsA/R3A9tLQtumdQTRBFJwgNBtu0fdhtVpd +IOtf/FtyTz7W+E/UL+E9kqhZ0g5KyFOHkF/8wkv8eDIDyFl1zQkrT6LG4XWQoQ2G +Wx1dQeb6wn1r45goj+aXr552TQVQ4NnWbN+FSdNaViWK0Z7IyXH/I7DvGRal/o2o +B1VCIdYZBzfXpkNm4dHgxExr74XJnQ5DZD5ydkHGYR5Jl2U+YCpGKTcqqy9PdGNS +fV1jeW49vdavIXI1Fhe+7kwnoQt3dg+4wesRFGukn449syyGK8v7tc0WxN1i8xZu +7EEhNK5H05RwIG3lFwzlxKFA+pdrZjCIvHmaXjFfHs2QEgqDy/m79KW3Y4nWFRRf +J97iHUkbXxnQ5LPGwDM7/vw1SR+mxLg67QFZsSZCNDkODSSItP4iKI0HvcbHZLHr +UxRgTYXysbPLqJLFjUWksoZggSlt8VRei+CL1RzQQPn6r+MHhiPhUTMk6/J4gsvY +DJAKZHocCSUQ8qNW/PKZ55cvHzRnOfCtGf6jjR0EfSMJv29Sw0Hw4cT597MqCRNk +13c92KEW4wp6E1nDnD/spW0k9CKmDJTPTDQtqQ7iF661GqX8UfPs/rkORnJ/rVpt +GU+Hn3ArVLe6UcJDb/F4S4zKMOBz8QkXO+32rwEzuEqtGp0vYcE= +=dcd/ +-END PGP SIGNATURE- Added: dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-kafka-3.2.2.nar.sha512 == --- dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-kafka-3.2.2.nar.sha512 (added) +++ dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-kafka-3.2.2.nar.sha512 Thu Mar 28 20:35:44 2024 @@ -0,0 +1 @@ +92117b86a36d769e3b04871e572c12e8de186ba7ee1b7a06d011f8507eec8da6c9587a51c22460c353cd93efefb9036d55f301a66a13ef72df80437d8ad9acd7 ./connectors/pulsar-io-kafka-3.2.2.nar Added: dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-kafka-connect-adaptor-3.2.2.nar == Binary file - no diff available. 
Propchange: dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-kafka-connect-adaptor-3.2.2.nar -- svn:mime-type = application/octet-stream Added: dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-kafka-connect-adaptor-3.2.2.nar.asc
svn commit: r68192 [1/2] - in /dev/pulsar/pulsar-3.2.2-candidate-1: ./ connectors/
Author: lhotari Date: Thu Mar 28 20:35:44 2024 New Revision: 68192 Log: Staging artifacts and signature for Pulsar release 3.2.2-candidate-1 Added: dev/pulsar/pulsar-3.2.2-candidate-1/apache-pulsar-3.2.2-bin.tar.gz (with props) dev/pulsar/pulsar-3.2.2-candidate-1/apache-pulsar-3.2.2-bin.tar.gz.asc dev/pulsar/pulsar-3.2.2-candidate-1/apache-pulsar-3.2.2-bin.tar.gz.sha512 dev/pulsar/pulsar-3.2.2-candidate-1/apache-pulsar-3.2.2-src.tar.gz (with props) dev/pulsar/pulsar-3.2.2-candidate-1/apache-pulsar-3.2.2-src.tar.gz.asc dev/pulsar/pulsar-3.2.2-candidate-1/apache-pulsar-3.2.2-src.tar.gz.sha512 dev/pulsar/pulsar-3.2.2-candidate-1/apache-pulsar-offloaders-3.2.2-bin.tar.gz (with props) dev/pulsar/pulsar-3.2.2-candidate-1/apache-pulsar-offloaders-3.2.2-bin.tar.gz.asc dev/pulsar/pulsar-3.2.2-candidate-1/apache-pulsar-offloaders-3.2.2-bin.tar.gz.sha512 dev/pulsar/pulsar-3.2.2-candidate-1/apache-pulsar-shell-3.2.2-bin.tar.gz (with props) dev/pulsar/pulsar-3.2.2-candidate-1/apache-pulsar-shell-3.2.2-bin.tar.gz.asc dev/pulsar/pulsar-3.2.2-candidate-1/apache-pulsar-shell-3.2.2-bin.tar.gz.sha512 dev/pulsar/pulsar-3.2.2-candidate-1/apache-pulsar-shell-3.2.2-bin.zip (with props) dev/pulsar/pulsar-3.2.2-candidate-1/apache-pulsar-shell-3.2.2-bin.zip.asc dev/pulsar/pulsar-3.2.2-candidate-1/apache-pulsar-shell-3.2.2-bin.zip.sha512 dev/pulsar/pulsar-3.2.2-candidate-1/connectors/ dev/pulsar/pulsar-3.2.2-candidate-1/connectors/LICENSE dev/pulsar/pulsar-3.2.2-candidate-1/connectors/README dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-aerospike-3.2.2.nar (with props) dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-aerospike-3.2.2.nar.asc dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-aerospike-3.2.2.nar.sha512 dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-alluxio-3.2.2.nar (with props) dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-alluxio-3.2.2.nar.asc dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-alluxio-3.2.2.nar.sha512 
dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-batch-data-generator-3.2.2.nar (with props) dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-batch-data-generator-3.2.2.nar.asc dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-batch-data-generator-3.2.2.nar.sha512 dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-canal-3.2.2.nar (with props) dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-canal-3.2.2.nar.asc dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-canal-3.2.2.nar.sha512 dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-cassandra-3.2.2.nar (with props) dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-cassandra-3.2.2.nar.asc dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-cassandra-3.2.2.nar.sha512 dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-data-generator-3.2.2.nar (with props) dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-data-generator-3.2.2.nar.asc dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-data-generator-3.2.2.nar.sha512 dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-debezium-mongodb-3.2.2.nar (with props) dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-debezium-mongodb-3.2.2.nar.asc dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-debezium-mongodb-3.2.2.nar.sha512 dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-debezium-mssql-3.2.2.nar (with props) dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-debezium-mssql-3.2.2.nar.asc dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-debezium-mssql-3.2.2.nar.sha512 dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-debezium-mysql-3.2.2.nar (with props) dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-debezium-mysql-3.2.2.nar.asc dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-debezium-mysql-3.2.2.nar.sha512 dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-debezium-oracle-3.2.2.nar (with props) 
dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-debezium-oracle-3.2.2.nar.asc dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-debezium-oracle-3.2.2.nar.sha512 dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-debezium-postgres-3.2.2.nar (with props) dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-debezium-postgres-3.2.2.nar.asc dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-debezium-postgres-3.2.2.nar.sha512 dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-dynamodb-3.2.2.nar (with props) dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-dynamodb-3.2.2.nar.asc dev/pulsar/pulsar-3.2.2-candidate-1/connectors/pulsar-io-dynamodb-3.2.2.nar.sha512
Re: [PR] [fix][broker] Skip topic.close during unloading if the topic future fails with ownership check, and fix isBundleOwnedByAnyBroker to use ns.checkOwnershipPresentAsync for ExtensibleLoadBalance
dragosvictor commented on code in PR #22379:
URL: https://github.com/apache/pulsar/pull/22379#discussion_r1543614920

## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java: ##
@@ -2244,9 +2244,18 @@ private CompletableFuture unloadServiceUnit(NamespaceBundle serviceUnit
             closeFutures.add(topicFuture
                     .thenCompose(t -> t.isPresent() ? t.get().close(
                             disconnectClients, closeWithoutWaitingClientDisconnect)
-                            : CompletableFuture.completedFuture(null)));
+                            : CompletableFuture.completedFuture(null))
+                    .exceptionally(e -> {
+                        if (e.getCause() instanceof BrokerServiceException.ServiceUnitNotReadyException
+                                && e.getMessage().contains("Please redo the lookup")) {
+                            log.warn("[{}] Topic ownership check failed. Skipping it", topicName);
+                            return null;

Review Comment:
Since we're adding a null element to `closeFutures`, would this cause an exception further down in `FutureUtil.waitForAll`? The [documentation](https://docs.oracle.com/en%2Fjava%2Fjavase%2F11%2Fdocs%2Fapi%2F%2F/java.base/java/util/concurrent/CompletableFuture.html#allOf(java.util.concurrent.CompletableFuture...)) suggests so.
svn commit: r68191 - /dev/pulsar/pulsar-3.2.2-candidate-1/
Author: lhotari
Date: Thu Mar 28 20:28:25 2024
New Revision: 68191

Log:
Add directory for pulsar 3.2.2-candidate-1 release

Added:
    dev/pulsar/pulsar-3.2.2-candidate-1/
Re: [PR] [improve] [broker] Avoid repeated Read-and-discard when using Key_Shared mode [pulsar]
poorbarcode commented on code in PR #22245:
URL: https://github.com/apache/pulsar/pull/22245#discussion_r1543568051

## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java: ##
@@ -437,6 +436,82 @@ protected synchronized NavigableSet getMessagesToReplayNow(int max
         }
     }

+    private int getAvailablePermits(Consumer c) {
+        int availablePermits = Math.max(c.getAvailablePermits(), 0);
+        if (c.getMaxUnackedMessages() > 0) {
+            // Avoid negative number
+            int remainUnAckedMessages = Math.max(c.getMaxUnackedMessages() - c.getUnackedMessages(), 0);
+            availablePermits = Math.min(availablePermits, remainUnAckedMessages);
+        }
+        return availablePermits;
+    }
+
+    @Override
+    protected NavigableSet<PositionImpl> filterOutEntriesWillBeDiscarded(NavigableSet<PositionImpl> src) {

Review Comment:
Done
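The arithmetic in the `getAvailablePermits` helper quoted above is easier to follow with concrete numbers. The sketch below is an illustration only: `PermitMath.availablePermits` is a hypothetical stand-in that takes plain ints instead of a broker `Consumer`, and the sample values are invented. It mirrors the two clamps in the diff: negative permits become zero, and when a max-unacked limit is set, the result is capped by the remaining unacked budget.

```java
class PermitMath {
    /**
     * Same clamping as the helper in the diff, on plain ints.
     * permits    stands in for c.getAvailablePermits()
     * maxUnacked stands in for c.getMaxUnackedMessages() (0 or less means no limit)
     * unacked    stands in for c.getUnackedMessages()
     */
    static int availablePermits(int permits, int maxUnacked, int unacked) {
        int available = Math.max(permits, 0);
        if (maxUnacked > 0) {
            // Avoid a negative remainder when unacked already exceeds the limit.
            int remaining = Math.max(maxUnacked - unacked, 0);
            available = Math.min(available, remaining);
        }
        return available;
    }

    public static void main(String[] args) {
        System.out.println(availablePermits(10, 0, 50));    // no limit set: prints 10
        System.out.println(availablePermits(10, 100, 95));  // capped by remaining budget: prints 5
        System.out.println(availablePermits(-3, 100, 0));   // negative permits clamp to: 0
        System.out.println(availablePermits(10, 100, 120)); // limit already exceeded: prints 0
    }
}
```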
Re: [I] [Bug] SnapshotSegmentAbortedTxnProcessorImpl contains thread safety issues [pulsar]
nikam14 commented on issue #22116:
URL: https://github.com/apache/pulsar/issues/22116#issuecomment-2025994480

@lhotari can you review the above-mentioned PR?
Re: [I] Add a metric for geo replication for tracking replicated subscriptions snapshot timeouts [pulsar]
nikam14 commented on issue #21793:
URL: https://github.com/apache/pulsar/issues/21793#issuecomment-2025992677

@lhotari I have made a [PR](https://github.com/nikam14/pulsar/pull/3) in my forked repo. Can you take a look?
(pulsar) annotated tag v3.2.2-candidate-1 updated (bacedb5bd6b -> add6ec19180)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a change to annotated tag v3.2.2-candidate-1 in repository https://gitbox.apache.org/repos/asf/pulsar.git *** WARNING: tag v3.2.2-candidate-1 was modified! *** from bacedb5bd6b (commit) to add6ec19180 (tag) tagging bacedb5bd6b27e53cb3036976bec27340089b179 (commit) replaces v3.2.1 by Lari Hotari on Thu Mar 28 21:45:57 2024 +0200 - Log - Release 3.2.2-candidate-1 -BEGIN PGP SIGNATURE- iQJHBAABCgAxFiEExmPS9k2iygnbKNmr0/pn1SLFUlYFAmYFyPUTHGxob3RhcmlA YXBhY2hlLm9yZwAKCRDT+mfVIsVSVr4VEACulxLPDEqiywkloHcd0hO3+mbIlXrB cdAiO97tbpE6Xzx2k4dGAb6s/umlzpy7tAYOBTEsa2xMmsC2kfED17orUgsi277M Oa+QYEvidq4n8dUzJv3VVIeuBhScSvKXb1Qv7Xgw4ltCt3gjxfu8fBUOwDXXTvS/ mQ5/im4XGpZkR1AuPQ4c0PTB0ebXQbbUruSaIB8cU5wLbS+V9/ka0heFeH1x7+Pe wNqhlcdTRiD7jKf1iRIR3rn2Vhw33R5Cw9NHL+pafLvwRtaRsrZmLBc/ME6JCkV0 T4c+6sjPexpYmU9m9AyO9wS/75lMEpIYlkhUeZtTxwLftdMB9mC+72V6qgIzr/c6 uJoVRW72L1AbygjtLQs9zcyCaCyOVgJqgEYHAXuQ2C002ZP68KQ1HHnTvlJ19PKw TkJiJbHtE9z+8Ab2anuHCetTtI+hXI9Niaybar2QXwh2Soy8+qVZw7BMrdJt60AC Pg5uFPuGDgOvumxnBwi4hEqDT46olnW8oJLbbJ/IC8ti3gT/lFu82Eru0GVda9XP IJpV9g/sQp3KLFu6eaCybyWaD7FY9gjqHqDFIN0IeTuz/7pSsKc1OKzam5ePZ0e9 wi1Uge1/Jf1ZPeqYDDgt5muxifmuhpZBwqFrtC/AwaxajBWt7gYCNJ5/wCX4+vR1 yNhiTxB9mGbqMQ== =t9kS -END PGP SIGNATURE- --- No new revisions were added by this update. Summary of changes:
(pulsar) branch branch-3.2 updated (4847648e78d -> bacedb5bd6b)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a change to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git from 4847648e78d [fix][broker] Fix issue of field 'topic' is not set when handle GetSchema request (#22377) add bacedb5bd6b Release 3.2.2 No new revisions were added by this update. Summary of changes: bouncy-castle/bc/pom.xml | 2 +- bouncy-castle/bcfips-include-test/pom.xml | 2 +- bouncy-castle/bcfips/pom.xml | 2 +- bouncy-castle/pom.xml | 2 +- buildtools/pom.xml| 4 ++-- distribution/io/pom.xml | 2 +- distribution/offloaders/pom.xml | 2 +- distribution/pom.xml | 2 +- distribution/server/pom.xml | 2 +- distribution/shell/pom.xml| 2 +- docker/pom.xml| 2 +- docker/pulsar-all/pom.xml | 2 +- docker/pulsar/pom.xml | 2 +- jclouds-shaded/pom.xml| 2 +- managed-ledger/pom.xml| 2 +- microbench/pom.xml| 2 +- pom.xml | 4 ++-- pulsar-bom/pom.xml| 4 ++-- pulsar-broker-auth-athenz/pom.xml | 2 +- pulsar-broker-auth-oidc/pom.xml | 2 +- pulsar-broker-auth-sasl/pom.xml | 2 +- pulsar-broker-common/pom.xml | 2 +- pulsar-broker/pom.xml | 2 +- pulsar-cli-utils/pom.xml | 2 +- pulsar-client-1x-base/pom.xml | 2 +- pulsar-client-1x-base/pulsar-client-1x/pom.xml| 2 +- pulsar-client-1x-base/pulsar-client-2x-shaded/pom.xml | 2 +- pulsar-client-admin-api/pom.xml | 2 +- pulsar-client-admin-shaded/pom.xml| 2 +- pulsar-client-admin/pom.xml | 2 +- pulsar-client-all/pom.xml | 2 +- pulsar-client-api/pom.xml | 2 +- pulsar-client-auth-athenz/pom.xml | 2 +- pulsar-client-auth-sasl/pom.xml | 2 +- pulsar-client-messagecrypto-bc/pom.xml| 2 +- pulsar-client-shaded/pom.xml | 2 +- pulsar-client-tools-api/pom.xml | 2 +- pulsar-client-tools-customcommand-example/pom.xml | 2 +- pulsar-client-tools-test/pom.xml | 2 +- pulsar-client-tools/pom.xml | 2 +- pulsar-client/pom.xml | 2 +- pulsar-common/pom.xml | 2 +- pulsar-config-validation/pom.xml | 2 +- pulsar-docs-tools/pom.xml | 2 +- pulsar-functions/api-java/pom.xml | 2 +- 
pulsar-functions/instance/pom.xml | 2 +- pulsar-functions/java-examples-builtin/pom.xml| 2 +- pulsar-functions/java-examples/pom.xml| 2 +- pulsar-functions/localrun-shaded/pom.xml | 2 +- pulsar-functions/localrun/pom.xml | 2 +- pulsar-functions/pom.xml | 2 +- pulsar-functions/proto/pom.xml| 2 +- pulsar-functions/runtime-all/pom.xml | 2 +- pulsar-functions/runtime/pom.xml | 2 +- pulsar-functions/secrets/pom.xml | 2 +- pulsar-functions/utils/pom.xml| 2 +- pulsar-functions/worker/pom.xml | 2 +- pulsar-io/aerospike/pom.xml | 2 +- pulsar-io/alluxio/pom.xml | 2 +- pulsar-io/aws/pom.xml | 2 +- pulsar-io/batch-data-generator/pom.xml| 2 +- pulsar-io/batch-discovery-triggerers/pom.xml | 2 +- pulsar-io/canal/pom.xml | 2 +- pulsar-io/cassandra/pom.xml | 2 +- pulsar-io/common/pom.xml | 2 +- pulsar-io/core/pom.xml| 2 +- pulsar-io/data-generator/pom.xml | 2 +- pulsar-io/debezium/core/pom.xml | 2 +- pulsar-io/debezium/mongodb/pom.xml| 2 +- pulsar-io/debezium/mssql/pom.xml | 2 +- pulsar-io/debezium/mysql/pom.xml | 2 +- pulsar-io/debezium/oracle/pom.xml | 2 +- pulsar-io/debezium/pom.xml| 2 +- pulsar-io/debezium/postgres/pom.xml | 2 +-
Re: [PR] [improve][broker] Improve Gzip compression, allow excluding specific paths or disabling it [pulsar]
lhotari commented on code in PR #22370:
URL: https://github.com/apache/pulsar/pull/22370#discussion_r1543543748

## pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java: ##
@@ -54,13 +55,23 @@ private PulsarAdminBuilderImpl(ClientConfigurationData conf) {
     @Override
     public PulsarAdminBuilder clone() {
-        return new PulsarAdminBuilderImpl(conf.clone());
+        PulsarAdminBuilderImpl pulsarAdminBuilder = new PulsarAdminBuilderImpl(conf.clone());
+        pulsarAdminBuilder.clientBuilderClassLoader = clientBuilderClassLoader;
+        pulsarAdminBuilder.gzipCompressionEnabled = gzipCompressionEnabled;
+        return pulsarAdminBuilder;
     }

     @Override
     public PulsarAdminBuilder loadConf(Map config) {
         conf = ConfigurationDataUtils.loadData(config, conf, ClientConfigurationData.class);
         setAuthenticationFromPropsIfAvailable(conf);
+        if (config.containsKey("gzipCompressionEnabled")) {
+            if (config.get("gzipCompressionEnabled") instanceof Boolean) {

Review Comment:
I agree, that would make sense. This PR is still in draft mode.
Re: [PR] [improve][broker] Improve Gzip compression, allow excluding specific paths or disabling it [pulsar]
lhotari commented on code in PR #22370:
URL: https://github.com/apache/pulsar/pull/22370#discussion_r1543543178

## pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java: ##
@@ -54,13 +55,23 @@ private PulsarAdminBuilderImpl(ClientConfigurationData conf) {
     @Override
     public PulsarAdminBuilder clone() {
-        return new PulsarAdminBuilderImpl(conf.clone());
+        PulsarAdminBuilderImpl pulsarAdminBuilder = new PulsarAdminBuilderImpl(conf.clone());

Review Comment:
that is to implement the cloning. the clone method should return a similar object.
Re: [PR] [improve][broker] Improve Gzip compression, allow excluding specific paths or disabling it [pulsar]
lhotari commented on code in PR #22370: URL: https://github.com/apache/pulsar/pull/22370#discussion_r1543540449 ## pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/PulsarAdminBuilder.java: ## @@ -327,4 +327,11 @@ PulsarAdminBuilder authentication(String authPluginClassName, Map
Re: [PR] [improve][broker] Improve Gzip compression, allow excluding specific paths or disabling it [pulsar]
lhotari commented on code in PR #22370:
URL: https://github.com/apache/pulsar/pull/22370#discussion_r1543538151

## pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java: ##
@@ -294,8 +291,10 @@ public void start() throws PulsarServerException {
         ContextHandlerCollection contexts = new ContextHandlerCollection();
         contexts.setHandlers(handlers.toArray(new Handler[handlers.size()]));

+        Handler handlerForContexts = GzipHandlerUtil.wrapWithGzipHandler(contexts,

Review Comment:
I agree that it is confusing. Refactorings are a different story.
Re: [PR] [improve][broker] Improve Gzip compression, allow excluding specific paths or disabling it [pulsar]
lhotari commented on code in PR #22370:
URL: https://github.com/apache/pulsar/pull/22370#discussion_r1543537005

## pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/GzipHandlerUtil.java: ##
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.web;
+
+import java.util.List;
+import org.eclipse.jetty.server.Handler;
+import org.eclipse.jetty.server.handler.gzip.GzipHandler;
+
+public class GzipHandlerUtil {
+    public static Handler wrapWithGzipHandler(Handler innerHandler, List gzipCompressionExcludedPaths) {
+        Handler wrappedHandler;
+        if (isGzipCompressionCompletelyDisabled(gzipCompressionExcludedPaths)) {

Review Comment:
> can't we just define gzip on the handlers we see fit?

Which handlers would you exclude? I think that it's useful to enable it globally since the current assumption is that all usages will benefit from gzip compression.

> Too much configuration option can be confusing. They can control the gzip by using headers, no?

That's true, but there's risk that regressions are caused when changes are made. Adding the `httpServerGzipCompressionExcludedPaths` is a way to ensure that users aren't stuck if there's a regression for some corner case.
There's a mailing list discussion https://lists.apache.org/thread/xbs1f5v7865fz1hm1vxqlw6o9dgy3d9z .
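
For readers following the review thread, here is a minimal, dependency-free sketch of the kind of decision `wrapWithGzipHandler` has to make from the excluded-paths list. It is not the code from the PR: the class `GzipExclusionSketch`, the `Mode` enum, and the method names are hypothetical, and the rule that a single catch-all pattern (`^.*` or `^.*$`) means "gzip completely disabled" is an assumption made for illustration. Jetty types are deliberately left out so the example runs on its own.

```java
import java.util.List;
import java.util.regex.Pattern;

/** Hypothetical sketch; none of these names are taken from the PR. */
final class GzipExclusionSketch {

    enum Mode { COMPRESS_ALL, COMPRESS_EXCEPT_EXCLUDED, DISABLED }

    // Assumption: one catch-all pattern is read as "turn gzip off entirely".
    static boolean isCompletelyDisabled(List<String> excludedPaths) {
        return excludedPaths != null
                && excludedPaths.size() == 1
                && (excludedPaths.get(0).equals("^.*") || excludedPaths.get(0).equals("^.*$"));
    }

    // Classifies the configuration into one of three modes.
    static Mode modeFor(List<String> excludedPaths) {
        if (isCompletelyDisabled(excludedPaths)) {
            return Mode.DISABLED;
        }
        if (excludedPaths == null || excludedPaths.isEmpty()) {
            return Mode.COMPRESS_ALL;
        }
        return Mode.COMPRESS_EXCEPT_EXCLUDED;
    }

    // Returns true when a response for requestPath may be compressed.
    static boolean shouldCompress(String requestPath, List<String> excludedPaths) {
        switch (modeFor(excludedPaths)) {
            case DISABLED:
                return false;
            case COMPRESS_ALL:
                return true;
            default:
                // Each entry is treated as a regex that must match the whole path.
                for (String regex : excludedPaths) {
                    if (Pattern.compile(regex).matcher(requestPath).matches()) {
                        return false;
                    }
                }
                return true;
        }
    }

    public static void main(String[] args) {
        List<String> excluded = List.of("^/metrics/.*$");
        System.out.println(shouldCompress("/admin/v2/tenants", excluded));
        System.out.println(shouldCompress("/metrics/", excluded));
        System.out.println(modeFor(List.of("^.*")));
    }
}
```

Keeping compression on by default and treating the exclusion list as an escape hatch matches the argument in the comment above: users get the benefit everywhere, and a regression on one endpoint can be worked around by configuration alone.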
(pulsar-site) branch main updated: Improve release vote email template
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/pulsar-site.git The following commit(s) were added to refs/heads/main by this push: new a848d0a1822a Improve release vote email template a848d0a1822a is described below commit a848d0a1822ab2b4aac2a483fa0ddd7a3018b8be Author: Lari Hotari AuthorDate: Thu Mar 28 21:13:41 2024 +0200 Improve release vote email template --- contribute/release-process.md | 30 -- 1 file changed, 24 insertions(+), 6 deletions(-) diff --git a/contribute/release-process.md b/contribute/release-process.md index 54f3911017db..8413025ec1bd 100644 --- a/contribute/release-process.md +++ b/contribute/release-process.md @@ -412,24 +412,41 @@ PULSAR_IMAGE_URL="" PULSAR_ALL_IMAGE_URL="" ``` +Set also these +```shell +PULSAR_IMAGE_NAME="$DOCKER_USER/pulsar:$VERSION_WITHOUT_RC-$(git rev-parse --short=7 v$VERSION_RC^{commit})" +PULSAR_ALL_IMAGE_NAME="$DOCKER_USER/pulsar-all:$VERSION_WITHOUT_RC-$(git rev-parse --short=7 v$VERSION_RC^{commit})" +# validate pulling, will take some time, you can skip this if you have a slow internet connection +docker pull $PULSAR_IMAGE_NAME +docker pull $PULSAR_ALL_IMAGE_NAME +# check that images are about right, you can skip this if you have a slow internet connection +docker run --rm $PULSAR_IMAGE_NAME bash -c 'ls /pulsar/lib' |less +docker run --rm $PULSAR_ALL_IMAGE_NAME bash -c 'ls /pulsar/lib' |less +``` + Now you can render the template to the clipboard ```shell tee >(pbcopy)
Re: [PR] [feat][ci] Add Trivy container scan Github workflow [pulsar]
merlimat merged PR #22063: URL: https://github.com/apache/pulsar/pull/22063
(pulsar) branch master updated (6b2938223cf -> 8fc30df37e2)
This is an automated email from the ASF dual-hosted git repository. mmerli pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git from 6b2938223cf [improve] PIP-342: OTel client metrics support (#22179) add 8fc30df37e2 [feat][ci] Add Trivy container scan Github workflow (#22063) No new revisions were added by this update. Summary of changes: .github/workflows/ci-trivy-container-scan.yaml | 66 ++ 1 file changed, 66 insertions(+) create mode 100644 .github/workflows/ci-trivy-container-scan.yaml
[PR] [fix][broker] Skip topic.close during unloading if the topic future fails with ownership check, and fix isBundleOwnedByAnyBroker to use ns.checkOwnershipPresentAsync for ExtensibleLoadBalancer [p
heesung-sn opened a new pull request, #22379:
URL: https://github.com/apache/pulsar/pull/22379

Fixes https://github.com/apache/pulsar/issues/20157

### Motivation

https://github.com/apache/pulsar/issues/20157 found that the ExtensibleLoadManagerImplTest init step, which unloads the test namespace, often fails.

### Modifications

- Skip topic.close during unloading if the topic future fails with an ownership check. If the unloading (not a transfer) happens in the middle of topic creation, the topic creation might fail because of the ownership check failure. In this case, we need to skip unloading that topic rather than fail the unload command.
- Fix isBundleOwnedByAnyBroker to use ns.checkOwnershipPresentAsync for ExtensibleLoadBalancer. Currently, isBundleOwnedByAnyBroker unnecessarily selects a new owner broker if no owner is found. We should simply check ownership instead.

### Verifying this change

- [x] Make sure that the change passes the CI checks.

### Does this pull request potentially affect one of the following parts:

*If the box was checked, please highlight the changes*

- [ ] Dependencies (add or upgrade a dependency)
- [ ] The public API
- [ ] The schema
- [ ] The default values of configurations
- [ ] The threading model
- [ ] The binary protocol
- [ ] The REST endpoints
- [ ] The admin CLI options
- [ ] The metrics
- [ ] Anything that affects deployment

### Documentation

- [ ] `doc`
- [ ] `doc-required`
- [x] `doc-not-needed`
- [ ] `doc-complete`

### Matching PR in forked repository

PR in forked repository:
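
The first modification described above can be sketched roughly as follows. This is only an illustration of the idea, not the broker code: `UnloadSketch`, `OwnershipCheckFailedException`, and `unload` are hypothetical names, topics are reduced to plain strings, and adding a name to the `closed` list stands in for calling `topic.close()`. The assumption is that a topic whose future failed on the ownership check never finished loading, so there is nothing to close and the unload as a whole should still succeed.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

/** Hypothetical sketch of skipping topics whose creation failed on an ownership check. */
final class UnloadSketch {

    /** Hypothetical stand-in for the error raised by a failed ownership check. */
    static final class OwnershipCheckFailedException extends RuntimeException {
        OwnershipCheckFailedException(String message) {
            super(message);
        }
    }

    /** Returns the topics that were closed; ownership-check failures are skipped, other errors propagate. */
    static List<String> unload(Map<String, CompletableFuture<String>> topicFutures) {
        List<String> closed = new ArrayList<>();
        for (Map.Entry<String, CompletableFuture<String>> entry : topicFutures.entrySet()) {
            try {
                String topic = entry.getValue().join();
                closed.add(topic); // stands in for topic.close()
            } catch (CompletionException e) {
                if (e.getCause() instanceof OwnershipCheckFailedException) {
                    continue; // the topic never finished loading here, so skip it
                }
                throw e; // any other failure still fails the unload
            }
        }
        return closed;
    }

    public static void main(String[] args) {
        Map<String, CompletableFuture<String>> futures = new LinkedHashMap<>();
        futures.put("tp-1", CompletableFuture.completedFuture("tp-1"));
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new OwnershipCheckFailedException("bundle not owned"));
        futures.put("tp-2", failed);
        System.out.println(unload(futures));
    }
}
```

Only the ownership-check failure is swallowed in this sketch; treating every failed future as skippable would hide unrelated errors during unload.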
Re: [I] Flaky-test: ExtensibleLoadManagerImplTest.initializeState [pulsar]
heesung-sn commented on issue #20157: URL: https://github.com/apache/pulsar/issues/20157#issuecomment-2025894426 hi, I raised another bug fix here. https://github.com/apache/pulsar/pull/22379
(pulsar-site) 02/02: Add link to validate_pulsar_release.sh script
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-site.git

commit b8ce7b8574e5aab634a14043ff82fff94636d903
Author: Lari Hotari
AuthorDate: Thu Mar 28 20:38:36 2024 +0200

    Add link to validate_pulsar_release.sh script
---
 contribute/validate-release-candidate.md | 7 +++
 1 file changed, 7 insertions(+)

diff --git a/contribute/validate-release-candidate.md b/contribute/validate-release-candidate.md
index 38d497999b5c..19e8b7b993c8 100644
--- a/contribute/validate-release-candidate.md
+++ b/contribute/validate-release-candidate.md
@@ -7,6 +7,13 @@ This page contains manual instructions for reviewing and verifying a release can

 ## Validate the binary distribution

+:::note
+
+There's a bash script [validate_pulsar_release.sh](https://github.com/lhotari/pulsar-contributor-toolbox/blob/master/scripts/validate_pulsar_release.sh) available for assisting in semi-automated validation for the following steps.
+
+:::
+
+
 ### Download And Verify the binary distributions

 Download the server distribution `apache-pulsar--bin.tar.gz` and extract it. The extracted files are in a directory called `apache-pulsar-`. All the operations below happen within that directory:
(pulsar-site) branch main updated (a1f23abf632b -> b8ce7b8574e5)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/pulsar-site.git from a1f23abf632b Update release process with use of variables to ease copy-pasting new 71e9dc729904 Improve release process docs new b8ce7b8574e5 Add link to validate_pulsar_release.sh script The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: contribute/release-process.md| 109 +-- contribute/validate-release-candidate.md | 7 ++ 2 files changed, 95 insertions(+), 21 deletions(-)
(pulsar-site) 01/02: Improve release process docs
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/pulsar-site.git commit 71e9dc729904cc102a5a6588a1cc565a76b679d1 Author: Lari Hotari AuthorDate: Thu Mar 28 17:10:49 2024 +0200 Improve release process docs --- contribute/release-process.md | 109 ++ 1 file changed, 88 insertions(+), 21 deletions(-) diff --git a/contribute/release-process.md b/contribute/release-process.md index cff4d7704c6f..54f3911017db 100644 --- a/contribute/release-process.md +++ b/contribute/release-process.md @@ -63,6 +63,16 @@ If you run into problems with GPG signing set this export GPG_TTY=$(tty) ``` +For some commands, a template is copied to the clipboard using `pbcopy`. +This is already available on MacOS. For Linux, create a shell alias: + +```shell +# Linux only +# install xsel if it is missing +sudo apt install xsel +# create alias pbcopy for copying stdin to clipboard +alias pbcopy="xsel --clipboard --input" +``` ## Create a release candidate ### Create the release branch @@ -89,7 +99,7 @@ Alternatively, you can use a git workspace to create a new, clean directory on y ```shell git worktree add ../pulsar-release-$VERSION_BRANCH $VERSION_BRANCH -cd pulsar-release-$VERSION_BRANCH +cd ../pulsar-release-$VERSION_BRANCH export PULSAR_PATH=$(pwd) ``` @@ -98,6 +108,11 @@ if you get an error that the branch is already checked out, go to that directory git checkout --detach HEAD ``` +After the release, you can cleanup the worktree in the main repository directory +```shell +git worktree remove ../pulsar-release-$VERSION_BRANCH +``` + If you created a new branch, update the [CI - OWASP Dependency Check](https://github.com/apache/pulsar/blob/master/.github/workflows/ci-owasp-dependency-check.yaml) workflow so that it will run on the new branch. Note that you should also stop the workflow for previous Pulsar versions that are EOL. 
@@ -185,6 +200,16 @@ git push $UPSTREAM_REMOTE $VERSION_BRANCH git push $UPSTREAM_REMOTE v$VERSION_RC ``` +If there's a need to restart the release with more commits, you can delete the tag. + +```shell +# only if you restart the release before it has been published for voting. Don't run this after that! +# delete local tag +git tag -d v$VERSION_RC +# delete tag in remote +git push $UPSTREAM_REMOTE :v$VERSION_RC +``` + For patch releases, the tag is like `2.3.1`. ### Build release artifacts @@ -253,7 +278,6 @@ KEY_ID=$(gpg --list-keys --with-colons $apache_u...@apache.org | egrep "^pub" | echo "default-key $KEY_ID" >> ~/.gnupg/gpg.conf ``` - ```shell # make sure to run svn mkdir commmand in a different dir(NOT IN $PULSAR_PATH). mkdir ~/pulsar-svn-release-$VERSION_RC @@ -308,7 +332,17 @@ Log in to the ASF Nexus repository at https://repository.apache.org Click on "Staging Repositories" on the left sidebar and then select the current Pulsar staging repo. This should be called something like `orgapachepulsar-XYZ`. -Use the "Close" button to close the repository. This operation will take few minutes. Once complete click "Refresh" and now a link to the staging repository should be available, something like https://repository.apache.org/content/repositories/orgapachepulsar-XYZ +Add a version string such as "Apache Pulsar 3.0.4-candidate-1" to the clipboard with this command: + +```shell +printf "Apache Pulsar $VERSION_RC" |pbcopy +``` + +Use the "Close" button to close the repository. + +Enter the version string in the description field before clicking "Confirm". + +This operation will take few minutes. 
Once complete click "Refresh" and now a link to the staging repository should be available, something like https://repository.apache.org/content/repositories/orgapachepulsar-XYZ ### Stage Docker images @@ -337,31 +371,57 @@ DOCKER_USER= DOCKER_PASSWORD= DOCKER_ORG= -DOCKER_ORG= -docker login $DOCKER_ORG -u $DOCKER_USER -p +DOCKER_USER= +DOCKER_ORG= +docker login -u $DOCKER_USER mvn install -DUBUNTU_MIRROR=http://azure.archive.ubuntu.com/ubuntu/ \ -DskipTests \ +-Dmaven.gitcommitid.nativegit=true \ -Pmain,docker -Pdocker-push \ -Ddocker.platforms=linux/amd64,linux/arm64 \ -Ddocker.organization=$DOCKER_ORG \ -pl docker/pulsar,docker/pulsar-all ``` - ## Vote for the release candidate -Start a voting thread on the dev mailing list. Here is a sample content: +Start a voting thread on the dev mailing list. + +Here is a way to render the template for the voting email. + +Set these shell variables +```shell +DOCKER_USER= +STAGING_REPO="https://repository.apache.org/#stagingRepositories>" +MY_NAME="Firstname Lastname" +PREVIOUS_VERSION_WITHOUT_RC="3.0.3" +``` + +```shell +echo "Go to https://hub.docker.com/r/$DOCKER_USER/pulsar/tags to find the layer URL for the pulsar image" +echo "Go to https://hub.docker.com/r/$DOCKER_USER/pulsar-all/tags to find the layer URL
Re: [PR] [improve] [broker] Avoid repeated Read-and-discard when using Key_Shared mode [pulsar]
poorbarcode commented on code in PR #22245:
URL: https://github.com/apache/pulsar/pull/22245#discussion_r1543416514

## pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java: ##
@@ -1630,4 +1637,263 @@ public void testContinueDispatchMessagesWhenMessageDelayed() throws Exception {
         log.info("Got {} other messages...", sum);
         Assert.assertEquals(sum, delayedMessages + messages);
     }
+
+    private AtomicInteger injectReplayReadCounter(String topicName, String cursorName) throws Exception {

Review Comment:
Oh, it is a counter to record replay readings. It increases when the reading is not `Normal`.

```java
if (!String.valueOf(invocation.getArguments()[2]).equals("Normal")) {
    replyReadCounter.incrementAndGet();
}
```
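
The counting rule in the comment above can be tried out without Mockito or a broker. The sketch below is a simplified stand-in, not the test helper from the PR: `ReplayReadCounterSketch`, `onRead`, and `replayReads` are hypothetical names, and the `readType` argument plays the role of the third invocation argument that the test converts with `String.valueOf`.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch: counts every read whose type is not "Normal". */
final class ReplayReadCounterSketch {

    private final AtomicInteger replayReads = new AtomicInteger();

    // readType mirrors the argument inspected in the test; null becomes the string "null" and is counted.
    void onRead(Object readType) {
        if (!String.valueOf(readType).equals("Normal")) {
            replayReads.incrementAndGet();
        }
    }

    int replayReads() {
        return replayReads.get();
    }

    public static void main(String[] args) {
        ReplayReadCounterSketch counter = new ReplayReadCounterSketch();
        counter.onRead("Normal");
        counter.onRead("Replay");
        counter.onRead("Normal");
        System.out.println(counter.replayReads());
    }
}
```

Comparing on the string form keeps the check independent of the concrete read-type class, which appears to be why the test goes through `String.valueOf`; the trade-off is that a renamed or missing value is silently counted as a replay read.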
Re: [PR] [improve] [broker] Avoid repeated Read-and-discard when using Key_Shared mode [pulsar]
poorbarcode commented on code in PR #22245:
URL: https://github.com/apache/pulsar/pull/22245#discussion_r1543416791

## pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java: ##
@@ -1630,4 +1637,263 @@ public void testContinueDispatchMessagesWhenMessageDelayed() throws Exception {
         log.info("Got {} other messages...", sum);
         Assert.assertEquals(sum, delayedMessages + messages);
     }
+
+    private AtomicInteger injectReplayReadCounter(String topicName, String cursorName) throws Exception {
+        PersistentTopic persistentTopic =
+                (PersistentTopic) pulsar.getBrokerService().getTopic(topicName, false).join().get();
+        ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
+        ManagedCursorImpl cursor = (ManagedCursorImpl) managedLedger.openCursor(cursorName);
+        managedLedger.getCursors().removeCursor(cursor.getName());
+        managedLedger.getActiveCursors().removeCursor(cursor.getName());
+        ManagedCursorImpl spyCursor = Mockito.spy(cursor);
+        managedLedger.getCursors().add(spyCursor, PositionImpl.EARLIEST);
+        managedLedger.getActiveCursors().add(spyCursor, PositionImpl.EARLIEST);
+        AtomicInteger replyReadCounter = new AtomicInteger();
+        Mockito.doAnswer(invocation -> {
+            if (!String.valueOf(invocation.getArguments()[2]).equals("Normal")) {
+                replyReadCounter.incrementAndGet();
+            }
+            return invocation.callRealMethod();
+        }).when(spyCursor).asyncReplayEntries(Mockito.anySet(), Mockito.any(), Mockito.any());
+        Mockito.doAnswer(invocation -> {
+            if (!String.valueOf(invocation.getArguments()[2]).equals("Normal")) {
+                replyReadCounter.incrementAndGet();
+            }
+            return invocation.callRealMethod();
+        }).when(spyCursor).asyncReplayEntries(Mockito.anySet(), Mockito.any(), Mockito.any(), Mockito.anyBoolean());
+        admin.topics().createSubscription(topicName, cursorName, MessageId.earliest);
+        return replyReadCounter;
+    }
+
+    @Test
+    public void testNoRepeatedReadAndDiscard() throws Exception {
+        int delayedMessages = 100;
+        final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+        final String subName = "my-sub";
+        admin.topics().createNonPartitionedTopic(topic);
+        AtomicInteger replyReadCounter = injectReplayReadCounter(topic, subName);
+
+        // Send messages.
+        @Cleanup
+        Producer producer = pulsarClient.newProducer(Schema.INT32).topic(topic).enableBatching(false).create();
+        for (int i = 0; i < delayedMessages; i++) {
+            MessageId messageId = producer.newMessage()
+                    .key(String.valueOf(random.nextInt(NUMBER_OF_KEYS)))
+                    .value(100 + i)
+                    .send();
+            log.info("Published delayed message :{}", messageId);

Review Comment:
Fixed
svn commit: r68189 [1/2] - in /dev/pulsar/pulsar-3.0.4-candidate-1: ./ connectors/
Author: lhotari Date: Thu Mar 28 17:04:01 2024 New Revision: 68189 Log: Staging artifacts and signature for Pulsar release 3.0.4-candidate-1 Added: dev/pulsar/pulsar-3.0.4-candidate-1/apache-pulsar-3.0.4-bin.tar.gz (with props) dev/pulsar/pulsar-3.0.4-candidate-1/apache-pulsar-3.0.4-bin.tar.gz.asc dev/pulsar/pulsar-3.0.4-candidate-1/apache-pulsar-3.0.4-bin.tar.gz.sha512 dev/pulsar/pulsar-3.0.4-candidate-1/apache-pulsar-3.0.4-src.tar.gz (with props) dev/pulsar/pulsar-3.0.4-candidate-1/apache-pulsar-3.0.4-src.tar.gz.asc dev/pulsar/pulsar-3.0.4-candidate-1/apache-pulsar-3.0.4-src.tar.gz.sha512 dev/pulsar/pulsar-3.0.4-candidate-1/apache-pulsar-offloaders-3.0.4-bin.tar.gz (with props) dev/pulsar/pulsar-3.0.4-candidate-1/apache-pulsar-offloaders-3.0.4-bin.tar.gz.asc dev/pulsar/pulsar-3.0.4-candidate-1/apache-pulsar-offloaders-3.0.4-bin.tar.gz.sha512 dev/pulsar/pulsar-3.0.4-candidate-1/apache-pulsar-shell-3.0.4-bin.tar.gz (with props) dev/pulsar/pulsar-3.0.4-candidate-1/apache-pulsar-shell-3.0.4-bin.tar.gz.asc dev/pulsar/pulsar-3.0.4-candidate-1/apache-pulsar-shell-3.0.4-bin.tar.gz.sha512 dev/pulsar/pulsar-3.0.4-candidate-1/apache-pulsar-shell-3.0.4-bin.zip (with props) dev/pulsar/pulsar-3.0.4-candidate-1/apache-pulsar-shell-3.0.4-bin.zip.asc dev/pulsar/pulsar-3.0.4-candidate-1/apache-pulsar-shell-3.0.4-bin.zip.sha512 dev/pulsar/pulsar-3.0.4-candidate-1/connectors/ dev/pulsar/pulsar-3.0.4-candidate-1/connectors/LICENSE dev/pulsar/pulsar-3.0.4-candidate-1/connectors/README dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-aerospike-3.0.4.nar (with props) dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-aerospike-3.0.4.nar.asc dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-aerospike-3.0.4.nar.sha512 dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-alluxio-3.0.4.nar (with props) dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-alluxio-3.0.4.nar.asc dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-alluxio-3.0.4.nar.sha512 
dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-batch-data-generator-3.0.4.nar (with props) dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-batch-data-generator-3.0.4.nar.asc dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-batch-data-generator-3.0.4.nar.sha512 dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-canal-3.0.4.nar (with props) dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-canal-3.0.4.nar.asc dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-canal-3.0.4.nar.sha512 dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-cassandra-3.0.4.nar (with props) dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-cassandra-3.0.4.nar.asc dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-cassandra-3.0.4.nar.sha512 dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-data-generator-3.0.4.nar (with props) dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-data-generator-3.0.4.nar.asc dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-data-generator-3.0.4.nar.sha512 dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-debezium-mongodb-3.0.4.nar (with props) dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-debezium-mongodb-3.0.4.nar.asc dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-debezium-mongodb-3.0.4.nar.sha512 dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-debezium-mssql-3.0.4.nar (with props) dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-debezium-mssql-3.0.4.nar.asc dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-debezium-mssql-3.0.4.nar.sha512 dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-debezium-mysql-3.0.4.nar (with props) dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-debezium-mysql-3.0.4.nar.asc dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-debezium-mysql-3.0.4.nar.sha512 dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-debezium-oracle-3.0.4.nar (with props) 
dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-debezium-oracle-3.0.4.nar.asc dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-debezium-oracle-3.0.4.nar.sha512 dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-debezium-postgres-3.0.4.nar (with props) dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-debezium-postgres-3.0.4.nar.asc dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-debezium-postgres-3.0.4.nar.sha512 dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-dynamodb-3.0.4.nar (with props) dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-dynamodb-3.0.4.nar.asc dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-dynamodb-3.0.4.nar.sha512
svn commit: r68189 [2/2] - in /dev/pulsar/pulsar-3.0.4-candidate-1: ./ connectors/
Added: dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-jdbc-sqlite-3.0.4.nar == Binary file - no diff available. Propchange: dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-jdbc-sqlite-3.0.4.nar -- svn:mime-type = application/octet-stream Added: dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-jdbc-sqlite-3.0.4.nar.asc == --- dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-jdbc-sqlite-3.0.4.nar.asc (added) +++ dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-jdbc-sqlite-3.0.4.nar.asc Thu Mar 28 17:04:01 2024 @@ -0,0 +1,16 @@ +-BEGIN PGP SIGNATURE- + +iQIzBAABCgAdFiEExmPS9k2iygnbKNmr0/pn1SLFUlYFAmYFoOgACgkQ0/pn1SLF +UlaohA/9HgONgfp6E7PQqo3tCdjK2WEkiMlDKrdOlxNiCJDyG1SBCBMjlqoAVmyd +zgfQlfRI6l1QvipaONboUd1osWU4zVWCS7gBA+SkoITPS9IBTAHKGi6meRdiGQ6v +2U8yJJ2bdY2UY01Bc+FtHfoIR1NZG3U17GLOMxibVMj0ALd0HoSVV+cCDqu1LmDT +rUvIZ+beFZZO2lq/WuWcp+SZmLfY+oiXpQTmTFUX/FJOWmUXKBli1OTPAea/94Jz +C59DI982jEo+q/z6EC6avJqCjnf4rb4vHOmmtipRre9VJZWMi7aLFMCbS3bE3odX +iqleOUAtcChwgfB13N6CVePFWYdTzQZ+F6fCSfz9GLB7fFWNCG4Y7sLZrCOKSndG +yBpw/ZEe43F6zJEEc+1Vs6CHGV3IVjy0EtxUOcTYv+GQqWQ5LK3wZh8eyeBxvAIJ +v8tvrkDSfHR/D3NFwSwjfBakYPwUv0tXCh1kVcvE/1eNfQeTAHyDvR5zkuavygre +t4EQvWMZNxxdaLim4FVzIMTzvb8ioGwIF4twhoXjaIm1Y6TIJEvoaQIvFStfrqWY +TvntAMP1jBDPGQXMWUjzYSrw4RRu1gpDgGZjEh8fAV3AGzwEbHpDWjAKZCAq/bwJ +IQMRKBzjNaFEvpwgWFPvZ21bNSlab+q25M8xfjsFoq9XCknF/WQ= +=llw4 +-END PGP SIGNATURE- Added: dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-jdbc-sqlite-3.0.4.nar.sha512 == --- dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-jdbc-sqlite-3.0.4.nar.sha512 (added) +++ dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-jdbc-sqlite-3.0.4.nar.sha512 Thu Mar 28 17:04:01 2024 @@ -0,0 +1 @@ +67048ff6031ddd5086116aa4341663270e9e07ffb89baa38dfcf9342681188ee8d94d5f593b3f5b3c33b33a972d254f99efd6539729ef392fe3964d7f26888b6 ./connectors/pulsar-io-jdbc-sqlite-3.0.4.nar Added: dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-kafka-3.0.4.nar == Binary file 
- no diff available. Propchange: dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-kafka-3.0.4.nar -- svn:mime-type = application/octet-stream Added: dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-kafka-3.0.4.nar.asc == --- dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-kafka-3.0.4.nar.asc (added) +++ dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-kafka-3.0.4.nar.asc Thu Mar 28 17:04:01 2024 @@ -0,0 +1,16 @@ +-BEGIN PGP SIGNATURE- + +iQIzBAABCgAdFiEExmPS9k2iygnbKNmr0/pn1SLFUlYFAmYFoOMACgkQ0/pn1SLF +UlawYw//bhR2kbZ6ozMsRYmwA7PWHKANy594GhEMlt7QVJTKGAQ1J7bohsnr2xR1 +2ZCPbuJ6AY4FAVS5fFdlRELX0XkVAKubsRuazSilcGPKTlXBZ+GxoZCCSbDXjAdx ++XhQLW6Pk3tqa81iO5ERJv8WPa/OBdeN76qDTJ0I5P4wtj/xOi6vXZITLFkYO3qO +08b/dR3itBfQ9oozeCAuEzt6rxCu1MZGyAb3/u7m7dk1ErJ7Jf7MIP2l94hNN1q6 +eyft+hjVpyNmoLGtqzzD9ThwnKIg8k3VSh59UWOCLvde/VRGxALd7qH2EzwdLcYo +emA/YLCm1VilpBCdooznvDWue7mJxn/0Y4h5Fxkdv+O16pSB/CnFGrcDCmM8V88V +InsEXLgmtdzCW+ystT5SxVYNJ6jkpBuGtmVy0G5Y0vhrGoykFNfvF/zaz4aEQTYD +cokTxBcPPh8Ec6vU0iHCzcYyX7cEr6YDDDNo86yqs34iPdhOSNiHngZRMxo25iaB +BOAb00wN4t38AowgETd2x4uTf6AHrqCmwLSzQUALx0TJSa1lwrHSAa+hu0YTrnEb +Kreotjvzpv+2Yw7RhpX4joy1gBVu7qJgWMl9FKqmkvvignALSk4fNPb1cMkAueeO +p45CjFd86vLSUA1HuqkiyDAbmVcshIy/2WNm98nPiRd/F9Oq9Ao= +=ysvU +-END PGP SIGNATURE- Added: dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-kafka-3.0.4.nar.sha512 == --- dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-kafka-3.0.4.nar.sha512 (added) +++ dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-kafka-3.0.4.nar.sha512 Thu Mar 28 17:04:01 2024 @@ -0,0 +1 @@ +786fa9c075325397fb6d0888554a00e13c1a2be913fd57b50ddf9d5f7c26dde8016dd57f2ce86ef66a7cd662558f5e35d565538b39b2fd4a9ab893fc3e162b5a ./connectors/pulsar-io-kafka-3.0.4.nar Added: dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-kafka-connect-adaptor-3.0.4.nar == Binary file - no diff available. 
Propchange: dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-kafka-connect-adaptor-3.0.4.nar -- svn:mime-type = application/octet-stream Added: dev/pulsar/pulsar-3.0.4-candidate-1/connectors/pulsar-io-kafka-connect-adaptor-3.0.4.nar.asc
Re: [PR] [improve] PIP-342: OTel client metrics support [pulsar]
merlimat merged PR #22179: URL: https://github.com/apache/pulsar/pull/22179
(pulsar) branch branch-2.10 updated: [fix][broker] Fix issue of field 'topic' is not set when handle GetSchema request (#22377)
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new c44cacde66e [fix][broker] Fix issue of field 'topic' is not set when handle GetSchema request (#22377)
c44cacde66e is described below

commit c44cacde66e22b9e7a8c539267b0aeca6d6d426e
Author: Cong Zhao
AuthorDate: Thu Mar 28 23:14:19 2024 +0800

    [fix][broker] Fix issue of field 'topic' is not set when handle GetSchema request (#22377)

    (cherry picked from commit d8903da3d5ea5bab207d119186f2be6fa1147f60)

    # Conflicts:
    # pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
---
 .../apache/pulsar/broker/service/ServerCnx.java    |  5 ++--
 .../java/org/apache/pulsar/schema/SchemaTest.java  | 35 +-
 2 files changed, 37 insertions(+), 3 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 08d84f3e4fa..aafcb5eeb48 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -2126,9 +2126,10 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
             schemaVersion = schemaService.versionFromBytes(commandGetSchema.getSchemaVersion());
         }
 
+        final String topic = commandGetSchema.getTopic();
         String schemaName;
         try {
-            schemaName = TopicName.get(commandGetSchema.getTopic()).getSchemaName();
+            schemaName = TopicName.get(topic).getSchemaName();
         } catch (Throwable t) {
             commandSender.sendGetSchemaErrorResponse(requestId, ServerError.InvalidTopicName, t.getMessage());
             return;
@@ -2137,7 +2138,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
         schemaService.getSchema(schemaName, schemaVersion).thenAccept(schemaAndMetadata -> {
             if (schemaAndMetadata == null) {
                 commandSender.sendGetSchemaErrorResponse(requestId, ServerError.TopicNotFound,
-                        String.format("Topic not found or no-schema %s", commandGetSchema.getTopic()));
+                        String.format("Topic not found or no-schema %s", topic));
             } else {
                 commandSender.sendGetSchemaResponse(requestId,
                         SchemaInfoUtil.newSchemaInfo(schemaName, schemaAndMetadata.schema), schemaAndMetadata.version);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
index c7cbc9b92c5..e6def654fee 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
@@ -44,6 +44,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import javax.ws.rs.client.InvocationCallback;
 import javax.ws.rs.client.WebTarget;
@@ -73,6 +74,8 @@ import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.api.schema.SchemaDefinition;
+import org.apache.pulsar.client.impl.ConsumerImpl;
+import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
 import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
 import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
 import org.apache.pulsar.client.impl.schema.generic.GenericJsonRecord;
@@ -101,6 +104,7 @@ public class SchemaTest extends MockedPulsarServiceBaseTest {
     @BeforeMethod
     @Override
     public void setup() throws Exception {
+        isTcpLookup = true;
         super.internalSetup();
 
         // Setup namespaces
@@ -109,6 +113,7 @@ public class SchemaTest extends MockedPulsarServiceBaseTest {
                 .allowedClusters(Collections.singleton(CLUSTER_NAME))
                 .build();
         admin.tenants().createTenant(PUBLIC_TENANT, tenantInfo);
+        admin.namespaces().createNamespace(PUBLIC_TENANT + "/my-ns");
     }
 
     @AfterMethod(alwaysRun = true)
@@ -117,6 +122,34 @@ public class SchemaTest extends MockedPulsarServiceBaseTest {
         super.internalCleanup();
     }
 
+    @Test
+    public void testGetSchemaWithPatternTopic() throws Exception {
+        final String topicPrefix = "persistent://public/my-ns/test-getSchema";
+
+        int topicNums = 10;
+        for (int i = 0; i < topicNums; i++) {
+            String topic = topicPrefix + "-" +
(pulsar) branch branch-2.11 updated: [fix][broker] Fix issue of field 'topic' is not set when handle GetSchema request (#22377)
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new ac22f29ccbe [fix][broker] Fix issue of field 'topic' is not set when handle GetSchema request (#22377)
ac22f29ccbe is described below

commit ac22f29ccbeb26c9d481ee578d8bf3b232c33bc1
Author: Cong Zhao
AuthorDate: Thu Mar 28 23:14:19 2024 +0800

    [fix][broker] Fix issue of field 'topic' is not set when handle GetSchema request (#22377)

    (cherry picked from commit d8903da3d5ea5bab207d119186f2be6fa1147f60)

    # Conflicts:
    # pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
---
 .../apache/pulsar/broker/service/ServerCnx.java    |  5 ++--
 .../java/org/apache/pulsar/schema/SchemaTest.java  | 35 +-
 2 files changed, 37 insertions(+), 3 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 1f7662b4cff..d6b59e0b596 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -2210,9 +2210,10 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
             schemaVersion = schemaService.versionFromBytes(commandGetSchema.getSchemaVersion());
         }
 
+        final String topic = commandGetSchema.getTopic();
         String schemaName;
         try {
-            schemaName = TopicName.get(commandGetSchema.getTopic()).getSchemaName();
+            schemaName = TopicName.get(topic).getSchemaName();
         } catch (Throwable t) {
             commandSender.sendGetSchemaErrorResponse(requestId, ServerError.InvalidTopicName, t.getMessage());
             return;
@@ -2221,7 +,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
         schemaService.getSchema(schemaName, schemaVersion).thenAccept(schemaAndMetadata -> {
             if (schemaAndMetadata == null) {
                 commandSender.sendGetSchemaErrorResponse(requestId, ServerError.TopicNotFound,
-                        String.format("Topic not found or no-schema %s", commandGetSchema.getTopic()));
+                        String.format("Topic not found or no-schema %s", topic));
             } else {
                 commandSender.sendGetSchemaResponse(requestId,
                         SchemaInfoUtil.newSchemaInfo(schemaName, schemaAndMetadata.schema), schemaAndMetadata.version);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
index d9254718cb6..648bee38549 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
@@ -46,6 +46,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import lombok.Cleanup;
 import lombok.EqualsAndHashCode;
@@ -69,6 +70,8 @@ import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.api.schema.SchemaDefinition;
+import org.apache.pulsar.client.impl.ConsumerImpl;
+import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
 import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
 import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
 import org.apache.pulsar.client.impl.schema.generic.GenericJsonRecord;
@@ -97,6 +100,7 @@ public class SchemaTest extends MockedPulsarServiceBaseTest {
     @BeforeMethod
     @Override
     public void setup() throws Exception {
+        isTcpLookup = true;
         super.internalSetup();
 
         // Setup namespaces
@@ -105,6 +109,7 @@ public class SchemaTest extends MockedPulsarServiceBaseTest {
                 .allowedClusters(Collections.singleton(CLUSTER_NAME))
                 .build();
         admin.tenants().createTenant(PUBLIC_TENANT, tenantInfo);
+        admin.namespaces().createNamespace(PUBLIC_TENANT + "/my-ns");
     }
 
     @AfterMethod(alwaysRun = true)
@@ -113,6 +118,34 @@ public class SchemaTest extends MockedPulsarServiceBaseTest {
         super.internalCleanup();
     }
 
+    @Test
+    public void testGetSchemaWithPatternTopic() throws Exception {
+        final String topicPrefix = "persistent://public/my-ns/test-getSchema";
+
+        int topicNums = 10;
+        for (int i = 0; i < topicNums; i++) {
+            String topic = topicPrefix + "-" + i;
+
(pulsar) branch branch-3.1 updated: [fix][broker] Fix issue of field 'topic' is not set when handle GetSchema request (#22377)
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git

The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 585fc54f339 [fix][broker] Fix issue of field 'topic' is not set when handle GetSchema request (#22377)
585fc54f339 is described below

commit 585fc54f339b78ce1a5edf19a7093355284da0a4
Author: Cong Zhao
AuthorDate: Thu Mar 28 23:14:19 2024 +0800

    [fix][broker] Fix issue of field 'topic' is not set when handle GetSchema request (#22377)

    (cherry picked from commit d8903da3d5ea5bab207d119186f2be6fa1147f60)
---
 .../apache/pulsar/broker/service/ServerCnx.java    |  5 ++--
 .../java/org/apache/pulsar/schema/SchemaTest.java  | 33 ++
 2 files changed, 36 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 5057b7b045a..29ba7cb866e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -2389,9 +2389,10 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
             schemaVersion = schemaService.versionFromBytes(commandGetSchema.getSchemaVersion());
         }
 
+        final String topic = commandGetSchema.getTopic();
         String schemaName;
         try {
-            schemaName = TopicName.get(commandGetSchema.getTopic()).getSchemaName();
+            schemaName = TopicName.get(topic).getSchemaName();
         } catch (Throwable t) {
             commandSender.sendGetSchemaErrorResponse(requestId, ServerError.InvalidTopicName, t.getMessage());
             return;
@@ -2400,7 +2401,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
         schemaService.getSchema(schemaName, schemaVersion).thenAccept(schemaAndMetadata -> {
             if (schemaAndMetadata == null) {
                 commandSender.sendGetSchemaErrorResponse(requestId, ServerError.TopicNotFound,
-                        String.format("Topic not found or no-schema %s", commandGetSchema.getTopic()));
+                        String.format("Topic not found or no-schema %s", topic));
             } else {
                 commandSender.sendGetSchemaResponse(requestId,
                         SchemaInfoUtil.newSchemaInfo(schemaName, schemaAndMetadata.schema), schemaAndMetadata.version);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
index 7eae6462545..e10b45868bf 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
@@ -46,6 +46,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import lombok.Cleanup;
 import lombok.EqualsAndHashCode;
@@ -69,6 +70,8 @@ import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.api.schema.SchemaDefinition;
+import org.apache.pulsar.client.impl.ConsumerImpl;
+import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
 import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
 import org.apache.pulsar.client.impl.schema.ProtobufSchema;
 import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
@@ -98,6 +101,7 @@ public class SchemaTest extends MockedPulsarServiceBaseTest {
     @BeforeMethod
     @Override
     public void setup() throws Exception {
+        isTcpLookup = true;
         super.internalSetup();
 
         // Setup namespaces
@@ -106,6 +110,7 @@ public class SchemaTest extends MockedPulsarServiceBaseTest {
                 .allowedClusters(Collections.singleton(CLUSTER_NAME))
                 .build();
         admin.tenants().createTenant(PUBLIC_TENANT, tenantInfo);
+        admin.namespaces().createNamespace(PUBLIC_TENANT + "/my-ns");
     }
 
     @AfterMethod(alwaysRun = true)
@@ -130,6 +135,34 @@ public class SchemaTest extends MockedPulsarServiceBaseTest {
         pulsarClient.newProducer(org.apache.pulsar.client.api.Schema.AUTO_PRODUCE_BYTES()).topic(topic).create();
     }
 
+    @Test
+    public void testGetSchemaWithPatternTopic() throws Exception {
+        final String topicPrefix = "persistent://public/my-ns/test-getSchema";
+
+        int topicNums = 10;
+        for (int i = 0; i < topicNums; i++) {
+            String topic = topicPrefix + "-" + i;
+            admin.topics().createNonPartitionedTopic(topic);
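The test in this diff is cut off after it creates ten topics under a common prefix; its name suggests the topics are then consumed through a topic pattern. As a rough illustration of how a regular expression could select those names, here is a minimal sketch using only `java.util.regex`. The class `PatternTopicSketch`, its methods, and the pattern string are assumptions for illustration, since the rest of the test is not shown.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical sketch; not part of the Pulsar test shown in the diff.
final class PatternTopicSketch {

    // Builds the same topic names as the loop in the test: prefix + "-" + i.
    static List<String> topicNames(String prefix, int count) {
        List<String> names = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            names.add(prefix + "-" + i);
        }
        return names;
    }

    // Keeps only the names that the pattern matches in full.
    static List<String> matching(Pattern pattern, List<String> names) {
        List<String> matched = new ArrayList<>();
        for (String name : names) {
            if (pattern.matcher(name).matches()) {
                matched.add(name);
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        String prefix = "persistent://public/my-ns/test-getSchema";
        List<String> names = topicNames(prefix, 10);
        names.add("persistent://public/my-ns/other-topic");
        // Assumed pattern: the literal prefix followed by a dash and digits.
        Pattern pattern = Pattern.compile(Pattern.quote(prefix) + "-\\d+");
        System.out.println(matching(pattern, names).size());
    }
}
```

Quoting the prefix with `Pattern.quote` keeps characters such as `/` and `.` in the topic name from being read as regex syntax.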
Re: [I] Flaky-test: ShadowManagedLedgerImplTest.testShadowWrites [pulsar]
Technoboy- commented on issue #22345: URL: https://github.com/apache/pulsar/issues/22345#issuecomment-2025622773

The only option seems to be increasing the Awaitility timeout.
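The suggestion in this comment amounts to giving an asynchronous condition more time before the assertion gives up. A minimal sketch of the poll-until-timeout idea in plain Java follows; `PollingAwait`, `awaitUntil`, and the durations are hypothetical names and values chosen for illustration, not Awaitility's API and not the code of the flaky test.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

// Hypothetical helper illustrating a poll-until-timeout wait; not Awaitility itself.
final class PollingAwait {

    // Polls the condition every pollInterval until it holds or the timeout elapses.
    // Returns true if the condition became true in time, false otherwise.
    static boolean awaitUntil(BooleanSupplier condition, Duration timeout, Duration pollInterval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() >= deadline) {
                return false;
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] args) {
        AtomicBoolean done = new AtomicBoolean(false);
        // Simulates slow background work that finishes after roughly 300 ms.
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(300);
            } catch (InterruptedException ignored) {
                return;
            }
            done.set(true);
        });
        worker.start();
        // A short timeout is likely to give up early; a longer one tolerates the slow work.
        System.out.println(awaitUntil(done::get, Duration.ofMillis(50), Duration.ofMillis(10)));
        System.out.println(awaitUntil(done::get, Duration.ofSeconds(5), Duration.ofMillis(10)));
    }
}
```

A longer timeout only costs time when the condition never becomes true, which is why raising it is a common first response to a flaky asynchronous assertion.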
Re: [PR] [feat][ci] Add Trivy container scan Github workflow [pulsar]
merlimat commented on code in PR #22063: URL: https://github.com/apache/pulsar/pull/22063#discussion_r1543253362

## .github/workflows/ci-trivy-container-scan.yaml: ##
@@ -0,0 +1,67 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+name: CI - Trivy Container Scan
+on:
+  schedule:
+    - cron: '0 8 * * *' # Every day at 8am UTC
+  workflow_dispatch:
+    inputs:
+      severity:
+        description: "Severities to include (comma-separated or 'ALL' to include all)"
+        required: false
+        default: 'CRITICAL,HIGH'
+
+jobs:
+  container_scan:
+    if: ${{ github.repository == 'apache/pulsar' }}
+    name: Trivy Docker image vulnerability scan
+    runs-on: ubuntu-latest
+    strategy:
+      fail-fast: false
+      matrix:
+        docker-image:
+          - 'apachepulsar/pulsar'
+          - 'apachepulsar/pulsar-all'
+        docker-tag:
+          - '3.2.1'

Review Comment:
Maybe we should have `latest` here. Unfortunately, we don't have a `nightly` tag to test the image from master branch.

## .github/workflows/ci-trivy-container-scan.yaml: ##
@@ -0,0 +1,67 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+name: CI - Trivy Container Scan
+on:
+  schedule:
+    - cron: '0 8 * * *' # Every day at 8am UTC
+  workflow_dispatch:
+    inputs:
+      severity:
+        description: "Severities to include (comma-separated or 'ALL' to include all)"
+        required: false
+        default: 'CRITICAL,HIGH'
+
+jobs:
+  container_scan:
+    if: ${{ github.repository == 'apache/pulsar' }}
+    name: Trivy Docker image vulnerability scan
+    runs-on: ubuntu-latest
+    strategy:
+      fail-fast: false
+      matrix:
+        docker-image:
+          - 'apachepulsar/pulsar'
+          - 'apachepulsar/pulsar-all'

Review Comment:
`pulsar-all` should get dropped in 3.3, so we can already drop it here
Re: [PR] [feat][ci] Add Trivy container scan Github workflow [pulsar]
onobc commented on PR #22063: URL: https://github.com/apache/pulsar/pull/22063#issuecomment-2025594055

The workflow has been simplified - PTAL @merlimat @lhotari
Re: [PR] [feat][broker][branch-2.11] Implementation of PIP-323: Complete Backlog Quota Telemetry [pulsar]
liangyepianzhou merged PR #22374: URL: https://github.com/apache/pulsar/pull/22374
[PR] Replication/topic level [pulsar]
liangyepianzhou opened a new pull request, #22378: URL: https://github.com/apache/pulsar/pull/22378

PIP: https://github.com/apache/pulsar/pull/21648

### Motivation

Because the cluster configuration at the namespace level is unclear, some flexible topic policies cannot work as expected, e.g. geo-replication at the topic level.

### Modifications

Replace the single `replication-clusters` setting in the namespace policy with two settings: `replication-clusters` and `allowed-clusters`.

### Verifying this change

- [ ] Make sure that the change passes the CI checks.

*(Please pick either of the following options)*

This change is a trivial rework / code cleanup without any test coverage.

*(or)*

This change is already covered by existing tests, such as *(please describe tests)*.

*(or)*

This change added tests and can be verified as follows:

*(example:)*
- *Added integration tests for end-to-end deployment with large payloads (10MB)*
- *Extended integration test for recovery after broker failure*

### Does this pull request potentially affect one of the following parts:

*If the box was checked, please highlight the changes*

- [ ] Dependencies (add or upgrade a dependency)
- [ ] The public API
- [ ] The schema
- [ ] The default values of configurations
- [ ] The threading model
- [ ] The binary protocol
- [ ] The REST endpoints
- [ ] The admin CLI options
- [ ] The metrics
- [ ] Anything that affects deployment

### Documentation

- [ ] `doc`
- [ ] `doc-required`
- [x] `doc-not-needed`
- [ ] `doc-complete`

### Matching PR in forked repository

PR in forked repository:
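One possible reading of the modification described in this pull request: `allowed-clusters` bounds where data in the namespace may live, while `replication-clusters` is the namespace default, so a topic-level policy could choose any subset of the allowed clusters. The sketch below illustrates that reading only; the class `NamespaceClusterPolicy`, its methods, the cluster names, and the subset rule are all assumptions, not code from the PR.

```java
import java.util.Set;

// Hypothetical model of a namespace policy with separate allowed and replication clusters.
final class NamespaceClusterPolicy {
    private final Set<String> allowedClusters;
    private final Set<String> replicationClusters;

    NamespaceClusterPolicy(Set<String> allowedClusters, Set<String> replicationClusters) {
        // Assumed rule: the namespace default replication set lies within the allowed set.
        if (!allowedClusters.containsAll(replicationClusters)) {
            throw new IllegalArgumentException("replication-clusters must be a subset of allowed-clusters");
        }
        this.allowedClusters = Set.copyOf(allowedClusters);
        this.replicationClusters = Set.copyOf(replicationClusters);
    }

    // Assumed rule: a topic-level policy may replicate to any non-empty subset of the allowed clusters.
    boolean isValidTopicReplication(Set<String> topicClusters) {
        return !topicClusters.isEmpty() && allowedClusters.containsAll(topicClusters);
    }

    // A topic-level setting wins when present; otherwise the namespace default applies.
    Set<String> effectiveReplication(Set<String> topicClusters) {
        return (topicClusters == null || topicClusters.isEmpty())
                ? replicationClusters
                : Set.copyOf(topicClusters);
    }

    public static void main(String[] args) {
        NamespaceClusterPolicy policy =
                new NamespaceClusterPolicy(Set.of("us-east", "us-west", "eu"), Set.of("us-east"));
        System.out.println(policy.isValidTopicReplication(Set.of("us-east", "eu")));
        System.out.println(policy.isValidTopicReplication(Set.of("asia")));
        System.out.println(policy.effectiveReplication(null));
    }
}
```

Keeping the two sets separate lets a topic replicate more widely than the namespace default without widening the default for every other topic in the namespace.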
(pulsar) annotated tag v3.0.4-candidate-1 deleted (was fac66a82c4f)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a change to annotated tag v3.0.4-candidate-1 in repository https://gitbox.apache.org/repos/asf/pulsar.git *** WARNING: tag v3.0.4-candidate-1 was deleted! *** tag was fac66a82c4f The revisions that were on this annotated tag are still contained in other references; therefore, this change does not discard any commits from the repository.
(pulsar) annotated tag v3.0.4-candidate-1 updated (5a1fa0c5c67 -> 9dc160153eb)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a change to annotated tag v3.0.4-candidate-1 in repository https://gitbox.apache.org/repos/asf/pulsar.git *** WARNING: tag v3.0.4-candidate-1 was modified! *** from 5a1fa0c5c67 (commit) to 9dc160153eb (tag) tagging 5a1fa0c5c6709bdb7b003ce12d00abf52da293e1 (commit) replaces v3.0.3 by Lari Hotari on Thu Mar 28 17:28:22 2024 +0200 - Log - Release 3.0.4-candidate-1 -BEGIN PGP SIGNATURE- iQJHBAABCgAxFiEExmPS9k2iygnbKNmr0/pn1SLFUlYFAmYFjJYTHGxob3RhcmlA YXBhY2hlLm9yZwAKCRDT+mfVIsVSVnf4D/0UHn5R0thMnQEcaRzf35ruuFetf3ZK bfFaJJ8qCOrdWpsZ8y6woLMBrolRXohWNylPWzu+bYfLIUm3bAnCAVlFEhHI8kHG ArKRbb9MhSrHuw1eKi7Qgr8/4jiEeq4lzMMJ4a69PBPcBdGE82LUih2PSRNutZrC cr4kVGlbdEfYcWghnG8j+g5KcYqL4F7U1MmgefE46VoUqGpSceWzRHpC8a/LbLKN BgyKufCnRLSxZ9fYHDUbS/DsI7mLs31uvgOnrj6HtWNoxNilV3RRiW9rTZ5Wux// qr+YKOURn8LRZ3D+/p9Of+fFKriXqjZDUGTREkksOlmHTPrG9Cq+a0/cHsO7Kp5N JvdBD3w8XPjHd3b/38ihvLJrL/PkFDmKn/lmRIv1aqg2sQZeRNL7usPLDCkRiq2f cXpIHQ/ND6OCtBVHoLvlppEgVtWc3VwqeLOKMm3dtQsV20bJEoV+xip1iaah+MDY ToQLOKrYp35NloQEPbEXix+CzSKMl8u4PU/XvMRuqfmAicvSrdV7Iao9ptkq/I/4 o9t2LPAzfNBeEuQxHdLKt6F6dguCES93TPe4JT0WB0RKAFUL/cEsthElShDgLYXi gGL0XzBnyydJHearsO5Z8gHpLeY2SE3/l9IGel+WAdRCAsLbXzy0O2tu6ipp6Rak Qq3K+V/wv4VfNw== =W/au -END PGP SIGNATURE- --- No new revisions were added by this update. Summary of changes:
(pulsar) branch branch-3.2 updated: [fix][broker] Fix issue of field 'topic' is not set when handle GetSchema request (#22377)
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git

The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 4847648e78d [fix][broker] Fix issue of field 'topic' is not set when handle GetSchema request (#22377)
4847648e78d is described below

commit 4847648e78dbf2deb8c7a6cee54e37ad6a6323e7
Author: Cong Zhao
AuthorDate: Thu Mar 28 23:14:19 2024 +0800

    [fix][broker] Fix issue of field 'topic' is not set when handle GetSchema request (#22377)

    (cherry picked from commit d8903da3d5ea5bab207d119186f2be6fa1147f60)
---
 .../apache/pulsar/broker/service/ServerCnx.java    |  5 ++--
 .../java/org/apache/pulsar/schema/SchemaTest.java  | 33 ++
 2 files changed, 36 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 78c1c39cfe5..569c5a2fb1e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -2496,9 +2496,10 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
             schemaVersion = schemaService.versionFromBytes(commandGetSchema.getSchemaVersion());
         }
 
+        final String topic = commandGetSchema.getTopic();
         String schemaName;
         try {
-            schemaName = TopicName.get(commandGetSchema.getTopic()).getSchemaName();
+            schemaName = TopicName.get(topic).getSchemaName();
         } catch (Throwable t) {
             commandSender.sendGetSchemaErrorResponse(requestId, ServerError.InvalidTopicName, t.getMessage());
             return;
@@ -2507,7 +2508,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
         schemaService.getSchema(schemaName, schemaVersion).thenAccept(schemaAndMetadata -> {
             if (schemaAndMetadata == null) {
                 commandSender.sendGetSchemaErrorResponse(requestId, ServerError.TopicNotFound,
-                        String.format("Topic not found or no-schema %s", commandGetSchema.getTopic()));
+                        String.format("Topic not found or no-schema %s", topic));
             } else {
                 commandSender.sendGetSchemaResponse(requestId,
                         SchemaInfoUtil.newSchemaInfo(schemaName, schemaAndMetadata.schema), schemaAndMetadata.version);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
index d4ef041f6de..aa47c378fc3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
@@ -46,6 +46,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import lombok.Cleanup;
 import lombok.EqualsAndHashCode;
@@ -69,6 +70,8 @@ import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.api.schema.SchemaDefinition;
+import org.apache.pulsar.client.impl.ConsumerImpl;
+import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
 import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
 import org.apache.pulsar.client.impl.schema.ProtobufSchema;
 import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
@@ -98,6 +101,7 @@ public class SchemaTest extends MockedPulsarServiceBaseTest {
     @BeforeMethod
     @Override
     public void setup() throws Exception {
+        isTcpLookup = true;
         super.internalSetup();
 
         // Setup namespaces
@@ -106,6 +110,7 @@ public class SchemaTest extends MockedPulsarServiceBaseTest {
                 .allowedClusters(Collections.singleton(CLUSTER_NAME))
                 .build();
         admin.tenants().createTenant(PUBLIC_TENANT, tenantInfo);
+        admin.namespaces().createNamespace(PUBLIC_TENANT + "/my-ns");
     }
 
     @AfterMethod(alwaysRun = true)
@@ -130,6 +135,34 @@ public class SchemaTest extends MockedPulsarServiceBaseTest {
         pulsarClient.newProducer(org.apache.pulsar.client.api.Schema.AUTO_PRODUCE_BYTES()).topic(topic).create();
     }
 
+    @Test
+    public void testGetSchemaWithPatternTopic() throws Exception {
+        final String topicPrefix = "persistent://public/my-ns/test-getSchema";
+
+        int topicNums = 10;
+        for (int i = 0; i < topicNums; i++) {
+            String topic = topicPrefix + "-" + i;
+            admin.topics().createNonPartitionedTopic(topic);
(pulsar) branch branch-3.0 updated: [fix][broker] Fix issue of field 'topic' is not set when handle GetSchema request (#22377)
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 5a1fa0c5c67 [fix][broker] Fix issue of field 'topic' is not set when handle GetSchema request (#22377)
5a1fa0c5c67 is described below

commit 5a1fa0c5c6709bdb7b003ce12d00abf52da293e1
Author: Cong Zhao
AuthorDate: Thu Mar 28 23:14:19 2024 +0800

    [fix][broker] Fix issue of field 'topic' is not set when handle GetSchema request (#22377)

    (cherry picked from commit d8903da3d5ea5bab207d119186f2be6fa1147f60)
---
 .../apache/pulsar/broker/service/ServerCnx.java    |  5 ++--
 .../java/org/apache/pulsar/schema/SchemaTest.java  | 33 ++
 2 files changed, 36 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 30432aaef73..2b2cbc5fac2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -2389,9 +2389,10 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
             schemaVersion = schemaService.versionFromBytes(commandGetSchema.getSchemaVersion());
         }
 
+        final String topic = commandGetSchema.getTopic();
         String schemaName;
         try {
-            schemaName = TopicName.get(commandGetSchema.getTopic()).getSchemaName();
+            schemaName = TopicName.get(topic).getSchemaName();
         } catch (Throwable t) {
             commandSender.sendGetSchemaErrorResponse(requestId, ServerError.InvalidTopicName, t.getMessage());
             return;
@@ -2400,7 +2401,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
         schemaService.getSchema(schemaName, schemaVersion).thenAccept(schemaAndMetadata -> {
             if (schemaAndMetadata == null) {
                 commandSender.sendGetSchemaErrorResponse(requestId, ServerError.TopicNotFound,
-                        String.format("Topic not found or no-schema %s", commandGetSchema.getTopic()));
+                        String.format("Topic not found or no-schema %s", topic));
             } else {
                 commandSender.sendGetSchemaResponse(requestId,
                         SchemaInfoUtil.newSchemaInfo(schemaName, schemaAndMetadata.schema), schemaAndMetadata.version);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
index 7eae6462545..e10b45868bf 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
@@ -46,6 +46,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import lombok.Cleanup;
 import lombok.EqualsAndHashCode;
@@ -69,6 +70,8 @@ import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.api.schema.SchemaDefinition;
+import org.apache.pulsar.client.impl.ConsumerImpl;
+import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
 import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
 import org.apache.pulsar.client.impl.schema.ProtobufSchema;
 import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
@@ -98,6 +101,7 @@ public class SchemaTest extends MockedPulsarServiceBaseTest {
     @BeforeMethod
     @Override
     public void setup() throws Exception {
+        isTcpLookup = true;
         super.internalSetup();
 
         // Setup namespaces
@@ -106,6 +110,7 @@ public class SchemaTest extends MockedPulsarServiceBaseTest {
                 .allowedClusters(Collections.singleton(CLUSTER_NAME))
                 .build();
         admin.tenants().createTenant(PUBLIC_TENANT, tenantInfo);
+        admin.namespaces().createNamespace(PUBLIC_TENANT + "/my-ns");
     }
 
     @AfterMethod(alwaysRun = true)
@@ -130,6 +135,34 @@ public class SchemaTest extends MockedPulsarServiceBaseTest {
         pulsarClient.newProducer(org.apache.pulsar.client.api.Schema.AUTO_PRODUCE_BYTES()).topic(topic).create();
     }
 
+    @Test
+    public void testGetSchemaWithPatternTopic() throws Exception {
+        final String topicPrefix = "persistent://public/my-ns/test-getSchema";
+
+        int topicNums = 10;
+        for (int i = 0; i < topicNums; i++) {
+            String topic = topicPrefix + "-" + i;
+            admin.topics().createNonPartitionedTopic(topic);
Re: [PR] [fix][broker] Fix issue of field 'topic' is not set when handle GetSchema request [pulsar]
lhotari merged PR #22377: URL: https://github.com/apache/pulsar/pull/22377
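The change merged in this PR reads `commandGetSchema.getTopic()` into a local `final String topic` before the asynchronous `getSchema` callback runs. The following is a minimal, self-contained sketch of why that matters. It is not broker code: `ReusableCommand`, `unsafeMessage`, and `safeMessage` are hypothetical names, and the sketch assumes only that the command object is reused and cleared between requests.

```java
import java.util.concurrent.CompletableFuture;

public class CapturedTopicExample {
    // Hypothetical stand-in for a command object that is reused across requests.
    static final class ReusableCommand {
        private String topic;

        ReusableCommand setTopic(String topic) {
            this.topic = topic;
            return this;
        }

        // Simulates the next request resetting the shared command.
        void clear() {
            topic = null;
        }

        String getTopic() {
            if (topic == null) {
                throw new IllegalStateException("Field 'topic' is not set");
            }
            return topic;
        }
    }

    // Unsafe: the callback reads the shared command when it runs, not when it is created.
    static CompletableFuture<String> unsafeMessage(ReusableCommand cmd, CompletableFuture<Void> lookup) {
        return lookup.thenApply(ignored ->
                String.format("Topic not found or no-schema %s", cmd.getTopic()));
    }

    // Safe: the topic is copied into a local before the callback is registered.
    static CompletableFuture<String> safeMessage(ReusableCommand cmd, CompletableFuture<Void> lookup) {
        final String topic = cmd.getTopic();
        return lookup.thenApply(ignored ->
                String.format("Topic not found or no-schema %s", topic));
    }

    public static void main(String[] args) {
        ReusableCommand cmd = new ReusableCommand().setTopic("persistent://public/my-ns/test-getSchema-0");
        CompletableFuture<Void> lookupA = new CompletableFuture<>();
        CompletableFuture<Void> lookupB = new CompletableFuture<>();

        CompletableFuture<String> unsafe = unsafeMessage(cmd, lookupA);
        CompletableFuture<String> safe = safeMessage(cmd, lookupB);

        cmd.clear();            // the next request reuses the command
        lookupA.complete(null); // the lookups finish only afterwards
        lookupB.complete(null);

        System.out.println("safe:   " + safe.join());
        System.out.println("unsafe failed: " + unsafe.isCompletedExceptionally());
    }
}
```

In the sketch, the unsafe variant fails with the same `IllegalStateException` message that appears in the reported error, while the safe variant still formats the original topic name.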
(pulsar) branch master updated: [fix][broker] Fix issue of field 'topic' is not set when handle GetSchema request (#22377)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new d8903da3d5e [fix][broker] Fix issue of field 'topic' is not set when handle GetSchema request (#22377) d8903da3d5e is described below commit d8903da3d5ea5bab207d119186f2be6fa1147f60 Author: Cong Zhao AuthorDate: Thu Mar 28 23:14:19 2024 +0800 [fix][broker] Fix issue of field 'topic' is not set when handle GetSchema request (#22377) --- .../apache/pulsar/broker/service/ServerCnx.java| 5 ++-- .../java/org/apache/pulsar/schema/SchemaTest.java | 33 ++ 2 files changed, 36 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 4f82f416ed2..716f3a1a04c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -2504,9 +2504,10 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { schemaVersion = schemaService.versionFromBytes(commandGetSchema.getSchemaVersion()); } +final String topic = commandGetSchema.getTopic(); String schemaName; try { -schemaName = TopicName.get(commandGetSchema.getTopic()).getSchemaName(); +schemaName = TopicName.get(topic).getSchemaName(); } catch (Throwable t) { commandSender.sendGetSchemaErrorResponse(requestId, ServerError.InvalidTopicName, t.getMessage()); return; @@ -2515,7 +2516,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { schemaService.getSchema(schemaName, schemaVersion).thenAccept(schemaAndMetadata -> { if (schemaAndMetadata == null) { commandSender.sendGetSchemaErrorResponse(requestId, ServerError.TopicNotFound, -String.format("Topic not found or no-schema %s", commandGetSchema.getTopic())); 
+String.format("Topic not found or no-schema %s", topic)); } else { commandSender.sendGetSchemaResponse(requestId, SchemaInfoUtil.newSchemaInfo(schemaName, schemaAndMetadata.schema), schemaAndMetadata.version); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java index d4ef041f6de..aa47c378fc3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java @@ -46,6 +46,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.regex.Pattern; import java.util.stream.Collectors; import lombok.Cleanup; import lombok.EqualsAndHashCode; @@ -69,6 +70,8 @@ import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.api.TypedMessageBuilder; import org.apache.pulsar.client.api.schema.GenericRecord; import org.apache.pulsar.client.api.schema.SchemaDefinition; +import org.apache.pulsar.client.impl.ConsumerImpl; +import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl; import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl; import org.apache.pulsar.client.impl.schema.ProtobufSchema; import org.apache.pulsar.client.impl.schema.SchemaInfoImpl; @@ -98,6 +101,7 @@ public class SchemaTest extends MockedPulsarServiceBaseTest { @BeforeMethod @Override public void setup() throws Exception { +isTcpLookup = true; super.internalSetup(); // Setup namespaces @@ -106,6 +110,7 @@ public class SchemaTest extends MockedPulsarServiceBaseTest { .allowedClusters(Collections.singleton(CLUSTER_NAME)) .build(); admin.tenants().createTenant(PUBLIC_TENANT, tenantInfo); +admin.namespaces().createNamespace(PUBLIC_TENANT + "/my-ns"); } @AfterMethod(alwaysRun = true) @@ -130,6 +135,34 @@ public class SchemaTest extends 
MockedPulsarServiceBaseTest { pulsarClient.newProducer(org.apache.pulsar.client.api.Schema.AUTO_PRODUCE_BYTES()).topic(topic).create(); } +@Test +public void testGetSchemaWithPatternTopic() throws Exception { +final String topicPrefix = "persistent://public/my-ns/test-getSchema"; + +int topicNums = 10; +for (int i = 0; i < topicNums; i++) { +String topic = topicPrefix + "-" + i; +admin.topics().createNonPartitionedTopic(topic); +} + +Pattern pattern = Pattern.compile(topicPrefix + "-.*"); +
Re: [PR] [improve][misc] Upgrade log4j2 to 2.23.1 [pulsar]
nodece commented on PR #22327: URL: https://github.com/apache/pulsar/pull/22327#issuecomment-2025412874

I tried the following config to generate the Log4j2 plugin config, but it does not work.

```xml
org.apache.maven.plugins maven-shade-plugin ${maven-shade-plugin} org.apache.logging.log4j log4j-transform-maven-shade-plugin-extensions 0.1.0 shade-jar-with-dependencies package shade true
```

Or

```
org.apache.maven.plugins maven-compiler-plugin UTF-8 true true false org.projectlombok lombok ${lombok.version} org.apache.logging.log4j log4j-core ${log4j2.version} -parameters -Xlint:all -Xlint:-options -Xlint:-serial -Xlint:-classfile -Xlint:-processing -Xpkginfo:always
```

If you have a good idea, please let me know.
Re: [PR] [fix][io] use create for bulk op if target cluster is ElasticSearch [pulsar]
shibd commented on code in PR #21377: URL: https://github.com/apache/pulsar/pull/21377#discussion_r1543059139 ## pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfig.java: ## @@ -340,6 +348,11 @@ public enum IdHashingAlgorithm { SHA512 } +public enum IndexType { +INDEX, +DATA_STREAM Review Comment: Where is it used?
Re: [PR] [improve][broker] Improve Gzip compression, allow excluding specific paths or disabling it [pulsar]
asafm commented on code in PR #22370: URL: https://github.com/apache/pulsar/pull/22370#discussion_r1542921618

## pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java: ##

@@ -294,8 +291,10 @@ public void start() throws PulsarServerException {
 ContextHandlerCollection contexts = new ContextHandlerCollection();
 contexts.setHandlers(handlers.toArray(new Handler[handlers.size()]));
+Handler handlerForContexts = GzipHandlerUtil.wrapWithGzipHandler(contexts,

Review Comment: Honestly, I find the entire handlers registration in `start()` super confusing. Maybe we can just do it in a simpler manner first like:

```
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.DefaultHandler;
import org.eclipse.jetty.server.handler.HandlerCollection;
import org.eclipse.jetty.server.handler.RequestLogHandler;
import org.eclipse.jetty.server.handler.StatisticsHandler;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;

public class JettyServerConfiguration {
    private static final Logger LOG = Log.getLogger(JettyServerConfiguration.class);

    public static void main(String[] args) {
        Server server = new Server(8080); // Example port

        // Request Logging
        RequestLogHandler requestLogHandler = new RequestLogHandler();
        requestLogHandler.setRequestLog(JettyRequestLogFactory.createRequestLogger());

        // Metrics
        StatisticsHandler stats = new StatisticsHandler();

        // Handler Ordering
        HandlerCollection handlers = new HandlerCollection();
        handlers.setHandlers(new Handler[] { requestLogHandler, stats, new DefaultHandler() });

        // Metrics Registration (with handling)
        try {
            JettyStatisticsCollector jettyStatisticsCollector = new JettyStatisticsCollector(stats);
            jettyStatisticsCollector.register();
        } catch (IllegalArgumentException e) {
            LOG.warn("JettyStatisticsCollector likely already registered", e);
        }

        server.setHandler(handlers);
        try {
            server.start();
            server.join();
        } catch (Exception e) {
            LOG.error("Failed to start server", e);
        }
    }
}
```

then maybe we have the handler array be: stats, gzip, log, default

## pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/PulsarAdminBuilder.java: ##

@@ -327,4 +327,11 @@ PulsarAdminBuilder authentication(String authPluginClassName, Maphttp://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.web;
+
+import java.util.List;
+import org.eclipse.jetty.server.Handler;
+import org.eclipse.jetty.server.handler.gzip.GzipHandler;
+
+public class GzipHandlerUtil {
+public static Handler wrapWithGzipHandler(Handler innerHandler, List gzipCompressionExcludedPaths) {
+Handler wrappedHandler;
+if (isGzipCompressionCompletelyDisabled(gzipCompressionExcludedPaths)) {

Review Comment: Just thinking out loud - can't we just define gzip on the handlers we see fit? We know Pulsar best no? Too much configuration option can be confusing. They can control the gzip by using headers, no?
## pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java: ## @@ -54,13 +55,23 @@ private PulsarAdminBuilderImpl(ClientConfigurationData conf) { @Override public PulsarAdminBuilder clone() { -return new PulsarAdminBuilderImpl(conf.clone()); +PulsarAdminBuilderImpl pulsarAdminBuilder = new PulsarAdminBuilderImpl(conf.clone()); +pulsarAdminBuilder.clientBuilderClassLoader = clientBuilderClassLoader; +pulsarAdminBuilder.gzipCompressionEnabled = gzipCompressionEnabled; +return pulsarAdminBuilder; } @Override public PulsarAdminBuilder loadConf(Map config) { conf = ConfigurationDataUtils.loadData(config, conf, ClientConfigurationData.class); setAuthenticationFromPropsIfAvailable(conf); +if (config.containsKey("gzipCompressionEnabled")) { +if (config.get("gzipCompressionEnabled") instanceof Boolean) { Review Comment: I would personally extract it once `config.get("gzipCompressionEnabled")` -- This is an automated message from the Apache Git Service. To respond to the message, please log on
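The review suggestion above (look up `config.get("gzipCompressionEnabled")` once instead of calling `containsKey` and then `get` twice) could look roughly like the sketch below. It is only an illustration: `readGzipFlag` is a hypothetical helper, and because the quoted diff is cut off after the `instanceof Boolean` check, the fallback of parsing any other non-null value with `Boolean.parseBoolean` is an assumption, as is treating a `null` value like a missing key.

```java
import java.util.HashMap;
import java.util.Map;

public class GzipFlagExample {
    // Hypothetical helper: reads the flag with a single map lookup.
    // Assumption: a null value is treated the same as a missing key.
    static boolean readGzipFlag(Map<String, Object> config, boolean currentValue) {
        Object value = config.get("gzipCompressionEnabled");
        if (value == null) {
            return currentValue; // key absent: keep the existing setting
        }
        if (value instanceof Boolean) {
            return (Boolean) value;
        }
        // Assumption: other types (for example a String from a properties file) are parsed.
        return Boolean.parseBoolean(value.toString());
    }

    public static void main(String[] args) {
        Map<String, Object> config = new HashMap<>();
        System.out.println(readGzipFlag(config, true));   // key absent
        config.put("gzipCompressionEnabled", Boolean.FALSE);
        System.out.println(readGzipFlag(config, true));   // Boolean value
        config.put("gzipCompressionEnabled", "true");
        System.out.println(readGzipFlag(config, false));  // String value
    }
}
```

Extracting the value once keeps the type checks on a single local variable and avoids repeating the key literal.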
(pulsar-site) branch main updated: Update release process with use of variables to ease copy-pasting
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/pulsar-site.git The following commit(s) were added to refs/heads/main by this push: new a1f23abf632b Update release process with use of variables to ease copy-pasting a1f23abf632b is described below commit a1f23abf632b911a5c549d7185f6dcc1a1e92dc8 Author: Lari Hotari AuthorDate: Thu Mar 28 14:48:29 2024 +0200 Update release process with use of variables to ease copy-pasting --- contribute/create-gpg-keys.md | 10 contribute/release-process.md | 112 ++ 2 files changed, 80 insertions(+), 42 deletions(-) diff --git a/contribute/create-gpg-keys.md b/contribute/create-gpg-keys.md index 03e514e6942a..377139afec80 100644 --- a/contribute/create-gpg-keys.md +++ b/contribute/create-gpg-keys.md @@ -118,6 +118,16 @@ gpg --send-key --keyserver=keys.openpgp.org $KEY_ID gpg --send-key --keyserver=keyserver.ubuntu.com $KEY_ID ``` +## Make your the Apache key the default key for GPG + +This is required for signing the release artifacts + +```shell +APACHEID=your_asf_id +KEY_ID=$(gpg --list-keys --with-colons $apach...@apache.org | egrep "^pub" | awk -F: '{print $5}') +echo "default-key $KEY_ID" >> ~/.gnupg/gpg.conf +``` + ## Appending the key to KEYS files The GPG key needs to be appended to `KEYS` file for the release candidates. diff --git a/contribute/release-process.md b/contribute/release-process.md index 4780e77a664d..cff4d7704c6f 100644 --- a/contribute/release-process.md +++ b/contribute/release-process.md @@ -40,6 +40,29 @@ Before you start the next release steps, make sure you have installed these soft Also, you need to **clean up the bookkeeper's local compiled** to make sure the bookkeeper dependency is fetched from the Maven repository, details to see [this mailing list thread](https://lists.apache.org/thread/gsbh95b2d9xtcg5fmtxpm9k9q6w68gd2). 
+ +## Set environment variables to be used across the commands + +Set version +```shell +export VERSION_RC=3.0.4-candidate-1 +export VERSION_WITHOUT_RC=${VERSION_RC%-candidate-*} +export VERSION_BRANCH=branch-3.0 +export UPSTREAM_REMOTE=origin +``` + +Set your ASF user id +```shell +export APACHE_USER= +``` + +In addition, you will need to set `PULSAR_PATH` to point to the cleanly checked out working directory for the release branch. + +If you run into problems with GPG signing set this +``` +export GPG_TTY=$(tty) +``` + ## Create a release candidate ### Create the release branch @@ -58,14 +81,21 @@ It is recommended to create a fresh clone of the repository to avoid any local f ```shell git clone g...@github.com:apache/pulsar.git cd pulsar -PULSAR_PATH=$(pwd) -git checkout -b branch-2.X origin/master +export PULSAR_PATH=$(pwd) +git checkout -b $VERSION_BRANCH origin/master ``` Alternatively, you can use a git workspace to create a new, clean directory on your machine without needing to re-download the project. ```shell -git worktree add ../pulsar.branch-2.X branch-2.X +git worktree add ../pulsar-release-$VERSION_BRANCH $VERSION_BRANCH +cd pulsar-release-$VERSION_BRANCH +export PULSAR_PATH=$(pwd) +``` + +if you get an error that the branch is already checked out, go to that directory detach it from the branch. After this the above command should succeed +```shell +git checkout --detach HEAD ``` If you created a new branch, update the [CI - OWASP Dependency Check](https://github.com/apache/pulsar/blob/master/.github/workflows/ci-owasp-dependency-check.yaml) workflow so that it will run on the new branch. @@ -126,6 +156,8 @@ It will speed up cherry-picking since you commit ids are there and there's also A cherry-pick should be done in this order with `git cherry-pick -x COMMIT_ID`. It's possible that some dependent commits are necessary to be cherry-picked when you encounter a lot of merge conflicts in a case where they aren't expected. 
+ + ### Update project version and tag During the release process, you are going to initially create "candidate" tags, that after verification and approval will get promoted to the "real" final tag. @@ -135,26 +167,22 @@ In this process, the maven version of the project will always be the final one. ```shell # Bump to the release version cd $PULSAR_PATH -./src/set-project-version.sh 2.X.0 +./src/set-project-version.sh $VERSION_WITHOUT_RC # Some version may not update the right parent version of `protobuf-shaded/pom.xml`, please double check it. # Commit -git commit -m 'Release 2.X.0' -a +git commit -m "Release $VERSION_WITHOUT_RC" -a # Create a "candidate" tag -# If you don't sign your commits already, use the following -export GPG_TTY=$(tty) -git tag -u $u...@apache.org v2.X.0-candidate-1 -m 'Release v2.X.0-candidate-1' -# If you already sign your commits using your apache.org email, use the following -git tag -s v2.X.0-candidate-1 -m 'Release
svn commit: r68185 - /dev/pulsar/pulsar-3.0.4-candidate-1/
Author: lhotari Date: Thu Mar 28 13:06:31 2024 New Revision: 68185 Log: Add directory for pulsar 3.0.4-candidate-1 release Added: dev/pulsar/pulsar-3.0.4-candidate-1/
(pulsar) annotated tag v3.0.4-candidate-1 updated (2e13fba3ee6 -> fac66a82c4f)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a change to annotated tag v3.0.4-candidate-1 in repository https://gitbox.apache.org/repos/asf/pulsar.git *** WARNING: tag v3.0.4-candidate-1 was modified! *** from 2e13fba3ee6 (commit) to fac66a82c4f (tag) tagging 2e13fba3ee6b4eba5de4d36b0f00200184bd1145 (commit) replaces v3.0.3 by Lari Hotari on Thu Mar 28 14:56:47 2024 +0200 - Log - Release 3.0.4-candidate-1 -BEGIN PGP SIGNATURE- iQJHBAABCgAxFiEExmPS9k2iygnbKNmr0/pn1SLFUlYFAmYFaQ8THGxob3RhcmlA YXBhY2hlLm9yZwAKCRDT+mfVIsVSVgsxD/0Y9YorBhJ2DGhZlx9+ywRA/nBkfI5b IXg9PqaqQbGfCuvjl1/JBUuKahUH29R9y06ZPImNMLDOZRYprYqSCnelGUyX+MOu B+sDdsKUO1fY4Fz+DBzJfEhskk3YhStx4zHwTcm6epz7RlI5m1iNGv3TsJ8RbQus NgmtlQVjH1xicJLLl9c7ukUx8NiZXysAMggAELvsGunNMvBXtqyAdtr7BqzGB9tT p2/cVhAA1P4aizmWJ7FKrTa1H3mtEsYhtcovqTngXAjPbexjp2preogsQpRAN0Xl 0/9Yrr/4xWMcfSKNCACG56XQB5ZBvAXw86Nf5DavdXrI678cBGiX+lU7rlFgMLQk hlyvJR4MB/+yzYF/EF1l+jN0trZTZQS4pbld9+rKuiD8SDZF3kjA94f6qgAxtFnO XY+wjgYxUPYkZHBaAKpKvS9g5uFWltkitleVdI7+hcZT6CCWew4JyZnCMANaR4at vckzG7KVJOvm617Vd+3YjHftN8L8fNP+xkBMdb0wC/YXqmUcGCCROJRRPUhK0zhD /MmmMrVdHuwDkOis0bhe+p8rK3SHX8jsCLmkv0u3OcLxfp9xbS62hXNlYUjblza2 0oN3AHAlMFHQotgAZO40q+An1pP9X5fCpKTpeIMPxm5YbpPWqKZDzC1u/6pdHskC XGz86Ucjq2NfmA== =D0Ou -END PGP SIGNATURE- --- No new revisions were added by this update. Summary of changes:
(pulsar) branch branch-3.0 updated (8f174463550 -> 2e13fba3ee6)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a change to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git from 8f174463550 [improve][test][branch-3.0] Improve ManagedLedgerTest.testGetNumberOfEntriesInStorage add 2e13fba3ee6 Release 3.0.4 No new revisions were added by this update. Summary of changes: bouncy-castle/bc/pom.xml | 2 +- bouncy-castle/bcfips-include-test/pom.xml | 2 +- bouncy-castle/bcfips/pom.xml | 2 +- bouncy-castle/pom.xml | 2 +- buildtools/pom.xml| 4 ++-- distribution/io/pom.xml | 2 +- distribution/offloaders/pom.xml | 2 +- distribution/pom.xml | 2 +- distribution/server/pom.xml | 2 +- distribution/shell/pom.xml| 2 +- docker/pom.xml| 2 +- docker/pulsar-all/pom.xml | 2 +- docker/pulsar/pom.xml | 2 +- jclouds-shaded/pom.xml| 2 +- managed-ledger/pom.xml| 2 +- pom.xml | 4 ++-- pulsar-broker-auth-athenz/pom.xml | 2 +- pulsar-broker-auth-oidc/pom.xml | 2 +- pulsar-broker-auth-sasl/pom.xml | 2 +- pulsar-broker-common/pom.xml | 2 +- pulsar-broker/pom.xml | 2 +- pulsar-client-1x-base/pom.xml | 2 +- pulsar-client-1x-base/pulsar-client-1x/pom.xml| 2 +- pulsar-client-1x-base/pulsar-client-2x-shaded/pom.xml | 2 +- pulsar-client-admin-api/pom.xml | 2 +- pulsar-client-admin-shaded/pom.xml| 2 +- pulsar-client-admin/pom.xml | 2 +- pulsar-client-all/pom.xml | 2 +- pulsar-client-api/pom.xml | 2 +- pulsar-client-auth-athenz/pom.xml | 2 +- pulsar-client-auth-sasl/pom.xml | 2 +- pulsar-client-messagecrypto-bc/pom.xml| 2 +- pulsar-client-shaded/pom.xml | 2 +- pulsar-client-tools-api/pom.xml | 2 +- pulsar-client-tools-customcommand-example/pom.xml | 2 +- pulsar-client-tools-test/pom.xml | 2 +- pulsar-client-tools/pom.xml | 2 +- pulsar-client/pom.xml | 2 +- pulsar-common/pom.xml | 2 +- pulsar-config-validation/pom.xml | 2 +- pulsar-functions/api-java/pom.xml | 2 +- pulsar-functions/instance/pom.xml | 2 +- pulsar-functions/java-examples-builtin/pom.xml| 2 +- pulsar-functions/java-examples/pom.xml| 2 +- 
pulsar-functions/localrun-shaded/pom.xml | 2 +- pulsar-functions/localrun/pom.xml | 2 +- pulsar-functions/pom.xml | 2 +- pulsar-functions/proto/pom.xml| 2 +- pulsar-functions/runtime-all/pom.xml | 2 +- pulsar-functions/runtime/pom.xml | 2 +- pulsar-functions/secrets/pom.xml | 2 +- pulsar-functions/utils/pom.xml| 2 +- pulsar-functions/worker/pom.xml | 2 +- pulsar-io/aerospike/pom.xml | 2 +- pulsar-io/alluxio/pom.xml | 2 +- pulsar-io/aws/pom.xml | 2 +- pulsar-io/batch-data-generator/pom.xml| 2 +- pulsar-io/batch-discovery-triggerers/pom.xml | 2 +- pulsar-io/canal/pom.xml | 2 +- pulsar-io/cassandra/pom.xml | 2 +- pulsar-io/common/pom.xml | 2 +- pulsar-io/core/pom.xml| 2 +- pulsar-io/data-generator/pom.xml | 2 +- pulsar-io/debezium/core/pom.xml | 2 +- pulsar-io/debezium/mongodb/pom.xml| 2 +- pulsar-io/debezium/mssql/pom.xml | 2 +- pulsar-io/debezium/mysql/pom.xml | 2 +- pulsar-io/debezium/oracle/pom.xml | 2 +- pulsar-io/debezium/pom.xml| 2 +- pulsar-io/debezium/postgres/pom.xml | 2 +- pulsar-io/docs/pom.xml| 2 +- pulsar-io/dynamodb/pom.xml| 2 +- pulsar-io/elastic-search/pom.xml | 2 +- pulsar-io/file/pom.xml| 2 +-
Re: [PR] [fix][broker] Fix issue of field 'topic' is not set when handle GetSchema request [pulsar]
Technoboy- closed pull request #22377: [fix][broker] Fix issue of field 'topic' is not set when handle GetSchema request URL: https://github.com/apache/pulsar/pull/22377
Re: [I] [Bug] Multi-role authorization does not work properly on granting topic level permissions based on ordering of roles in token [pulsar]
Technoboy- commented on issue #22343: URL: https://github.com/apache/pulsar/issues/22343#issuecomment-2024950672

@dhsy6z Could you help confirm whether the test below matches your reproduction steps? I can't reproduce the issue. You can put this method in org.apache.pulsar.client.api.MultiRolesTokenAuthorizationProviderTest and run it; it passes (green bar).

```
@Test
public void testMultiRole() throws Exception {
    String tenant = "tenant1";
    @Cleanup
    PulsarAdmin admin = newPulsarAdmin(superUserToken);
    admin.tenants().createTenant(tenant, TenantInfo.builder()
            .adminRoles(Sets.newHashSet("Group_Test-user"))
            .allowedClusters(Sets.newHashSet(configClusterName)).build());
    String namespace = tenant + "/namespace1";
    admin.namespaces().createNamespace(namespace);
    String topic = namespace + "/" + "test-topic";
    admin.topics().createNonPartitionedTopicAsync(topic);
    //
    Map<String, Object> claims = new HashMap<>();
    Set<String> roles = new HashSet<>();
    roles.add("Group_Test-admin");
    roles.add("Group_Test-user");
    claims.put("roles", roles);
    final String token = Jwts.builder()
            .setClaims(claims)
            .signWith(secretKey)
            .compact();
    @Cleanup
    PulsarAdmin adminTest = newPulsarAdmin(token);
    adminTest.namespaces().getTopics(namespace);
    admin.topics().grantPermission(topic, "Group_Test-user",
            Sets.newHashSet(AuthAction.consume, AuthAction.produce));
}
```
[PR] [fix][bug] Fix issue of Field 'topic' is not set when handle GetSchema request [pulsar]
coderzc opened a new pull request, #22377: URL: https://github.com/apache/pulsar/pull/22377

### Motivation

When a consumer uses AUTO_CONSUME to subscribe to a pattern topic, it gets the `Field 'topic' is not set` error. The root cause is that we incorrectly used the `commandGetSchema` variable. It is a variable shared within the thread, but we pass it on to another thread, which is unsafe because the next request resets the command data. This case only occurs when the topic or the schema does not exist at the time of the `GetSchema` request. The issue was introduced by https://github.com/apache/pulsar/pull/20932

```java
java.util.concurrent.CompletionException: org.apache.pulsar.client.api.PulsarClientException: {"errorMsg":"java.lang.IllegalStateException: Field 'topic' is not set","reqId":3644652631296588088, "remote":"localhost/127.0.0.1:62361", "local":"/127.0.0.1:62369"}
at java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:368) ~[?:?]
at java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:377) ~[?:?]
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1152) ~[?:?]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[?:?]
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147) ~[?:?]
at org.apache.pulsar.client.impl.ClientCnx.handleGetSchemaResponse(ClientCnx.java:970) ~[classes/:?]
at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:361) ~[classes/:?]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.105.Final.jar:4.1.105.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.105.Final.jar:4.1.105.Final] at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.105.Final.jar:4.1.105.Final] at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346) ~[netty-codec-4.1.105.Final.jar:4.1.105.Final] at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318) ~[netty-codec-4.1.105.Final.jar:4.1.105.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.105.Final.jar:4.1.105.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.105.Final.jar:4.1.105.Final] at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.105.Final.jar:4.1.105.Final] at io.netty.handler.flush.FlushConsolidationHandler.channelRead(FlushConsolidationHandler.java:152) ~[netty-handler-4.1.105.Final.jar:4.1.105.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) ~[netty-transport-4.1.105.Final.jar:4.1.105.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.105.Final.jar:4.1.105.Final] at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.105.Final.jar:4.1.105.Final] at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) 
~[netty-transport-4.1.105.Final.jar:4.1.105.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) ~[netty-transport-4.1.105.Final.jar:4.1.105.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.105.Final.jar:4.1.105.Final] at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.105.Final.jar:4.1.105.Final] at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.105.Final.jar:4.1.105.Final] at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) ~[netty-transport-4.1.105.Final.jar:4.1.105.Final] at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724) ~[netty-transport-4.1.105.Final.jar:4.1.105.Final] at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650) ~[netty-transport-4.1.105.Final.jar:4.1.105.Final] at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) ~[netty-transport-4.1.105.Final.jar:4.1.105.Final]
Re: [I] [Doc] Add a proper way to submit website issues and improvement proposals [pulsar]
asafm commented on issue #22277: URL: https://github.com/apache/pulsar/issues/22277#issuecomment-2024937889 @lhotari You are correct. I just wanted the process for website issues to be aligned with the process in general in Pulsar for triaging issues of any sort.
Re: [PR] [improve][misc] Upgrade log4j2 to 2.23.1 [pulsar]
asafm commented on PR #22327: URL: https://github.com/apache/pulsar/pull/22327#issuecomment-2024934741

Ok guys, there was a lot of information thrown around here, and for me most of it was new. I'll try to summarize it all here:

1. Log4j2 supports plugins, which allow you to extend Log4j. In our case, this lets anyone add a new type of Appender.

2. The Prometheus simple client has a dependency which we included in Pulsar:

```
io.prometheus simpleclient_log4j2
```

The dependency includes a Log4j2 plugin, which adds a new `Prometheus` type Appender. As @lhotari noted, it is:

```
@Plugin(name = "Prometheus", category = "Core", elementType = "appender")
public final class InstrumentedAppender extends AbstractAppender {
```

This Appender receives the logs (log events) and exposes metrics about them: how many logs per type, etc. In effect, it instruments Log4j2.

3. The `Prometheus` type appender is used, as you noted, in log4j2.yaml under Appenders

```
Prometheus:
  name: Prometheus
```

and is then referenced to "forward" the logs to that appender so they can be counted.

```
# Default root logger configuration
Root:
  level: "${sys:pulsar.log.root.level}"
  additivity: true
  AppenderRef:
    - ref: "${sys:pulsar.log.appender}"
      level: "${sys:pulsar.log.level}"
    - ref: Prometheus
      level: info
```

4. Log4j previously let you configure which packages to scan for the classes that extend it when you add a plugin. As you, @nodece, noted in the link you provided, you don't really need it. You just need the plugin to have the `@Plugin` annotation, which it does. This means we can safely remove the `packages` line from log4j2.yaml. As a double check, you do need to verify that the Prometheus log4j metrics are still exposed.
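To make the idea of an instrumenting appender from the summary above concrete, here is a small sketch of the same pattern: a sink that receives every log event and only counts it per level. It deliberately uses `java.util.logging` from the JDK instead of Log4j2 so that it runs without extra dependencies; it is an analogy, not the `InstrumentedAppender` from `simpleclient_log4j2`, and `CountingHandler` is a hypothetical name.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

public class CountingHandlerExample {
    // Hypothetical handler: counts log records per level instead of writing them anywhere.
    static final class CountingHandler extends Handler {
        private final Map<Level, LongAdder> counts = new ConcurrentHashMap<>();

        @Override
        public void publish(LogRecord record) {
            if (record == null || !isLoggable(record)) {
                return;
            }
            counts.computeIfAbsent(record.getLevel(), level -> new LongAdder()).increment();
        }

        @Override
        public void flush() {
            // Nothing is buffered.
        }

        @Override
        public void close() {
            // No resources to release.
        }

        long count(Level level) {
            LongAdder adder = counts.get(level);
            return adder == null ? 0 : adder.sum();
        }
    }

    public static void main(String[] args) {
        Logger logger = Logger.getAnonymousLogger();
        logger.setUseParentHandlers(false); // keep the example output quiet
        logger.setLevel(Level.INFO);

        CountingHandler counter = new CountingHandler();
        logger.addHandler(counter);

        logger.info("first");
        logger.info("second");
        logger.warning("third");

        System.out.println("INFO=" + counter.count(Level.INFO)
                + " WARNING=" + counter.count(Level.WARNING));
    }
}
```

A metrics library would expose these counters to a scraper rather than print them; that part is left out here.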
Re: [PR] [fix][broker] Fix PersistentSubscription duplicate implementation interface Subscription [pulsar]
sherlock-lin commented on PR #22359:
URL: https://github.com/apache/pulsar/pull/22359#issuecomment-2024915435

> > This is my first PR; having it merged would give me more confidence. I'll try to avoid submitting low-value PRs that take up resources until the technical debt clears up.
>
> @sherlock-lin Congrats on the first PR and welcome to the Pulsar contributors! For the purpose of getting familiar with how PRs are merged and how the Pulsar CI works, this PR was a great way to learn that.
>
> You might notice in future PRs that we have quite a few flaky tests in Pulsar CI and usually there will be multiple flakes and the build will have to be retried multiple times. For this PR, you hadn't yet tried out the ["Personal CI"](https://pulsar.apache.org/contribute/personal-ci/) option for running PR builds in your own fork to get build feedback. That's useful to learn since you can retry failing builds as you like in your own fork and prepare the PR for final review. The guide is missing some details since you will have to enable GitHub Actions in your Pulsar fork. Contributions to the instructions are also welcome if you notice any gaps.
>
> I'm sorry about the criticism about this PR. I got stuck on the detail that this PR isn't a bug fix but a code cleanup PR, and ended up explaining in more detail why I'm not eager to encourage more code cleanup PRs before we have changed the Pulsar maintenance strategy ([#22359 (comment)](https://github.com/apache/pulsar/pull/22359#issuecomment-2022311328)).
>
> We have plenty of valuable contribution opportunities in the Apache Pulsar project. Please join [the Apache Pulsar Slack](https://pulsar.apache.org/community/#section-discussions) and its #dev channel. When you are looking for contribution opportunities, I'll be happy to suggest valuable opportunities for you and help with any barriers to contributing.
>
> /cc @Technoboy- @codelipenghui

Thanks @lhotari for your patient guidance! I made some procedural mistakes in this PR submission; thank you for pointing them out. I have learned a lot about contributing code in the process, and I will study the PR submission guidelines further so that I can work more efficiently. Thanks also to @Technoboy-, @codecov-commenter, @liangyuanpeng, and the other contributors for their guidance and help. I'm honored to join the Pulsar contributors and to work with everybody to make Pulsar better!
(pulsar) branch branch-3.2 updated: [improve][test][branch-3.2] Improve ManagedLedgerTest.testGetNumberOfEntriesInStorage
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git

The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 6255b1282e8 [improve][test][branch-3.2] Improve ManagedLedgerTest.testGetNumberOfEntriesInStorage
6255b1282e8 is described below

commit 6255b1282e8488ca3fc5fddcf320480af9b20255
Author: Lari Hotari
AuthorDate: Thu Mar 28 12:36:44 2024 +0200

    [improve][test][branch-3.2] Improve ManagedLedgerTest.testGetNumberOfEntriesInStorage

    - improve the test case based on suggestion from @poorbarcode
---
 .../java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 6782867eb50..0d9d6c0e573 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -2641,10 +2641,10 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
         Awaitility.await().untilAsserted(() -> {
             assertEquals(managedLedger.getState(), ManagedLedgerImpl.State.ClosedLedger);
         });
-        managedLedger.rollCurrentLedgerIfFull();
+        managedLedger.createLedgerAfterClosed();
         Awaitility.await().untilAsserted(() -> {
-            assertEquals(managedLedger.getLedgersInfo().size(), 2);
-            assertEquals(managedLedger.getState(), ManagedLedgerImpl.State.ClosedLedger);
+            assertEquals(managedLedger.getLedgersInfo().size(), 3);
+            assertEquals(managedLedger.getState(), ManagedLedgerImpl.State.LedgerOpened);
         });
         assertEquals(5, managedLedger.getLedgersInfoAsList().get(0).getEntries());
         assertEquals(5, managedLedger.getLedgersInfoAsList().get(1).getEntries());
(pulsar) branch branch-3.0 updated: [improve][test][branch-3.0] Improve ManagedLedgerTest.testGetNumberOfEntriesInStorage
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 8f174463550 [improve][test][branch-3.0] Improve ManagedLedgerTest.testGetNumberOfEntriesInStorage
8f174463550 is described below

commit 8f17446355095d9d24d781446bd9d60ca4a275ef
Author: Lari Hotari
AuthorDate: Thu Mar 28 12:36:44 2024 +0200

    [improve][test][branch-3.0] Improve ManagedLedgerTest.testGetNumberOfEntriesInStorage

    - improve the test case based on suggestion from @poorbarcode
---
 .../java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index d20bbf0d7f5..6989d5edf6c 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -2640,10 +2640,10 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
         Awaitility.await().untilAsserted(() -> {
             assertEquals(managedLedger.getState(), ManagedLedgerImpl.State.ClosedLedger);
         });
-        managedLedger.rollCurrentLedgerIfFull();
+        managedLedger.createLedgerAfterClosed();
         Awaitility.await().untilAsserted(() -> {
-            assertEquals(managedLedger.getLedgersInfo().size(), 2);
-            assertEquals(managedLedger.getState(), ManagedLedgerImpl.State.ClosedLedger);
+            assertEquals(managedLedger.getLedgersInfo().size(), 3);
+            assertEquals(managedLedger.getState(), ManagedLedgerImpl.State.LedgerOpened);
         });
         assertEquals(5, managedLedger.getLedgersInfoAsList().get(0).getEntries());
         assertEquals(5, managedLedger.getLedgersInfoAsList().get(1).getEntries());
[PR] [fix][io] use create for bulk op if target cluster is ElasticSearch [pulsar]
465 opened a new pull request, #21377:
URL: https://github.com/apache/pulsar/pull/21377

### Motivation

(recreated PR, original: https://github.com/apache/pulsar/pull/20870)

On Elasticsearch, the bulk API must be called with the `create` operation if the target index is a data stream. The current plugin cannot insert documents into an Elasticsearch data stream when bulk is enabled; it throws `illegal_argument_exception`. This PR addresses that issue.

### Modifications

- adds `INDEX_TYPE` config
- use `CREATE` op if `INDEX_TYPE` is `DATA_STREAM`

### Verifying this change

- [ ] Make sure that the change passes the CI checks.

### Does this pull request potentially affect one of the following parts:

*If the box was checked, please highlight the changes*

- [ ] Dependencies (add or upgrade a dependency)
- [ ] The public API
- [ ] The schema
- [ ] The default values of configurations
- [ ] The threading model
- [ ] The binary protocol
- [ ] The REST endpoints
- [ ] The admin CLI options
- [ ] The metrics
- [ ] Anything that affects deployment

### Documentation

- [ ] `doc`
- [x] `doc-required`
- [ ] `doc-not-needed`
- [ ] `doc-complete`
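To make the motivation above concrete, here is a minimal sketch of how a bulk request body could pick its action per target type. It is not the connector's code: the `IndexType` enum only mirrors the `INDEX_TYPE` setting named in the PR, and `bulkAction`, `bulkBody`, and the target name `logs-app-default` are hypothetical names chosen for illustration. The only fact relied on is that the Elasticsearch bulk API takes one action line followed by one source line per document, and that data streams accept the `create` action rather than `index`.

```java
import java.util.List;

public class BulkActionSketch {
    // Hypothetical mirror of the INDEX_TYPE setting proposed in the PR.
    enum IndexType { INDEX, DATA_STREAM }

    // Data streams are append-only, so the bulk action has to be "create".
    static String bulkAction(IndexType type) {
        return type == IndexType.DATA_STREAM ? "create" : "index";
    }

    // Builds an NDJSON bulk body: one action line plus one source line per document.
    static String bulkBody(IndexType type, String target, List<String> jsonDocs) {
        StringBuilder body = new StringBuilder();
        for (String doc : jsonDocs) {
            body.append("{\"").append(bulkAction(type))
                .append("\":{\"_index\":\"").append(target).append("\"}}\n");
            body.append(doc).append('\n');
        }
        return body.toString();
    }

    public static void main(String[] args) {
        // Prints the body that would be sent to the _bulk endpoint for a data stream.
        System.out.print(bulkBody(IndexType.DATA_STREAM, "logs-app-default",
                List.of("{\"@timestamp\":\"2024-03-28T10:00:00Z\",\"message\":\"hello\"}")));
    }
}
```

Keeping the decision in one small function means the rest of the bulk path does not need to know which kind of target it is writing to.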
Re: [PR] [fix][io] use create for bulk op if target cluster is ElasticSearch [pulsar]
shibd closed pull request #21377: [fix][io] use create for bulk op if target cluster is ElasticSearch
URL: https://github.com/apache/pulsar/pull/21377
(pulsar) 02/02: [improve][misc] Pin Netty version in pulsar-io/alluxio (#21728)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git commit d3f0a4b3280142fe990280eff0f2c1c00379ba38 Author: Lari Hotari AuthorDate: Fri Dec 15 07:36:24 2023 +0200 [improve][misc] Pin Netty version in pulsar-io/alluxio (#21728) (cherry picked from commit b777136e57151c322416468f0be65841edc8be8a) --- pulsar-io/alluxio/pom.xml | 49 --- 1 file changed, 25 insertions(+), 24 deletions(-) diff --git a/pulsar-io/alluxio/pom.xml b/pulsar-io/alluxio/pom.xml index 76fbcc7a5af..6a73cca523c 100644 --- a/pulsar-io/alluxio/pom.xml +++ b/pulsar-io/alluxio/pom.xml @@ -32,6 +32,7 @@ 2.9.3 4.1.11 1.37.0 +4.1.100.Final pulsar-io-alluxio @@ -56,12 +57,6 @@ org.alluxio alluxio-core-client-fs ${alluxio.version} - - -grpc-netty -io.grpc - - @@ -74,10 +69,6 @@ org.glassfish javax.el - -grpc-netty -io.grpc - @@ -90,22 +81,32 @@ com.google.guava guava - - - -io.grpc -grpc-netty -${grpc.version} - - - -io.dropwizard.metrics -metrics-jvm -${metrics.version} - - + + + +io.netty +netty-bom +${netty.version} +pom +import + + +io.grpc +grpc-bom +${grpc.version} +pom +import + + +io.dropwizard.metrics +metrics-jvm +${metrics.version} + + + +
(pulsar) 01/02: [fix][build] Upgrade alluxio version to 2.9.3 to fix CVE-2023-38889 (#21715)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git commit 31f0ae44b7f34eceb4f06b42601f8eaba3f0c16e Author: Jiwei Guo AuthorDate: Thu Dec 14 15:24:33 2023 +0800 [fix][build] Upgrade alluxio version to 2.9.3 to fix CVE-2023-38889 (#21715) (cherry picked from commit 33313c02089e97ee6989bcae4da73cb6434b8fd5) --- pulsar-io/alluxio/pom.xml | 2 +- .../main/java/org/apache/pulsar/io/alluxio/sink/AlluxioSink.java | 7 --- .../java/org/apache/pulsar/io/alluxio/sink/AlluxioSinkTest.java| 6 +++--- 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/pulsar-io/alluxio/pom.xml b/pulsar-io/alluxio/pom.xml index d08e38b5e0f..76fbcc7a5af 100644 --- a/pulsar-io/alluxio/pom.xml +++ b/pulsar-io/alluxio/pom.xml @@ -29,7 +29,7 @@ -2.7.3 +2.9.3 4.1.11 1.37.0 diff --git a/pulsar-io/alluxio/src/main/java/org/apache/pulsar/io/alluxio/sink/AlluxioSink.java b/pulsar-io/alluxio/src/main/java/org/apache/pulsar/io/alluxio/sink/AlluxioSink.java index 413f05e0e17..3b72dc9666b 100644 --- a/pulsar-io/alluxio/src/main/java/org/apache/pulsar/io/alluxio/sink/AlluxioSink.java +++ b/pulsar-io/alluxio/src/main/java/org/apache/pulsar/io/alluxio/sink/AlluxioSink.java @@ -22,12 +22,13 @@ import alluxio.AlluxioURI; import alluxio.client.WriteType; import alluxio.client.file.FileOutStream; import alluxio.client.file.FileSystem; +import alluxio.conf.Configuration; import alluxio.conf.InstancedConfiguration; import alluxio.conf.PropertyKey; import alluxio.exception.AlluxioException; import alluxio.grpc.CreateFilePOptions; import alluxio.grpc.WritePType; -import alluxio.util.FileSystemOptions; +import alluxio.util.FileSystemOptionsUtils; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; @@ -78,7 +79,7 @@ public class AlluxioSink implements Sink { private AlluxioSinkConfig 
alluxioSinkConfig; private AlluxioState alluxioState; -private InstancedConfiguration configuration = InstancedConfiguration.defaults(); +private InstancedConfiguration configuration = Configuration.modifiableGlobal(); private ObjectMapper objectMapper = new ObjectMapper(); @@ -205,7 +206,7 @@ public class AlluxioSink implements Sink { private void createTmpFile() throws AlluxioException, IOException { CreateFilePOptions.Builder optionsBuilder = - FileSystemOptions.createFileDefaults(configuration).toBuilder(); + FileSystemOptionsUtils.createFileDefaults(configuration).toBuilder(); UUID id = UUID.randomUUID(); String fileExtension = alluxioSinkConfig.getFileExtension(); tmpFilePath = tmpFileDirPath + "/" + id.toString() + "_tmp" + fileExtension; diff --git a/pulsar-io/alluxio/src/test/java/org/apache/pulsar/io/alluxio/sink/AlluxioSinkTest.java b/pulsar-io/alluxio/src/test/java/org/apache/pulsar/io/alluxio/sink/AlluxioSinkTest.java index 9325a2255ab..bf40581aae1 100644 --- a/pulsar-io/alluxio/src/test/java/org/apache/pulsar/io/alluxio/sink/AlluxioSinkTest.java +++ b/pulsar-io/alluxio/src/test/java/org/apache/pulsar/io/alluxio/sink/AlluxioSinkTest.java @@ -22,8 +22,8 @@ import alluxio.AlluxioURI; import alluxio.client.WriteType; import alluxio.client.file.FileSystem; import alluxio.client.file.URIStatus; +import alluxio.conf.Configuration; import alluxio.conf.PropertyKey; -import alluxio.conf.ServerConfiguration; import alluxio.master.LocalAlluxioCluster; import lombok.extern.slf4j.Slf4j; import org.apache.commons.io.FilenameUtils; @@ -237,8 +237,8 @@ public class AlluxioSinkTest { private LocalAlluxioCluster setupSingleMasterCluster() throws Exception { // Setup and start the local alluxio cluster LocalAlluxioCluster cluster = new LocalAlluxioCluster(); -cluster.initConfiguration(getTestName(getClass().getSimpleName(), LocalAlluxioCluster.DEFAULT_TEST_NAME)); -ServerConfiguration.set(PropertyKey.USER_FILE_WRITE_TYPE_DEFAULT, WriteType.MUST_CACHE); 
+cluster.initConfiguration(getTestName(getClass().getSimpleName(), "test")); +Configuration.set(PropertyKey.USER_FILE_WRITE_TYPE_DEFAULT, WriteType.MUST_CACHE); cluster.start(); return cluster; }
(pulsar) branch branch-3.0 updated (c74eec7f087 -> d3f0a4b3280)
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a change to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

    from c74eec7f087 [fix][test] Fix flaky test BrokerServiceAutoSubscriptionCreationTest (#22190)
     new 31f0ae44b7f [fix][build] Upgrade alluxio version to 2.9.3 to fix CVE-2023-38889 (#21715)
     new d3f0a4b3280 [improve][misc] Pin Netty version in pulsar-io/alluxio (#21728)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.

Summary of changes:
 pulsar-io/alluxio/pom.xml | 51 +++---
 .../apache/pulsar/io/alluxio/sink/AlluxioSink.java | 7 +--
 .../pulsar/io/alluxio/sink/AlluxioSinkTest.java | 6 +--
 3 files changed, 33 insertions(+), 31 deletions(-)
(pulsar) branch branch-3.0 updated: [fix][test] Fix flaky test BrokerServiceAutoSubscriptionCreationTest (#22190)
This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/branch-3.0 by this push: new c74eec7f087 [fix][test] Fix flaky test BrokerServiceAutoSubscriptionCreationTest (#22190) c74eec7f087 is described below commit c74eec7f087b90797a0e4ddfc527d92c68363519 Author: Jiwei Guo AuthorDate: Mon Mar 4 21:42:22 2024 +0800 [fix][test] Fix flaky test BrokerServiceAutoSubscriptionCreationTest (#22190) (cherry picked from commit 8c7c9788119197ac78bb90dfb030b1883139c026) --- .../service/BrokerServiceAutoSubscriptionCreationTest.java | 13 + 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoSubscriptionCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoSubscriptionCreationTest.java index 3f2e182874e..f1128e389ca 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoSubscriptionCreationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoSubscriptionCreationTest.java @@ -28,6 +28,7 @@ import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.AutoSubscriptionCreationOverride; +import org.awaitility.Awaitility; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.AfterMethod; @@ -159,15 +160,19 @@ public class BrokerServiceAutoSubscriptionCreationTest extends BrokerTestBase { throws PulsarAdminException, PulsarClientException { pulsar.getConfiguration().setAllowAutoTopicCreation(false); pulsar.getConfiguration().setAllowAutoSubscriptionCreation(true); - 
admin.brokers().updateDynamicConfiguration("allowAutoSubscriptionCreation", "false"); +String allowAutoSubscriptionCreation = "allowAutoSubscriptionCreation"; + admin.brokers().updateDynamicConfiguration(allowAutoSubscriptionCreation, "false"); String topicString = "persistent://prop/ns-abc/non-partitioned-topic" + UUID.randomUUID(); String subscriptionName = "non-partitioned-topic-sub"; admin.topics().createNonPartitionedTopic(topicString); Assert.assertThrows(PulsarClientException.class, ()-> pulsarClient.newConsumer().topic(topicString).subscriptionName(subscriptionName).subscribe()); - admin.brokers().updateDynamicConfiguration("allowAutoSubscriptionCreation", "true"); - pulsarClient.newConsumer().topic(topicString).subscriptionName(subscriptionName).subscribe(); - assertTrue(admin.topics().getSubscriptions(topicString).contains(subscriptionName)); + admin.brokers().updateDynamicConfiguration(allowAutoSubscriptionCreation, "true"); +Awaitility.await().untilAsserted(() -> { + Assert.assertEquals(admin.brokers().getAllDynamicConfigurations().get(allowAutoSubscriptionCreation), "true"); + pulsarClient.newConsumer().topic(topicString).subscriptionName(subscriptionName).subscribe(); + assertTrue(admin.topics().getSubscriptions(topicString).contains(subscriptionName)); +}); } }
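The diff above replaces a one-shot check with Awaitility polling, which suggests the dynamic configuration update is not visible immediately after the admin call returns. The following is a minimal sketch of that polling idea using only the JDK, not Awaitility's implementation: `untilAsserted` here is a hypothetical helper, and the background thread merely simulates a setting that is applied a little later.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicReference;

public class AwaitSketch {
    // Re-runs the assertion until it stops throwing AssertionError or the timeout elapses.
    static void untilAsserted(Duration timeout, Duration pollInterval, Runnable assertion) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            try {
                assertion.run();
                return; // assertion passed
            } catch (AssertionError e) {
                if (System.nanoTime() >= deadline) {
                    throw e; // give up and report the last failure
                }
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
        }
    }

    public static void main(String[] args) {
        AtomicReference<String> config = new AtomicReference<>("false");
        // Simulates a configuration change that becomes visible asynchronously.
        Thread updater = new Thread(() -> {
            try {
                Thread.sleep(200);
            } catch (InterruptedException ignored) {
                return;
            }
            config.set("true");
        });
        updater.start();

        untilAsserted(Duration.ofSeconds(5), Duration.ofMillis(50), () -> {
            if (!"true".equals(config.get())) {
                throw new AssertionError("configuration not applied yet");
            }
        });
        System.out.println("configuration applied: " + config.get());
    }
}
```

Checking the configuration value first, as the test in the diff does, keeps the dependent steps from running before the setting has taken effect.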
Re: [PR] [improve] [broker] Avoid repeated Read-and-discard when using Key_Shared mode [pulsar]
codelipenghui commented on code in PR #22245: URL: https://github.com/apache/pulsar/pull/22245#discussion_r1542603523 ## pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java: ## @@ -1630,4 +1637,263 @@ public void testContinueDispatchMessagesWhenMessageDelayed() throws Exception { log.info("Got {} other messages...", sum); Assert.assertEquals(sum, delayedMessages + messages); } + +private AtomicInteger injectReplayReadCounter(String topicName, String cursorName) throws Exception { +PersistentTopic persistentTopic = +(PersistentTopic) pulsar.getBrokerService().getTopic(topicName, false).join().get(); +ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) persistentTopic.getManagedLedger(); +ManagedCursorImpl cursor = (ManagedCursorImpl) managedLedger.openCursor(cursorName); +managedLedger.getCursors().removeCursor(cursor.getName()); +managedLedger.getActiveCursors().removeCursor(cursor.getName()); +ManagedCursorImpl spyCursor = Mockito.spy(cursor); +managedLedger.getCursors().add(spyCursor, PositionImpl.EARLIEST); +managedLedger.getActiveCursors().add(spyCursor, PositionImpl.EARLIEST); +AtomicInteger replyReadCounter = new AtomicInteger(); +Mockito.doAnswer(invocation -> { +if (!String.valueOf(invocation.getArguments()[2]).equals("Normal")) { +replyReadCounter.incrementAndGet(); +} +return invocation.callRealMethod(); +}).when(spyCursor).asyncReplayEntries(Mockito.anySet(), Mockito.any(), Mockito.any()); +Mockito.doAnswer(invocation -> { +if (!String.valueOf(invocation.getArguments()[2]).equals("Normal")) { +replyReadCounter.incrementAndGet(); +} +return invocation.callRealMethod(); +}).when(spyCursor).asyncReplayEntries(Mockito.anySet(), Mockito.any(), Mockito.any(), Mockito.anyBoolean()); +admin.topics().createSubscription(topicName, cursorName, MessageId.earliest); +return replyReadCounter; +} + +@Test +public void testNoRepeatedReadAndDiscard() throws Exception { +int delayedMessages = 100; +final String topic = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp"); +final String subName = "my-sub"; +admin.topics().createNonPartitionedTopic(topic); +AtomicInteger replyReadCounter = injectReplayReadCounter(topic, subName); + +// Send messages. +@Cleanup +Producer producer = pulsarClient.newProducer(Schema.INT32).topic(topic).enableBatching(false).create(); +for (int i = 0; i < delayedMessages; i++) { +MessageId messageId = producer.newMessage() +.key(String.valueOf(random.nextInt(NUMBER_OF_KEYS))) +.value(100 + i) +.send(); +log.info("Published delayed message :{}", messageId); +} +producer.close(); + +// Make ack holes. +Consumer consumer1 = pulsarClient.newConsumer(Schema.INT32) +.topic(topic) +.subscriptionName(subName) +.receiverQueueSize(10) +.subscriptionType(SubscriptionType.Key_Shared) +.subscribe(); +Consumer consumer2 = pulsarClient.newConsumer(Schema.INT32) +.topic(topic) +.subscriptionName(subName) +.receiverQueueSize(10) +.subscriptionType(SubscriptionType.Key_Shared) +.subscribe(); +List msgList1 = new ArrayList<>(); +List msgList2 = new ArrayList<>(); +for (int i = 0; i < 10; i++) { +Message msg1 = consumer1.receive(1, TimeUnit.SECONDS); +if (msg1 != null) { +msgList1.add(msg1); +} +Message msg2 = consumer2.receive(1, TimeUnit.SECONDS); +if (msg2 != null) { +msgList2.add(msg2); +} +} +Consumer redeliverConsumer = null; +if (!msgList1.isEmpty()) { +msgList1.forEach(msg -> consumer1.acknowledgeAsync(msg)); +redeliverConsumer = consumer2; +} else { +msgList2.forEach(msg -> consumer2.acknowledgeAsync(msg)); +redeliverConsumer = consumer1; +} Review Comment: Actually, it can't ensure the ack hole is introduced. The second consumer might not able to get any messages until the first consumer acked all the messages. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [D] Threads config for Pulsar Consumers [pulsar]
GitHub user danielnesaraj added a comment to the discussion: Threads config for Pulsar Consumers

Thank you for the comprehensive response, @lhotari! This helps.

GitHub link: https://github.com/apache/pulsar/discussions/22375#discussioncomment-8938836

This is an automatically sent email for commits@pulsar.apache.org.
To unsubscribe, please send an email to: commits-unsubscr...@pulsar.apache.org
Re: [PR] [improve] [broker] Avoid repeated Read-and-discard when using Key_Shared mode [pulsar]
codelipenghui commented on code in PR #22245: URL: https://github.com/apache/pulsar/pull/22245#discussion_r1542586864 ## pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java: ## @@ -1630,4 +1637,263 @@ public void testContinueDispatchMessagesWhenMessageDelayed() throws Exception { log.info("Got {} other messages...", sum); Assert.assertEquals(sum, delayedMessages + messages); } + +private AtomicInteger injectReplayReadCounter(String topicName, String cursorName) throws Exception { Review Comment: Should it be `injectNormalReadCounter`? ## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java: ## @@ -437,6 +436,82 @@ protected synchronized NavigableSet getMessagesToReplayNow(int max } } +private int getAvailablePermits(Consumer c) { +int availablePermits = Math.max(c.getAvailablePermits(), 0); +if (c.getMaxUnackedMessages() > 0) { +// Avoid negative number +int remainUnAckedMessages = Math.max(c.getMaxUnackedMessages() - c.getUnackedMessages(), 0); +availablePermits = Math.min(availablePermits, remainUnAckedMessages); +} +return availablePermits; +} + +@Override +protected NavigableSet filterOutEntriesWillBeDiscarded(NavigableSet src) { Review Comment: It's better to reuse the `getRestrictedMaxEntriesForConsumer` method to avoid the inconsistent behaviors between filter out before read entries and filter out after read entries. 
## pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java: ## @@ -1630,4 +1637,263 @@ public void testContinueDispatchMessagesWhenMessageDelayed() throws Exception { log.info("Got {} other messages...", sum); Assert.assertEquals(sum, delayedMessages + messages); } + +private AtomicInteger injectReplayReadCounter(String topicName, String cursorName) throws Exception { +PersistentTopic persistentTopic = +(PersistentTopic) pulsar.getBrokerService().getTopic(topicName, false).join().get(); +ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) persistentTopic.getManagedLedger(); +ManagedCursorImpl cursor = (ManagedCursorImpl) managedLedger.openCursor(cursorName); +managedLedger.getCursors().removeCursor(cursor.getName()); +managedLedger.getActiveCursors().removeCursor(cursor.getName()); +ManagedCursorImpl spyCursor = Mockito.spy(cursor); +managedLedger.getCursors().add(spyCursor, PositionImpl.EARLIEST); +managedLedger.getActiveCursors().add(spyCursor, PositionImpl.EARLIEST); +AtomicInteger replyReadCounter = new AtomicInteger(); +Mockito.doAnswer(invocation -> { +if (!String.valueOf(invocation.getArguments()[2]).equals("Normal")) { +replyReadCounter.incrementAndGet(); +} +return invocation.callRealMethod(); +}).when(spyCursor).asyncReplayEntries(Mockito.anySet(), Mockito.any(), Mockito.any()); +Mockito.doAnswer(invocation -> { +if (!String.valueOf(invocation.getArguments()[2]).equals("Normal")) { +replyReadCounter.incrementAndGet(); +} +return invocation.callRealMethod(); +}).when(spyCursor).asyncReplayEntries(Mockito.anySet(), Mockito.any(), Mockito.any(), Mockito.anyBoolean()); +admin.topics().createSubscription(topicName, cursorName, MessageId.earliest); +return replyReadCounter; +} + +@Test +public void testNoRepeatedReadAndDiscard() throws Exception { +int delayedMessages = 100; +final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); +final String subName = "my-sub"; 
+admin.topics().createNonPartitionedTopic(topic); +AtomicInteger replyReadCounter = injectReplayReadCounter(topic, subName); + +// Send messages. +@Cleanup +Producer producer = pulsarClient.newProducer(Schema.INT32).topic(topic).enableBatching(false).create(); +for (int i = 0; i < delayedMessages; i++) { +MessageId messageId = producer.newMessage() +.key(String.valueOf(random.nextInt(NUMBER_OF_KEYS))) +.value(100 + i) +.send(); +log.info("Published delayed message :{}", messageId); Review Comment: Why named delayed messages. It actually published normal messages. ## pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java: ## @@ -1630,4 +1637,263 @@ public void testContinueDispatchMessagesWhenMessageDelayed() throws Exception { log.info("Got {} other messages...", sum); Assert.assertEquals(sum, delayedMessages + messages); } + +private
(pulsar) branch branch-3.2 updated: [fix][test][branch-3.2] Fix broken ManagedLedgerTest.testGetNumberOfEntriesInStorage
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git

The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 9927b55b177 [fix][test][branch-3.2] Fix broken ManagedLedgerTest.testGetNumberOfEntriesInStorage
9927b55b177 is described below

commit 9927b55b1778aaa53bcfa77aef74af6dc6d11610
Author: Lari Hotari
AuthorDate: Thu Mar 28 12:14:23 2024 +0200

    [fix][test][branch-3.2] Fix broken ManagedLedgerTest.testGetNumberOfEntriesInStorage

    - change https://github.com/apache/pulsar/pull/22034 is missing from branch-3.2
---
 .../java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 264a2907969..6782867eb50 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -2639,12 +2639,12 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
 
         // trigger ledger rollover and wait for the new ledger created
         Awaitility.await().untilAsserted(() -> {
-            assertEquals("LedgerOpened", WhiteboxImpl.getInternalState(managedLedger, "state").toString());
+            assertEquals(managedLedger.getState(), ManagedLedgerImpl.State.ClosedLedger);
         });
         managedLedger.rollCurrentLedgerIfFull();
         Awaitility.await().untilAsserted(() -> {
-            assertEquals(managedLedger.getLedgersInfo().size(), 3);
-            assertEquals(managedLedger.getState(), ManagedLedgerImpl.State.LedgerOpened);
+            assertEquals(managedLedger.getLedgersInfo().size(), 2);
+            assertEquals(managedLedger.getState(), ManagedLedgerImpl.State.ClosedLedger);
         });
         assertEquals(5, managedLedger.getLedgersInfoAsList().get(0).getEntries());
         assertEquals(5, managedLedger.getLedgersInfoAsList().get(1).getEntries());
[I] golang adds a batch fetch message API [pulsar-client-go]
wangrenyi opened a new issue, #1201:
URL: https://github.com/apache/pulsar-client-go/issues/1201

    Consumer consumer = pulsarClient.newConsumer()
            .topic("my-topic")
            .subscriptionName("my-subscription")
            .batchReceivePolicy(BatchReceivePolicy.builder()
                    .maxNumMessages(10)
                    .maxNumBytes(1024 * 1024)
                    .timeout(200, TimeUnit.MILLISECONDS)
                    .build())
            .subscribe();

    while (true) {
        Messages messages = consumer.batchReceive();
        for (Message msg : messages) {
            // TODO
        }
        consumer.acknowledge(messages);
    }

When will the Go client provide a batch-receive and batch-acknowledge API with the same functionality as the Java client shown above?
(pulsar) branch branch-3.0 updated: [fix][test][branch-3.0] Fix broken ManagedLedgerTest.testGetNumberOfEntriesInStorage
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new e3531e808c1 [fix][test][branch-3.0] Fix broken ManagedLedgerTest.testGetNumberOfEntriesInStorage
e3531e808c1 is described below

commit e3531e808c172ff588e36499c41746835d06904a
Author: Lari Hotari
AuthorDate: Thu Mar 28 12:14:23 2024 +0200

    [fix][test][branch-3.0] Fix broken ManagedLedgerTest.testGetNumberOfEntriesInStorage

    - change https://github.com/apache/pulsar/pull/22034 is missing from branch-3.0
---
 .../java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 10bfb699780..d20bbf0d7f5 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -2638,12 +2638,12 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
 
         // trigger ledger rollover and wait for the new ledger created
         Awaitility.await().untilAsserted(() -> {
-            assertEquals("LedgerOpened", WhiteboxImpl.getInternalState(managedLedger, "state").toString());
+            assertEquals(managedLedger.getState(), ManagedLedgerImpl.State.ClosedLedger);
         });
         managedLedger.rollCurrentLedgerIfFull();
         Awaitility.await().untilAsserted(() -> {
-            assertEquals(managedLedger.getLedgersInfo().size(), 3);
-            assertEquals(managedLedger.getState(), ManagedLedgerImpl.State.LedgerOpened);
+            assertEquals(managedLedger.getLedgersInfo().size(), 2);
+            assertEquals(managedLedger.getState(), ManagedLedgerImpl.State.ClosedLedger);
         });
         assertEquals(5, managedLedger.getLedgersInfoAsList().get(0).getEntries());
         assertEquals(5, managedLedger.getLedgersInfoAsList().get(1).getEntries());
Re: [I] Enable Pulsar authentication for Client authentication using private_key_jwt method [pulsar]
lhotari commented on issue #22371:
URL: https://github.com/apache/pulsar/issues/22371#issuecomment-2024829126

> Hello @lhotari,
>
> sorry, maybe I was not clear enough. What I was talking about was the Java client we are using. We could use the option with the JWT you have mentioned, but that would mean that every time the JWT expires we would have to update it in our systems, including production. That normally also means a restart of the application that uses this JWT token.
>
> We use an OAuth2 authentication mechanism, but we do not have the `client_id` and `client_secret` that are used in the OAuth2 Java client implementation. In our case we use the _private_key_jwt_ method, which instead of the `client_id` and `client_secret` uses `client_assertion_type`, which is `urn:ietf:params:oauth:client-assertion-type:jwt-bearer`, and `client_assertion`, which contains the information for client authentication. It must be digitally signed using a private key.

Thanks @WZHMIJJ, this is a great clarification.
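For readers unfamiliar with the `private_key_jwt` method described in the quoted comment, the sketch below shows how such a `client_assertion` could be built and placed in a token request, using only the JDK. It is an illustration under assumptions, not Pulsar's API and not the reporter's setup: the client ID `my-client`, the endpoint `https://idp.example.com/oauth2/token`, the five-minute lifetime, and all method names are hypothetical, and the key pair is generated on the fly purely for the demo. The claim set (`iss`, `sub`, `aud`, `jti`, `exp`) follows RFC 7523.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.KeyPair;
import java.security.KeyPairGenerator;
import java.security.PrivateKey;
import java.security.Signature;
import java.time.Instant;
import java.util.Base64;
import java.util.Locale;
import java.util.UUID;

public class ClientAssertionSketch {
    static final String ASSERTION_TYPE = "urn:ietf:params:oauth:client-assertion-type:jwt-bearer";
    private static final Base64.Encoder B64URL = Base64.getUrlEncoder().withoutPadding();

    private static String b64(String s) {
        return B64URL.encodeToString(s.getBytes(StandardCharsets.UTF_8));
    }

    // Builds an RS256-signed JWT in which the client identifies itself (iss = sub = client ID).
    static String clientAssertion(String clientId, String tokenEndpoint, PrivateKey key) {
        long now = Instant.now().getEpochSecond();
        String header = "{\"alg\":\"RS256\",\"typ\":\"JWT\"}";
        String payload = String.format(Locale.ROOT,
                "{\"iss\":\"%s\",\"sub\":\"%s\",\"aud\":\"%s\",\"jti\":\"%s\",\"iat\":%d,\"exp\":%d}",
                clientId, clientId, tokenEndpoint, UUID.randomUUID(), now, now + 300);
        String signingInput = b64(header) + "." + b64(payload);
        try {
            Signature signer = Signature.getInstance("SHA256withRSA");
            signer.initSign(key);
            signer.update(signingInput.getBytes(StandardCharsets.US_ASCII));
            return signingInput + "." + B64URL.encodeToString(signer.sign());
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("could not sign client assertion", e);
        }
    }

    // Form body for the token request; no client_id/client_secret pair is sent.
    static String tokenRequestForm(String assertion) {
        return "grant_type=client_credentials"
                + "&client_assertion_type=" + URLEncoder.encode(ASSERTION_TYPE, StandardCharsets.UTF_8)
                + "&client_assertion=" + assertion;
    }

    // Demo-only key pair; a real client would load its registered private key instead.
    static KeyPair demoKeyPair() {
        try {
            KeyPairGenerator generator = KeyPairGenerator.getInstance("RSA");
            generator.initialize(2048);
            return generator.generateKeyPair();
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        String jwt = clientAssertion("my-client", "https://idp.example.com/oauth2/token",
                demoKeyPair().getPrivate());
        System.out.println(tokenRequestForm(jwt));
    }
}
```

Because the assertion is short-lived and regenerated from the private key on each token request, nothing needs to be redeployed when a token expires, which is the operational concern raised in the comment.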
Re: [D] Threads config for Pulsar Consumers [pulsar]
GitHub user lhotari added a comment to the discussion: Threads config for Pulsar Consumers > I am looking into the thread config of the pulsar client. I found this: > https://stackoverflow.com/questions/56954771/apache-pulsar-iothreads-listenerthreads-and-message-ordering Yes, the answer continues to apply to the Pulsar client. > Then there is only 1 thread in which all 3 consumers are going to consume > messages from their respective topics, right? Assuming I have the requisite CPU cores allocated, would I then see performance improvement in doing the following: Yes, this makes sense. It's worth validating your assumption by testing. Thanks for sharing a good question where you already provided a helpful response too. Regarding performance, adding threads isn't the only consideration. This diagram from Wikipedia's HTTP pipelining article illustrates the issue with one-by-one handling of messages: ![image](https://github.com/apache/pulsar/assets/66864/dccc7063-82e6-4758-bb0c-1d5b58939b72) For many use cases, it's possible to significantly reduce the number of consumers and increase throughput by using **pipelining**. In Pulsar, this requires using the async API. Pulsar Functions support pipelining too when the return type is a CompletableFuture, but the feature isn't well documented. For key-ordered processing with pipelining, the Pulsar Reactive Client provides a solution based on Project Reactor's groupBy operator. This is one of the sweet spots of the Reactive Client, but there's not much documentation about it. I made [a conference presentation about the initial ideas in SpringOne 2021](https://springone.io/2021/sessions/reactive-applications-with-apache-pulsar-and-spring-boot). The presentation is slightly outdated since Spring Pulsar came out after it. However, the code examples, including the ones for pipelining, have since been updated to use Spring Pulsar and the Pulsar Reactive client.
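The difference between one-by-one handling and pipelining can be sketched with plain `CompletableFuture`, without any Pulsar dependency. This is only an illustration under stated assumptions: `PipeliningSketch` and `processAsync` are hypothetical names, and the scheduled timer stands in for a non-blocking remote call with a fixed latency. It does not use or reproduce the Pulsar async API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration, not Pulsar code.
class PipeliningSketch {
    // Simulates a non-blocking call that completes after latencyMs without holding a thread.
    public static CompletableFuture<String> processAsync(
            ScheduledExecutorService timer, String msg, long latencyMs) {
        CompletableFuture<String> result = new CompletableFuture<>();
        timer.schedule(() -> result.complete(msg.toUpperCase()), latencyMs, TimeUnit.MILLISECONDS);
        return result;
    }

    // One-by-one: wait for each result before starting the next call.
    public static List<String> oneByOne(
            ScheduledExecutorService timer, List<String> msgs, long latencyMs) {
        List<String> out = new ArrayList<>();
        for (String msg : msgs) {
            out.add(processAsync(timer, msg, latencyMs).join());
        }
        return out;
    }

    // Pipelined: start every call first, then collect the results in submission order.
    public static List<String> pipelined(
            ScheduledExecutorService timer, List<String> msgs, long latencyMs) {
        List<CompletableFuture<String>> inFlight = new ArrayList<>();
        for (String msg : msgs) {
            inFlight.add(processAsync(timer, msg, latencyMs));
        }
        List<String> out = new ArrayList<>();
        for (CompletableFuture<String> future : inFlight) {
            out.add(future.join());
        }
        return out;
    }

    public static void main(String[] args) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        try {
            List<String> msgs = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h");
            long t0 = System.nanoTime();
            oneByOne(timer, msgs, 50);
            long t1 = System.nanoTime();
            pipelined(timer, msgs, 50);
            long t2 = System.nanoTime();
            System.out.println("one-by-one ms: " + TimeUnit.NANOSECONDS.toMillis(t1 - t0));
            System.out.println("pipelined  ms: " + TimeUnit.NANOSECONDS.toMillis(t2 - t1));
        } finally {
            timer.shutdown();
        }
    }
}
```

In the one-by-one variant the latencies add up, while in the pipelined variant they overlap even though only a single timer thread is involved, which is why adding threads is not the only way to raise throughput.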
For pipelining, the Pulsar Reactive client has built-in support in the ReactiveMessagePipeline ([implementation](https://github.com/apache/pulsar-client-reactive/blob/b8b42df975c4bb2cd275936402d6439087695554/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultReactiveMessagePipeline.java#L125-L128), [single testcase which isn't a good usage example](https://github.com/apache/pulsar-client-reactive/blob/b8b42df975c4bb2cd275936402d6439087695554/pulsar-client-reactive-api/src/test/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipelineTests.java#L321-L329)). However, the documentation isn't great for this feature, which unlocks key-ordered processing with a configurable concurrency/parallelism level. The javadoc contains some limited docs: https://github.com/apache/pulsar-client-reactive/blob/b8b42df975c4bb2cd275936402d6439087695554/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipelineBuilder.java#L122-L142 Since Project Reactor has great support for retries, it's very simple to implement reliable integration pipelines leveraging these features. I hope to find time to do an updated conference talk, with updates to the docs and examples, to explain all of this. GitHub link: https://github.com/apache/pulsar/discussions/22375#discussioncomment-8938633 This is an automatically sent email for commits@pulsar.apache.org. To unsubscribe, please send an email to: commits-unsubscr...@pulsar.apache.org
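The idea of key-ordered processing with a configurable concurrency level, mentioned in the comment above, can be sketched with JDK executors. This is not how the Pulsar Reactive client implements it (the comment says that implementation builds on Project Reactor's groupBy operator); it is a swapped-in, simplified technique in which each key is hashed to one of `concurrency` single-threaded lanes. `KeyOrderedSketch`, `Msg`, and `processKeyOrdered` are hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical illustration, not the Reactive client's implementation.
class KeyOrderedSketch {
    // Minimal message type: a routing key plus a per-key sequence number.
    public static final class Msg {
        final String key;
        final int seq;

        public Msg(String key, int seq) {
            this.key = key;
            this.seq = seq;
        }
    }

    // Processes messages on `concurrency` lanes and returns the sequence numbers
    // in the order they were handled for each key.
    public static Map<String, List<Integer>> processKeyOrdered(List<Msg> msgs, int concurrency) {
        ExecutorService[] lanes = new ExecutorService[concurrency];
        for (int i = 0; i < concurrency; i++) {
            lanes[i] = Executors.newSingleThreadExecutor();
        }
        Map<String, List<Integer>> handled = new ConcurrentHashMap<>();
        List<CompletableFuture<Void>> pending = new ArrayList<>();
        try {
            for (Msg msg : msgs) {
                // The same key always maps to the same lane, and a single-threaded
                // lane runs its tasks in submission order, so per-key order is kept.
                int lane = Math.floorMod(msg.key.hashCode(), concurrency);
                pending.add(CompletableFuture.runAsync(
                        () -> handled
                                .computeIfAbsent(msg.key,
                                        k -> Collections.synchronizedList(new ArrayList<>()))
                                .add(msg.seq),
                        lanes[lane]));
            }
            CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
        } finally {
            for (ExecutorService lane : lanes) {
                lane.shutdown();
            }
        }
        return handled;
    }

    public static void main(String[] args) {
        List<Msg> msgs = new ArrayList<>();
        for (int i = 0; i < 12; i++) {
            msgs.add(new Msg("key-" + (i % 3), i / 3));
        }
        System.out.println(processKeyOrdered(msgs, 2));
    }
}
```

Different keys can be handled in parallel on different lanes, while messages that share a key are never reordered relative to each other. The trade-off of this simple hashing scheme is that a busy key can delay other keys that happen to share its lane.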
Re: [PR] [fix][txn]Handle exceptions in the transaction pending ack init [pulsar]
liangyepianzhou commented on code in PR #21274: URL: https://github.com/apache/pulsar/pull/21274#discussion_r1542581449 ## pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java: ## @@ -100,6 +104,79 @@ protected void cleanup() { super.internalCleanup(); } +/** + * Test consumer can be built successfully with retryable exception + * and get correct error with no-retryable exception. + * @throws Exception + */ +@Test(timeOut = 6) +public void testBuildConsumerEncounterPendingAckInitFailure() throws Exception { +// 1. Prepare and make sure the consumer can be built successfully. +String topic = NAMESPACE1 + "/testUnloadSubscriptionWhenFailedInitPendingAck"; +@Cleanup +Consumer consumer1 = pulsarClient.newConsumer() +.subscriptionName("subName1") +.topic(topic) +.subscribe(); +// 2. Mock a transactionPendingAckStoreProvider to test building consumer +// failing at transactionPendingAckStoreProvider::checkInitializedBefore. +Field transactionPendingAckStoreProviderField = PulsarService.class +.getDeclaredField("transactionPendingAckStoreProvider"); +transactionPendingAckStoreProviderField.setAccessible(true); +TransactionPendingAckStoreProvider pendingAckStoreProvider = +(TransactionPendingAckStoreProvider) transactionPendingAckStoreProviderField +.get(pulsarServiceList.get(0)); +TransactionPendingAckStoreProvider mockProvider = mock(pendingAckStoreProvider.getClass()); +// 3. Test retryable exception when checkInitializedBefore: +// The consumer will be built successfully after one time retry. +when(mockProvider.checkInitializedBefore(any())) +// First, the method checkInitializedBefore will fail with a retryable exception. +.thenReturn(FutureUtil.failedFuture(new ManagedLedgerException("mock fail initialize"))) +// Then, the method will be executed successfully. 
+.thenReturn(CompletableFuture.completedFuture(false)); +transactionPendingAckStoreProviderField.set(pulsarServiceList.get(0), mockProvider); +@Cleanup +Consumer consumer2 = pulsarClient.newConsumer() +.subscriptionName("subName2") +.topic(topic) +.subscribe(); + +// 4. Test retryable exception when newPendingAckStore: +// The consumer will be built successfully after one time retry. +when(mockProvider.checkInitializedBefore(any())) +.thenReturn(CompletableFuture.completedFuture(true)); + +when(mockProvider.newPendingAckStore(any())) +// First, the method newPendingAckStore will fail with a retryable exception. +.thenReturn(FutureUtil.failedFuture(new ManagedLedgerException("mock fail new store"))) +// Then, the method will be executed successfully. +.thenCallRealMethod(); +transactionPendingAckStoreProviderField.set(pulsarServiceList.get(0), mockProvider); +@Cleanup +Consumer consumer3 = pulsarClient.newConsumer() +.subscriptionName("subName3") +.topic(topic) +.subscribe(); + +// 5. Test no-retryable exception: +// The consumer building will be failed without retrying. Review Comment: Good suggestion! Thanks
Re: [PR] [fix][txn]Handle exceptions in the transaction pending ack init [pulsar]
BewareMyPower commented on code in PR #21274: URL: https://github.com/apache/pulsar/pull/21274#discussion_r1542552357 ## pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java: ## @@ -100,6 +104,79 @@ protected void cleanup() { super.internalCleanup(); } +/** + * Test consumer can be built successfully with retryable exception + * and get correct error with no-retryable exception. + * @throws Exception + */ +@Test(timeOut = 6) +public void testBuildConsumerEncounterPendingAckInitFailure() throws Exception { +// 1. Prepare and make sure the consumer can be built successfully. +String topic = NAMESPACE1 + "/testUnloadSubscriptionWhenFailedInitPendingAck"; +@Cleanup +Consumer consumer1 = pulsarClient.newConsumer() +.subscriptionName("subName1") +.topic(topic) +.subscribe(); +// 2. Mock a transactionPendingAckStoreProvider to test building consumer +// failing at transactionPendingAckStoreProvider::checkInitializedBefore. +Field transactionPendingAckStoreProviderField = PulsarService.class +.getDeclaredField("transactionPendingAckStoreProvider"); +transactionPendingAckStoreProviderField.setAccessible(true); +TransactionPendingAckStoreProvider pendingAckStoreProvider = +(TransactionPendingAckStoreProvider) transactionPendingAckStoreProviderField +.get(pulsarServiceList.get(0)); +TransactionPendingAckStoreProvider mockProvider = mock(pendingAckStoreProvider.getClass()); +// 3. Test retryable exception when checkInitializedBefore: +// The consumer will be built successfully after one time retry. +when(mockProvider.checkInitializedBefore(any())) +// First, the method checkInitializedBefore will fail with a retryable exception. +.thenReturn(FutureUtil.failedFuture(new ManagedLedgerException("mock fail initialize"))) +// Then, the method will be executed successfully. 
+.thenReturn(CompletableFuture.completedFuture(false)); +transactionPendingAckStoreProviderField.set(pulsarServiceList.get(0), mockProvider); +@Cleanup +Consumer consumer2 = pulsarClient.newConsumer() +.subscriptionName("subName2") +.topic(topic) +.subscribe(); + +// 4. Test retryable exception when newPendingAckStore: +// The consumer will be built successfully after one time retry. +when(mockProvider.checkInitializedBefore(any())) +.thenReturn(CompletableFuture.completedFuture(true)); + +when(mockProvider.newPendingAckStore(any())) +// First, the method newPendingAckStore will fail with a retryable exception. +.thenReturn(FutureUtil.failedFuture(new ManagedLedgerException("mock fail new store"))) +// Then, the method will be executed successfully. +.thenCallRealMethod(); +transactionPendingAckStoreProviderField.set(pulsarServiceList.get(0), mockProvider); +@Cleanup +Consumer consumer3 = pulsarClient.newConsumer() +.subscriptionName("subName3") +.topic(topic) +.subscribe(); + +// 5. Test no-retryable exception: +// The consumer building will be failed without retrying. Review Comment: You can open another PR to add a new error (`TransactionComponentLoadFailedException`), as your comment in `exceptionHandleFuture` says. It's okay here for now. The "retrying" in your comment here should only refer to the server-side retry. But since this test could take too long (48 seconds in my local env) waiting for the client side, it's better to reduce the timeout by creating `consumer4` with another `PulsarClient` that configures the operation timeout.
```java
@Cleanup
PulsarClient pulsarClient = PulsarClient.builder()
        .serviceUrl(pulsarServiceList.get(0).getBrokerServiceUrl())
        .operationTimeout(3, TimeUnit.SECONDS)
        .build();
```
Re: [PR] [fix][txn]Handle exceptions in the transaction pending ack init [pulsar]
liangyepianzhou commented on code in PR #21274: URL: https://github.com/apache/pulsar/pull/21274#discussion_r1542537254 ## pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java: ## @@ -100,6 +104,79 @@ protected void cleanup() { super.internalCleanup(); } +/** + * Test consumer can be built successfully with retryable exception + * and get correct error with no-retryable exception. + * @throws Exception + */ +@Test(timeOut = 6) +public void testBuildConsumerEncounterPendingAckInitFailure() throws Exception { +// 1. Prepare and make sure the consumer can be built successfully. +String topic = NAMESPACE1 + "/testUnloadSubscriptionWhenFailedInitPendingAck"; +@Cleanup +Consumer consumer1 = pulsarClient.newConsumer() +.subscriptionName("subName1") +.topic(topic) +.subscribe(); +// 2. Mock a transactionPendingAckStoreProvider to test building consumer +// failing at transactionPendingAckStoreProvider::checkInitializedBefore. +Field transactionPendingAckStoreProviderField = PulsarService.class +.getDeclaredField("transactionPendingAckStoreProvider"); +transactionPendingAckStoreProviderField.setAccessible(true); +TransactionPendingAckStoreProvider pendingAckStoreProvider = +(TransactionPendingAckStoreProvider) transactionPendingAckStoreProviderField +.get(pulsarServiceList.get(0)); +TransactionPendingAckStoreProvider mockProvider = mock(pendingAckStoreProvider.getClass()); +// 3. Test retryable exception when checkInitializedBefore: +// The consumer will be built successfully after one time retry. +when(mockProvider.checkInitializedBefore(any())) +// First, the method checkInitializedBefore will fail with a retryable exception. +.thenReturn(FutureUtil.failedFuture(new ManagedLedgerException("mock fail initialize"))) +// Then, the method will be executed successfully. 
+.thenReturn(CompletableFuture.completedFuture(false)); +transactionPendingAckStoreProviderField.set(pulsarServiceList.get(0), mockProvider); +@Cleanup +Consumer consumer2 = pulsarClient.newConsumer() +.subscriptionName("subName2") +.topic(topic) +.subscribe(); + +// 4. Test retryable exception when newPendingAckStore: +// The consumer will be built successfully after one time retry. +when(mockProvider.checkInitializedBefore(any())) +.thenReturn(CompletableFuture.completedFuture(true)); + +when(mockProvider.newPendingAckStore(any())) +// First, the method newPendingAckStore will fail with a retryable exception. +.thenReturn(FutureUtil.failedFuture(new ManagedLedgerException("mock fail new store"))) +// Then, the method will be executed successfully. +.thenCallRealMethod(); +transactionPendingAckStoreProviderField.set(pulsarServiceList.get(0), mockProvider); +@Cleanup +Consumer consumer3 = pulsarClient.newConsumer() +.subscriptionName("subName3") +.topic(topic) +.subscribe(); + +// 5. Test no-retryable exception: +// The consumer building will be failed without retrying. Review Comment: Another approach is to add new errors and exceptions to catch this case. What do you think about it?
Re: [PR] [fix][txn]Handle exceptions in the transaction pending ack init [pulsar]
liangyepianzhou commented on code in PR #21274: URL: https://github.com/apache/pulsar/pull/21274#discussion_r1542534570 ## pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java: ## @@ -100,6 +104,79 @@ protected void cleanup() { super.internalCleanup(); } +/** + * Test consumer can be built successfully with retryable exception + * and get correct error with no-retryable exception. + * @throws Exception + */ +@Test(timeOut = 6) +public void testBuildConsumerEncounterPendingAckInitFailure() throws Exception { +// 1. Prepare and make sure the consumer can be built successfully. +String topic = NAMESPACE1 + "/testUnloadSubscriptionWhenFailedInitPendingAck"; +@Cleanup +Consumer consumer1 = pulsarClient.newConsumer() +.subscriptionName("subName1") +.topic(topic) +.subscribe(); +// 2. Mock a transactionPendingAckStoreProvider to test building consumer +// failing at transactionPendingAckStoreProvider::checkInitializedBefore. +Field transactionPendingAckStoreProviderField = PulsarService.class +.getDeclaredField("transactionPendingAckStoreProvider"); +transactionPendingAckStoreProviderField.setAccessible(true); +TransactionPendingAckStoreProvider pendingAckStoreProvider = +(TransactionPendingAckStoreProvider) transactionPendingAckStoreProviderField +.get(pulsarServiceList.get(0)); +TransactionPendingAckStoreProvider mockProvider = mock(pendingAckStoreProvider.getClass()); +// 3. Test retryable exception when checkInitializedBefore: +// The consumer will be built successfully after one time retry. +when(mockProvider.checkInitializedBefore(any())) +// First, the method checkInitializedBefore will fail with a retryable exception. +.thenReturn(FutureUtil.failedFuture(new ManagedLedgerException("mock fail initialize"))) +// Then, the method will be executed successfully. 
+.thenReturn(CompletableFuture.completedFuture(false)); +transactionPendingAckStoreProviderField.set(pulsarServiceList.get(0), mockProvider); +@Cleanup +Consumer consumer2 = pulsarClient.newConsumer() +.subscriptionName("subName2") +.topic(topic) +.subscribe(); + +// 4. Test retryable exception when newPendingAckStore: +// The consumer will be built successfully after one time retry. +when(mockProvider.checkInitializedBefore(any())) +.thenReturn(CompletableFuture.completedFuture(true)); + +when(mockProvider.newPendingAckStore(any())) +// First, the method newPendingAckStore will fail with a retryable exception. +.thenReturn(FutureUtil.failedFuture(new ManagedLedgerException("mock fail new store"))) +// Then, the method will be executed successfully. +.thenCallRealMethod(); +transactionPendingAckStoreProviderField.set(pulsarServiceList.get(0), mockProvider); +@Cleanup +Consumer consumer3 = pulsarClient.newConsumer() +.subscriptionName("subName3") +.topic(topic) +.subscribe(); + +// 5. Test no-retryable exception: +// The consumer building will be failed without retrying. Review Comment: >The consumer still retries because the error code from the broker side is UnknownError. Yes, it is correct. According to the discussion, manual intervention is necessary when an unexpected/unretryable exception happens. So `UnknownError` may be acceptable. ## pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java: ## @@ -100,6 +104,79 @@ protected void cleanup() { super.internalCleanup(); } +/** + * Test consumer can be built successfully with retryable exception + * and get correct error with no-retryable exception. + * @throws Exception + */ +@Test(timeOut = 6) +public void testBuildConsumerEncounterPendingAckInitFailure() throws Exception { +// 1. Prepare and make sure the consumer can be built successfully. 
+String topic = NAMESPACE1 + "/testUnloadSubscriptionWhenFailedInitPendingAck"; +@Cleanup +Consumer consumer1 = pulsarClient.newConsumer() +.subscriptionName("subName1") +.topic(topic) +.subscribe(); +// 2. Mock a transactionPendingAckStoreProvider to test building consumer +// failing at transactionPendingAckStoreProvider::checkInitializedBefore. +Field transactionPendingAckStoreProviderField = PulsarService.class +