(pulsar-client-go) branch master updated: improve: use chan *message instead of chan []*message as queueCh (#1283)
This is an automated email from the ASF dual-hosted git repository. zike pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new 06f26935 improve: use chan *message instead of chan []*message as queueCh (#1283) 06f26935 is described below commit 06f2693583cca6855c5731f5268d32d1758240d3 Author: Zixuan Liu AuthorDate: Tue Oct 8 17:43:06 2024 +0800 improve: use chan *message instead of chan []*message as queueCh (#1283) ### Motivation We currently use `chan []*message` as queueCh, and use the slice to stage the messages to send the message to the parent consumer, this will result in excessive use of memory. This PR optimizes potentially reducing overall memory overhead. ### Modifications - Use `chan *message` instead of `chan []*message`. - Fix test. --- pulsar/consumer_partition.go | 101 +- pulsar/consumer_partition_test.go | 41 +--- 2 files changed, 67 insertions(+), 75 deletions(-) diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index 823e0e87..831f763a 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -154,7 +154,7 @@ type partitionConsumer struct { // the size of the queue channel for buffering messages maxQueueSizeint32 - queueCh chan []*message + queueCh chan *message startMessageID atomicMessageID lastDequeuedMsg *trackingMessageID @@ -338,7 +338,7 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon partitionIdx: int32(options.partitionIdx), eventsCh: make(chan interface{}, 10), maxQueueSize: int32(options.receiverQueueSize), - queueCh: make(chan []*message, options.receiverQueueSize), + queueCh: make(chan *message, options.receiverQueueSize), startMessageID: atomicMessageID{msgID: options.startMessageID}, connectedCh: make(chan struct{}), messageCh:messageCh, @@ -1057,37 +1057,33 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header return fmt.Errorf("discarding message on decryption error :%v", err) case crypto.ConsumerCryptoFailureActionConsume: pc.log.Warnf("consuming encrypted message due to error in decryption :%v", err) - messages := []*message{ - { - publishTime: timeFromUnixTimestampMillis(msgMeta.GetPublishTime()), - eventTime: timeFromUnixTimestampMillis(msgMeta.GetEventTime()), - key: msgMeta.GetPartitionKey(), - producerName: msgMeta.GetProducerName(), - properties: internal.ConvertToStringMap(msgMeta.GetProperties()), - topic:pc.topic, - msgID: newMessageID( - int64(pbMsgID.GetLedgerId()), - int64(pbMsgID.GetEntryId()), - pbMsgID.GetBatchIndex(), - pc.partitionIdx, - pbMsgID.GetBatchSize(), - ), - payLoad: headersAndPayload.ReadableSlice(), - schema: pc.options.schema, - replicationClusters: msgMeta.GetReplicateTo(), - replicatedFrom: msgMeta.GetReplicatedFrom(), - redeliveryCount: response.GetRedeliveryCount(), - encryptionContext: createEncryptionContext(msgMeta), - orderingKey: string(msgMeta.OrderingKey), - }, - } - if pc.options.autoReceiverQueueSize { pc.incomingMessages.Inc() pc.markScaleIfNeed() } - pc.queueCh <- messages + pc.queueCh <- &message{ + publishTime: timeFromUnixTimestampMillis(msgMeta.GetPublishTime()), + eventTime: timeFromUnixTimest
(pulsar-client-go) branch master updated (3d2c1cba -> 8f3334e6)
This is an automated email from the ASF dual-hosted git repository. zike pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git from 3d2c1cba [fix] fix flaky test `TestCloseFlushWithTimer` (#1292) add 8f3334e6 Add changelog for 0.14.0 (#1288) No new revisions were added by this update. Summary of changes: CHANGELOG.md | 39 +++ 1 file changed, 39 insertions(+)
(pulsar-client-go) annotated tag v0.14.0 updated (630d5f82 -> 543ce4d3)
This is an automated email from the ASF dual-hosted git repository. zike pushed a change to annotated tag v0.14.0 in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git *** WARNING: tag v0.14.0 was modified! *** from 630d5f82 (commit) to 543ce4d3 (tag) tagging 630d5f8218e00fe248d745c63ad0905527f45e66 (commit) replaces v0.13.0-candidate-1 by Zike Yang on Mon Sep 30 09:50:56 2024 +0800 - Log - Release v0.14.0 -BEGIN PGP SIGNATURE- iQJEBAABCgAuFiEE6ItqSN52cCOQDJEjT0AbyNP5+1UFAmb6BAAQHHppa2VAYXBh Y2hlLm9yZwAKCRBPQBvI0/n7VXHAD/908PAdRgEkJXoJlvgNqS5oghUrtVGpBvTS URJzj862CxzlhfDvIS6/3W2LJAe0YBBmPB60ezAvMf+EndVOdFlqdhPh+42jQjhN S1Hu3H6ywNaQUJjEEVfRGlXBsZWGA4O/wtdpUTKadJo9uuxiCyYg+Hq57JdRSd+C 9lrNGoV/AutnhFtE1nKW3cMcWO7YOLZRcnyoDYKr5MH6Jh0gOLczn1m8Vzm1pkDn WUtwFGC+M+u/wK62zMM7ojox3kF+5TFdYqS0qnz1GjxhSS0dlNLeMY+zTJ6UhIYB VwAeqnUDCo8P3xpg1KXz3LvrXBHs2+IyzVyeouXs5FZoFgTKZRLMbb6XzcolbK95 wtHvXuGONceu8pTes/o1vO7zOWd2mY+iHxslZLqMGlrjZnq0P1pfBGzhVlrAkVyO oBjJejYRdLAnJzWtT+s78HoMyYpXptlkc/Zz574NP4OJo/jawdCJkYRFYktrvy// AY7vvuZDQCWMGfZaBp4UdMhD+26LVJh07zKD5+Z46Mzy9A7g+UuFjeIJFDBWmaO6 GTR2+tHcIC/lpsRNijyaF2/xudl/6XslVn4bZYd+Qx8Dpa79xAyIv/35eBkURtCn llIIsD7eoIckhDXTM0Qd5GpRG0v/3wV0FOLbnVrEU+uErJjStARrET6GwxLNOhPV koPp59WEqg== =9TV4 -END PGP SIGNATURE- --- No new revisions were added by this update. Summary of changes:
(pulsar-client-go) branch branch-0.14.0 updated: Add changelog for 0.14.0 (#1288)
This is an automated email from the ASF dual-hosted git repository. zike pushed a commit to branch branch-0.14.0 in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/branch-0.14.0 by this push: new 5ba9b828 Add changelog for 0.14.0 (#1288) 5ba9b828 is described below commit 5ba9b82820a384fc54819c2e894e3eb72637eebe Author: Zike Yang AuthorDate: Mon Sep 30 09:41:52 2024 +0800 Add changelog for 0.14.0 (#1288) (cherry picked from commit 8f3334e62e2713659aa2b1cdbe41d6e7517cde8a) --- CHANGELOG.md | 39 +++ 1 file changed, 39 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index a07fa0c0..b09abd33 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,45 @@ All notable changes to this project will be documented in this file. +[0.14.0] 2024-09-23 + +## Important notice + +- The client has dropped support for Go 1.20. The minimum supported version is now Go + 1.21. ([Related PR](https://github.com/apache/pulsar-client-go/pull/1249)) +- The `BackoffPolicy` interface and related methods has been moved to the `backoff` package from the `internal` package. + The `BackoffPolicy` interface is renamed to + `Policy`. ([Related PR](https://github.com/apache/pulsar-client-go/pull/1197)) + +## What's Changed +* [improve] Install golang by image in the Dockerfile by @nodece in https://github.com/apache/pulsar-client-go/pull/1246 +* [ci] Validate multiple version builds and use golangci-lint-action by @nodece in https://github.com/apache/pulsar-client-go/pull/1250 +* [ci] Add merge limitation by @nodece in https://github.com/apache/pulsar-client-go/pull/1248 +* [fix] Fix the key-based batch can't guarantee the ordering when flushing by @RobertIndie in https://github.com/apache/pulsar-client-go/pull/1252 +* [fix] FailTimeoutMessages cannot delete outdated messages by @nodece in https://github.com/apache/pulsar-client-go/pull/1247 +* [fix] Fix pulsar admin revoke subscription permission endpoint by @NoFacePeace in https://github.com/apache/pulsar-client-go/pull/1251 +* [fix] Producer close was blocked by @nodece in https://github.com/apache/pulsar-client-go/pull/1249 +* [fix] PulsarCtl 1266 Oauth2 Client credentials flow use scopes from the keyfile as well by @Nikolajls in https://github.com/apache/pulsar-client-go/pull/1244 +* [feat] Add support for subscription expiration time namespace settings by @klevy-toast in https://github.com/apache/pulsar-client-go/pull/1254 +* [fix] Prevent panic when calling Flush on closed producer by @Gilthoniel in https://github.com/apache/pulsar-client-go/pull/1260 +* [fix] Avoid a data race when flushing with load by @Gilthoniel in https://github.com/apache/pulsar-client-go/pull/1261 +* [improve] Add InitialSubscriptionName for DLQPolicy by @crossoverJie in https://github.com/apache/pulsar-client-go/pull/1264 +* [fix] Peek message will return -1 for partitionIndex by @shibd in https://github.com/apache/pulsar-client-go/pull/1267 +* [chore] Bump github.com/docker/docker from 27.0.3+incompatible to 27.1.1+incompatible by @dependabot in https://github.com/apache/pulsar-client-go/pull/1269 +* [feat] Support the namespace offloadThresholdInSeconds API in pulsaradmin pkg by @ericsyh in https://github.com/apache/pulsar-client-go/pull/1271 +* [fix] Stop timer when close timedAckGroupingTracker by @geniusjoe in https://github.com/apache/pulsar-client-go/pull/1279 +* [improve] Refactor connection concurrency model by @Gilthoniel in https://github.com/apache/pulsar-client-go/pull/1275 +* [fix] Attempt to avoid deadlock during reconnection by @Gilthoniel in https://github.com/apache/pulsar-client-go/pull/1273 +* [fix] Fixed panic caused by memory not aligned in arm32 arch by @dream-kzx in https://github.com/apache/pulsar-client-go/pull/1286 +* [fix] Reconnection logic and Backoff policy doesn't work correctly by @crossoverJie in https://github.com/apache/pulsar-client-go/pull/1197 + +## New Contributors +* @NoFacePeace made their first contribution in https://github.com/apache/pulsar-client-go/pull/1251 +* @Nikolajls made their first contribution in https://github.com/apache/pulsar-client-go/pull/1244 +* @klevy-toast made their first contribution in https://github.com/apache/pulsar-client-go/pull/1254 +* @ericsyh made their first contribution in https://github.com/apache/pulsar-client-go/pull/1271 +* @dream-kzx made their first contribution in https://github.com/apache/pulsar-client-go/pull/1286 + [0.13.1] 2024-08-02 - [fix] Avoid a data race when flushing with load by Gaylor Bosson in [#1261](https://github.com/apache/pulsar-client-go/pull/1261)
svn commit: r71967 - /dev/pulsar/pulsar-client-go-0.14.0-candidate-1/ /release/pulsar/pulsar-client-go-0.14.0/
Author: zike Date: Mon Sep 30 01:52:53 2024 New Revision: 71967 Log: release go client 0.14.0 Added: release/pulsar/pulsar-client-go-0.14.0/ - copied from r71966, dev/pulsar/pulsar-client-go-0.14.0-candidate-1/ Removed: dev/pulsar/pulsar-client-go-0.14.0-candidate-1/
svn commit: r71809 - in /dev/pulsar/pulsar-client-go-0.14.0-candidate-1: ./ apache-pulsar-client-go-0.14.0-src.tar.gz apache-pulsar-client-go-0.14.0-src.tar.gz.asc apache-pulsar-client-go-0.14.0-src.t
Author: zike Date: Mon Sep 23 10:40:05 2024 New Revision: 71809 Log: Staging artifacts and signature for Pulsar Client Go release 0.14.0-candidate-1 Added: dev/pulsar/pulsar-client-go-0.14.0-candidate-1/ dev/pulsar/pulsar-client-go-0.14.0-candidate-1/apache-pulsar-client-go-0.14.0-src.tar.gz (with props) dev/pulsar/pulsar-client-go-0.14.0-candidate-1/apache-pulsar-client-go-0.14.0-src.tar.gz.asc dev/pulsar/pulsar-client-go-0.14.0-candidate-1/apache-pulsar-client-go-0.14.0-src.tar.gz.sha512 Added: dev/pulsar/pulsar-client-go-0.14.0-candidate-1/apache-pulsar-client-go-0.14.0-src.tar.gz == Binary file - no diff available. Propchange: dev/pulsar/pulsar-client-go-0.14.0-candidate-1/apache-pulsar-client-go-0.14.0-src.tar.gz -- svn:mime-type = application/octet-stream Added: dev/pulsar/pulsar-client-go-0.14.0-candidate-1/apache-pulsar-client-go-0.14.0-src.tar.gz.asc == --- dev/pulsar/pulsar-client-go-0.14.0-candidate-1/apache-pulsar-client-go-0.14.0-src.tar.gz.asc (added) +++ dev/pulsar/pulsar-client-go-0.14.0-candidate-1/apache-pulsar-client-go-0.14.0-src.tar.gz.asc Mon Sep 23 10:40:05 2024 @@ -0,0 +1,16 @@ +-BEGIN PGP SIGNATURE- + +iQIzBAABCgAdFiEE6ItqSN52cCOQDJEjT0AbyNP5+1UFAmbxRUIACgkQT0AbyNP5 ++1XSsQ//Rdk61KxoQzY8FzxxwwlO3jK6NQ1vmSvO/RSPhvgGnblmamy9+OXwZHGl +M2KrDTDev/XLddoCHnFqCfRQINejEcdA7wymuiSb35VbG1gUeJP7L62B79yKTftb +olfoMVCMBpdzx/Bm0yG8xreHt/HRkcSE5Fdr1gCh4pwyoZ6VZ0iZE9kxINsvTsvs +0FZsLju8h3/sJxcbTDvbvgg2WZob0P7gb7lK+Mx4Gkz1Hw5UbDC8wMltxHNs4UkD +hpEw+m3Bj0U5abu02CvcMmolFNkC/qoL/UP0ciVs9i/vK/qo76gayQ/aWPvOC5AX +AjbdZkA03yGHBrCbmB2oL0+qrCnIYFFaeBwSqO78b7kMcWgp/62R4ij/0x8W1rq0 +8T/U/NvX0Bi8erv7J6tLmOsZbm8UhkFtEiISED0oNCWnxhy6B43o0950VWXytWzh +eDJ/uPMEK9QdJ9TOu1JPWuiHRx7600gmdAFAb8KJ3KfoNw/h+xWvbmY6Q1kDLs+E +jdEMqny3+hrIev3e5/6POoL0fPp/DG7DQOrZ6isNzuGaoxsLCMIKcJs5P7jOIm0t +7+/OXfaRfFI+2j7YARvX8Q9QEDeZrPUO33UPRrW5JPXBwWWwMhkHJQMCS9J+cxpX +GT4IBNuwRAK5SruqcpsVuaeg1SkWgdgh20V2xdvZcfwAvQIJTfQ= +=ql88 +-END PGP SIGNATURE- Added: dev/pulsar/pulsar-client-go-0.14.0-candidate-1/apache-pulsar-client-go-0.14.0-src.tar.gz.sha512 == --- dev/pulsar/pulsar-client-go-0.14.0-candidate-1/apache-pulsar-client-go-0.14.0-src.tar.gz.sha512 (added) +++ dev/pulsar/pulsar-client-go-0.14.0-candidate-1/apache-pulsar-client-go-0.14.0-src.tar.gz.sha512 Mon Sep 23 10:40:05 2024 @@ -0,0 +1 @@ +5965174fe9bf2e40075d22f3ac8f6a4c9239cc68df768cc070ed963f9af26cc989f8660ff9a5ce024025a028755e5571fff69b9f967373256532682f40fb1929 apache-pulsar-client-go-0.14.0-src.tar.gz
(pulsar-client-go) annotated tag v0.14.0-candidate-1 updated (630d5f82 -> 6d6f3225)
This is an automated email from the ASF dual-hosted git repository. zike pushed a change to annotated tag v0.14.0-candidate-1 in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git *** WARNING: tag v0.14.0-candidate-1 was modified! *** from 630d5f82 (commit) to 6d6f3225 (tag) tagging 630d5f8218e00fe248d745c63ad0905527f45e66 (commit) replaces v0.13.0-candidate-1 by Zike Yang on Mon Sep 23 18:38:01 2024 +0800 - Log - Release v0.14.0-candidate-1 -BEGIN PGP SIGNATURE- iQJEBAABCgAuFiEE6ItqSN52cCOQDJEjT0AbyNP5+1UFAmbxRQkQHHppa2VAYXBh Y2hlLm9yZwAKCRBPQBvI0/n7Vew1D/4krNLvrbY/As0isaRouOR9CnV4x2o6OB5m BjiRFvshRCakzKUt7nDfCLM2vh3ZyjSAS4r0iylKPPhViwo/5XgjptjZZSQ6GCzJ UBRNM7sY+zL1lBdijCPbbyPENz7rEw1Tgv0wDeeAIjbIFYiFzfT2+DSvQNuNCSkT 1PRMMrlUADKZbwQVeWOurFFYpXozkQvsZGIMgp4oxLm9ZYW2UKLkqrxMgYt3/Y3y lo+iQQDoaFmnMxRbS/G3ZS9sMQ5mXnoaKhfPH+uz7S2B4GQ3hPdXJjDHUJJ6spmm h9aLyNGTHO8JnJXbgKKDxz8Db8CO0gdI4ZuWwoV8KwiSiETE/nK0dd5HOJEaWLq4 0VIVKGZHuej2twdwEBUGh0lhwhKGFFoFjzsTGOC2KGsnLuh3PwIoutX/7xqMIY0p 1xN5I29WUZrzfqEXZSsLRV3WSOEeO7E2K5NHHtgX9KDwYJ5gJ33YEd34S5RFEY4u JIkAHDIASyoM5XDQAbthe1oW3H//yxmjnkCU/kfSHnLwqyLBZztu12/3tH10TMMF C7mox41c7kl8iis+z1pn3d5dH4ZhQMpohNmG5v6v4cUhSvFWtemYc5ExQD3ghush rb0LEdPrXiCzC+8PJ0g2TwqY9kZqElccKmWgOPKrzulzXa9DZ5MFY8Zs1YaWj0SB JCEMWsa4Mg== =QxD3 -END PGP SIGNATURE- --- No new revisions were added by this update. Summary of changes:
(pulsar-client-go) branch branch-0.14.0 created (now 630d5f82)
This is an automated email from the ASF dual-hosted git repository. zike pushed a change to branch branch-0.14.0 in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git at 630d5f82 [fix] Reconnection logic and Backoff policy doesn't work correctly (#1197) No new revisions were added by this update.
(pulsar-client-go) branch master updated: [fix] Reconnection logic and Backoff policy doesn't work correctly (#1197)
This is an automated email from the ASF dual-hosted git repository. zike pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new 630d5f82 [fix] Reconnection logic and Backoff policy doesn't work correctly (#1197) 630d5f82 is described below commit 630d5f8218e00fe248d745c63ad0905527f45e66 Author: crossoverJie AuthorDate: Mon Sep 23 18:16:35 2024 +0800 [fix] Reconnection logic and Backoff policy doesn't work correctly (#1197) Fixes #1187 ### Modifications - Move `backoff.go` to the `backoff` directory (because there are circular dependencies, they are not moved to the pulsar directory.) - Create a new method for `BackOffPolicy` interface `IsMaxBackoffReached(delayReconnectTime, totalDelayReconnectTime time.Duration) bool` This is a **breaking change** that modifies the package name and interface name. Package: `internal`->`backoff` Interface name: `BackoffPolicy`-> `Policy` - Co-authored-by: Zixuan Liu Co-authored-by: Zike Yang --- pulsar/{internal => backoff}/backoff.go | 24 -- pulsar/{internal => backoff}/backoff_test.go | 4 ++- pulsar/blue_green_migration_test.go | 15 + pulsar/consumer.go | 6 ++-- pulsar/consumer_impl.go | 7 +++-- pulsar/consumer_partition.go | 41 +--- pulsar/consumer_regex_test.go| 10 +++--- pulsar/consumer_test.go | 16 ++ pulsar/dlq_router.go | 47 +--- pulsar/internal/http_client.go | 6 ++-- pulsar/internal/rpc_client.go| 6 ++-- pulsar/producer.go | 6 ++-- pulsar/producer_partition.go | 47 +--- pulsar/producer_test.go | 20 +++- pulsar/reader.go | 6 ++-- pulsar/reader_impl.go| 7 +++-- pulsar/reader_test.go| 23 ++ pulsar/retry_router.go | 36 + pulsar/transaction_coordinator_client.go | 4 ++- 19 files changed, 207 insertions(+), 124 deletions(-) diff --git a/pulsar/internal/backoff.go b/pulsar/backoff/backoff.go similarity index 77% rename from pulsar/internal/backoff.go rename to pulsar/backoff/backoff.go index 3284fb7e..453da578 100644 --- a/pulsar/internal/backoff.go +++ b/pulsar/backoff/backoff.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package internal +package backoff import ( "math/rand" @@ -26,10 +26,17 @@ func init() { rand.Seed(time.Now().UnixNano()) } -// BackoffPolicy parameterize the following options in the reconnection logic to +// Policy parameterize the following options in the reconnection logic to // allow users to customize the reconnection logic (minBackoff, maxBackoff and jitterPercentage) -type BackoffPolicy interface { +type Policy interface { + // Next returns the delay to wait before next retry Next() time.Duration + + // IsMaxBackoffReached evaluates if the max number of retries is reached + IsMaxBackoffReached() bool + + // Reset the backoff to the initial state + Reset() } // DefaultBackoff computes the delay before retrying an action. @@ -38,6 +45,13 @@ type DefaultBackoff struct { backoff time.Duration } +func NewDefaultBackoff() Policy { + return &DefaultBackoff{} +} +func NewDefaultBackoffWithInitialBackOff(backoff time.Duration) Policy { + return &DefaultBackoff{backoff: backoff / 2} +} + const maxBackoff = 60 * time.Second // Next returns the delay to wait before next retry @@ -61,3 +75,7 @@ func (b *DefaultBackoff) Next() time.Duration { func (b *DefaultBackoff) IsMaxBackoffReached() bool { return b.backoff >= maxBackoff } + +func (b *DefaultBackoff) Reset() { + b.backoff = 0 +} diff --git a/pulsar/internal/backoff_test.go b/pulsar/backoff/backoff_test.go similarity index 96% rename from pulsar/internal/backoff_test.go rename to pulsar/backoff/backoff_test.go index e05ea292..fc0a4923 100644 --- a/pulsar/internal/backoff_test.go +++ b/pulsar/backoff/backoff_test.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package internal +package backoff import ( "testing" @@ -58,4 +58,6 @@ func TestBackoff_NextMaxValue(t *testing.T) { assert.Equal(t, true, backoff.IsMaxBackoffReached()) // max value is 60 seconds + 20% jitter = 72 seconds assert.LessOrEqual(t, int64(cappedDelay), int64(72*time.Second)) + backoff.Reset() +
(pulsar-client-go) branch master updated: [Issue 1272][connection] Attempt to avoid deadlock during reconnection (#1273)
This is an automated email from the ASF dual-hosted git repository. zike pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new 98dc8d42 [Issue 1272][connection] Attempt to avoid deadlock during reconnection (#1273) 98dc8d42 is described below commit 98dc8d42abb638a4371969214c93d96c5dc51cc5 Author: Gaylor Bosson AuthorDate: Wed Sep 11 11:54:24 2024 +0200 [Issue 1272][connection] Attempt to avoid deadlock during reconnection (#1273) Fixes #1272 ### Motivation Producers and consumers register themselves to the connection to be notified when it gets closed. Even though the callback push the events in a channel, it can get stuck and the connection pool is locked which prevents any other caller to get a connection. ### Modifications This PR makes sure that the callback never blocks. --- pulsar/consumer_partition.go | 10 +++--- pulsar/producer_partition.go | 10 +++--- 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index d8001dc1..f307972c 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -332,7 +332,7 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon startMessageID: atomicMessageID{msgID: options.startMessageID}, connectedCh: make(chan struct{}), messageCh:messageCh, - connectClosedCh: make(chan *connectionClosed, 10), + connectClosedCh: make(chan *connectionClosed, 1), closeCh: make(chan struct{}), clearQueueCh: make(chan func(id *trackingMessageID)), compressionProviders: sync.Map{}, @@ -1381,8 +1381,12 @@ func (pc *partitionConsumer) ConnectionClosed(closeConsumer *pb.CommandCloseCons assignedBrokerURL = pc.client.selectServiceURL( closeConsumer.GetAssignedBrokerServiceUrl(), closeConsumer.GetAssignedBrokerServiceUrlTls()) } - pc.connectClosedCh <- &connectionClosed{ - assignedBrokerURL: assignedBrokerURL, + + select { + case pc.connectClosedCh <- &connectionClosed{assignedBrokerURL: assignedBrokerURL}: + default: + // Reconnect has already been requested so we do not block the + // connection callback. } } diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 1677c570..97ab8b94 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -171,7 +171,7 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions producerID: client.rpcClient.NewProducerID(), dataChan: make(chan *sendRequest, maxPendingMessages), cmdChan: make(chan interface{}, 10), - connectClosedCh: make(chan *connectionClosed, 10), + connectClosedCh: make(chan *connectionClosed, 1), batchFlushTicker: time.NewTicker(batchingMaxPublishDelay), compressionProvider: internal.GetCompressionProvider(pb.CompressionType(options.CompressionType), compression.Level(options.CompressionLevel)), @@ -413,8 +413,12 @@ func (p *partitionProducer) ConnectionClosed(closeProducer *pb.CommandCloseProdu assignedBrokerURL = p.client.selectServiceURL( closeProducer.GetAssignedBrokerServiceUrl(), closeProducer.GetAssignedBrokerServiceUrlTls()) } - p.connectClosedCh <- &connectionClosed{ - assignedBrokerURL: assignedBrokerURL, + + select { + case p.connectClosedCh <- &connectionClosed{assignedBrokerURL: assignedBrokerURL}: + default: + // Reconnect has already been requested so we do not block the + // connection callback. } }
(pulsar) branch master updated: [feat][broker] PIP-368: Support lookup based on the lookup properties (#23223)
This is an automated email from the ASF dual-hosted git repository. zike pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 9a97c843a46 [feat][broker] PIP-368: Support lookup based on the lookup properties (#23223) 9a97c843a46 is described below commit 9a97c843a46e23a0811e2172991cd00a3af642c0 Author: Zike Yang AuthorDate: Wed Aug 28 09:56:05 2024 +0800 [feat][broker] PIP-368: Support lookup based on the lookup properties (#23223) PIP: https://github.com/apache/pulsar/pull/23075 ### Motivation This is the implementation for the PIP: https://github.com/apache/pulsar/pull/23075 Currently, the lookup process uses only the topic name as its parameter. However, to enhance this process, it's beneficial for clients to provide additional information. This could be done by introducing the `lookupProperties` field in the client configuration. Clients can then share these properties with the broker during lookup. On the broker side, the broker could also contain some properties that are used for the lookup. We can also support the lookupProperties for the broker. The broker can use these properties to make a better decision on which broker to return. Here is the rack-aware lookup scenario for using the client properties for the lookup: Assuming there are two brokers that broker-0 configures the lookup property "rack" with "A" and broker-1 configures the lookup property "rack" with "B". By using the lookup properties, clients can supply rack information during the lookup, enabling the broker to identify and connect them to the nearest broker within the same rack. If a client that configures the "rack" property with "A" connects to a lookup broker, the customized load manager can determine broker-0 as the owner broker since the broker and the client have the same rack property. ### Modifications - Add new configuration `lookupProperties` to the client. While looking up the broker, the client will send the properties to the broker through `CommandLookupTopic` request. - Add `properties` field to the `CommandLookupTopic`. - Add `lookupProperties` to the `LookupOptions`. The Load Manager implementation can access the `properties` through `LookupOptions` to make a better decision on which broker to return. - Introduce a new broker configuration `lookupPropertyPrefix`. Any broker configuration properties that start with the `lookupPropertyPrefix` will be included into the `BrokerLookupData` and be persisted in the metadata store. The broker can use these properties during the lookup. Co-authored-by: Yunze Xu --- .../apache/pulsar/broker/ServiceConfiguration.java | 19 .../loadbalance/extensions/BrokerRegistryImpl.java | 3 +- .../extensions/data/BrokerLookupData.java | 3 +- .../pulsar/broker/lookup/TopicLookupBase.java | 8 +- .../pulsar/broker/namespace/LookupOptions.java | 2 + .../apache/pulsar/broker/service/ServerCnx.java| 13 ++- .../extensions/data/BrokerLookupDataTest.java | 4 +- .../extensions/filter/BrokerFilterTestBase.java| 3 +- .../filter/BrokerIsolationPoliciesFilterTest.java | 3 +- .../extensions/manager/RedirectManagerTest.java| 4 +- .../extensions/scheduler/TransferShedderTest.java | 3 +- .../pulsar/client/api/LookupPropertiesTest.java| 110 + .../common/naming/ServiceConfigurationTest.java| 14 +++ .../apache/pulsar/client/api/ClientBuilder.java| 12 +++ .../client/impl/BinaryProtoLookupService.java | 3 +- .../pulsar/client/impl/ClientBuilderImpl.java | 6 ++ .../client/impl/conf/ClientConfigurationData.java | 11 +++ .../client/impl/BinaryProtoLookupServiceTest.java | 2 + .../apache/pulsar/common/protocol/Commands.java| 8 +- pulsar-common/src/main/proto/PulsarApi.proto | 2 + 20 files changed, 220 insertions(+), 13 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index c836879b075..6488ace991e 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -21,9 +21,11 @@ package org.apache.pulsar.broker; import com.google.common.collect.Lists; import com.google.common.collect.Sets; import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashSet; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Properties; @@
(pulsar-client-go) branch dependabot/go_modules/github.com/docker/docker-27.1.1incompatible deleted (was 47037233)
This is an automated email from the ASF dual-hosted git repository. zike pushed a change to branch dependabot/go_modules/github.com/docker/docker-27.1.1incompatible in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git was 47037233 chore(deps): bump github.com/docker/docker The revisions that were on this branch are still contained in other references; therefore, this change does not discard any commits from the repository.
(pulsar-client-go) branch master updated: chore(deps): bump github.com/docker/docker (#1269)
This is an automated email from the ASF dual-hosted git repository. zike pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new 01e32e74 chore(deps): bump github.com/docker/docker (#1269) 01e32e74 is described below commit 01e32e7458dd1b96f8718adfdcff33a70b3e073d Author: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> AuthorDate: Tue Aug 20 18:46:47 2024 +0800 chore(deps): bump github.com/docker/docker (#1269) Bumps [github.com/docker/docker](https://github.com/docker/docker) from 27.0.3+incompatible to 27.1.1+incompatible. - [Release notes](https://github.com/docker/docker/releases) - [Commits](https://github.com/docker/docker/compare/v27.0.3...v27.1.1) --- updated-dependencies: - dependency-name: github.com/docker/docker dependency-type: indirect ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 64f500a6..2266621c 100644 --- a/go.mod +++ b/go.mod @@ -52,7 +52,7 @@ require ( github.com/cpuguy83/dockercfg v0.3.1 // indirect github.com/danieljoos/wincred v1.1.2 // indirect github.com/distribution/reference v0.6.0 // indirect - github.com/docker/docker v27.0.3+incompatible // indirect + github.com/docker/docker v27.1.1+incompatible // indirect github.com/docker/go-connections v0.5.0 // indirect github.com/docker/go-units v0.5.0 // indirect github.com/dvsekhvalnov/jose2go v1.6.0 // indirect diff --git a/go.sum b/go.sum index 9b8d0e18..8a5cd935 100644 --- a/go.sum +++ b/go.sum @@ -100,8 +100,8 @@ github.com/dimfeld/httptreemux v5.0.1+incompatible h1:Qj3gVcDNoOthBAqftuD596rm4w github.com/dimfeld/httptreemux v5.0.1+incompatible/go.mod h1:rbUlSV+CCpv/SuqUTP/8Bk2O3LyUV436/yaRGkhP6Z0= github.com/distribution/reference v0.6.0 h1:0IXCQ5g4/QMHHkarYzh5l+u8T3t73zM5QvfrDyIgxBk= github.com/distribution/reference v0.6.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5Y4f/wlDRiLyi3E= -github.com/docker/docker v27.0.3+incompatible h1:aBGI9TeQ4MPlhquTQKq9XbK79rKFVwXNUAYz9aXyEBE= -github.com/docker/docker v27.0.3+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk= +github.com/docker/docker v27.1.1+incompatible h1:hO/M4MtV36kzKldqnA37IWhebRA+LnqqcqDja6kVaKY= +github.com/docker/docker v27.1.1+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk= github.com/docker/go-connections v0.5.0 h1:USnMq7hx7gwdVZq1L49hLXaFtUdTADjXGp+uj1Br63c= github.com/docker/go-connections v0.5.0/go.mod h1:ov60Kzw0kKElRwhNs9UlUHAE/F9Fe6GLaXnqyDdmEXc= github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4=
(pulsar-site) branch main updated: [fix][doc] Fix code block format for transaction demo (#952)
This is an automated email from the ASF dual-hosted git repository. zike pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/pulsar-site.git The following commit(s) were added to refs/heads/main by this push: new b17a5313ce99 [fix][doc] Fix code block format for transaction demo (#952) b17a5313ce99 is described below commit b17a5313ce99dfaede0adf26cba54b13c27ca48d Author: Zike Yang AuthorDate: Thu Aug 15 22:20:58 2024 +0800 [fix][doc] Fix code block format for transaction demo (#952) * Fix code block format for transaction demo * Push changes for old version --- docs/txn-use.md | 391 versioned_docs/version-3.0.x/txn-use.md | 391 versioned_docs/version-3.1.x/txn-use.md | 391 versioned_docs/version-3.2.x/txn-use.md | 391 versioned_docs/version-3.3.x/txn-use.md | 390 +++ 5 files changed, 980 insertions(+), 974 deletions(-) diff --git a/docs/txn-use.md b/docs/txn-use.md index 193a03de4dbb..556d868b6c4c 100644 --- a/docs/txn-use.md +++ b/docs/txn-use.md @@ -5,6 +5,11 @@ sidebar_label: "Get started" description: Get started to use Pulsar transaction API. --- +mdx-code-block +import Tabs from '@theme/Tabs'; +import TabItem from '@theme/TabItem'; + + Pulsar transaction is primarily a server-side and protocol-level feature. This tutorial guides you through every step of how to use the [Pulsar transaction API](/api/admin/) to send and receive messages in a Java client. :::note @@ -78,8 +83,6 @@ To use Pulsar transaction API, complete the following steps. ::: -**Input** - mdx-code-block -```java -PulsarClient client = PulsarClient.builder() -// Step 3: create a Pulsar client and enable transactions. -.enableTransaction(true) -.serviceUrl(jct.serviceUrl) -.build(); - -// Step 4: create three producers to produce messages to input and output topics. -ProducerBuilder producerBuilder = client.newProducer(Schema.STRING); -Producer inputProducer = producerBuilder.topic(inputTopic) -.sendTimeout(0, TimeUnit.SECONDS).create(); -Producer outputProducerOne = producerBuilder.topic(outputTopicOne) -.sendTimeout(0, TimeUnit.SECONDS).create(); -Producer outputProducerTwo = producerBuilder.topic(outputTopicTwo) -.sendTimeout(0, TimeUnit.SECONDS).create(); -// Step 4: create three consumers to consume messages from input and output topics. -Consumer inputConsumer = client.newConsumer(Schema.STRING) - .subscriptionName("your-subscription-name").topic(inputTopic).subscribe(); -Consumer outputConsumerOne = client.newConsumer(Schema.STRING) - .subscriptionName("your-subscription-name").topic(outputTopicOne).subscribe(); -Consumer outputConsumerTwo = client.newConsumer(Schema.STRING) - .subscriptionName("your-subscription-name").topic(outputTopicTwo).subscribe(); - -int count = 2; -// Step 5: produce messages to input topics. -for (int i = 0; i < count; i++) { -inputProducer.send("Hello Pulsar! count : " + i); -} - -// Step 5: consume messages and produce them to output topics with transactions. -for (int i = 0; i < count; i++) { - -// Step 5: the consumer successfully receives messages. -Message message = inputConsumer.receive(); - -// Step 6: create transactions. -// The transaction timeout is specified as 10 seconds. -// If the transaction is not committed within 10 seconds, the transaction is automatically aborted. -Transaction txn = null; -try { -txn = client.newTransaction() -.withTransactionTimeout(10, TimeUnit.SECONDS).build().get(); -// Step 6: you can process the received message with your use case and business logic. - -// Step 7: the producers produce messages to output topics with transactions -outputProducerOne.newMessage(txn).value("Hello Pulsar! outputTopicOne count : " + i).send(); -outputProducerTwo.newMessage(txn).value("Hello Pulsar! outputTopicTwo count : " + i).send(); - -// Step 7: the consumers acknowledge the input message with the transactions *individually*. -inputConsumer.acknowledgeAsync(message.getMessageId(), txn).g
(pulsar-client-go) branch master updated: [fix] peek message will return -1 for partitionIndex (#1267)
This is an automated email from the ASF dual-hosted git repository. zike pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new ab042ae7 [fix] peek message will return -1 for partitionIndex (#1267) ab042ae7 is described below commit ab042ae714d14ff8a07ba0d55ba4d887879e1e00 Author: Baodi Shi AuthorDate: Tue Aug 13 18:58:49 2024 +0800 [fix] peek message will return -1 for partitionIndex (#1267) ### Motivation If peek a partitioned topic, will see a message id: `7316:0:-1:-1`, the parititonIndex should not be -1. ``` pulsarctl subscription peek --count 10 persistent://public/default/my-topic-partition-0 test-sub Message ID : 7316:0:-1:-1 Properties : { "publish-time": "2024-08-08T17:50:39.476+08:00" } Message : 68 65 6c 6c 6f 2d 31 |hello-1| ``` ### Modifications - Set partition index on peek message. --- pulsaradmin/pkg/admin/subscription.go | 3 +- pulsaradmin/pkg/admin/subscription_test.go | 54 ++ pulsaradmin/pkg/utils/message_id.go| 9 + pulsaradmin/pkg/utils/topic_name.go| 4 +++ 4 files changed, 69 insertions(+), 1 deletion(-) diff --git a/pulsaradmin/pkg/admin/subscription.go b/pulsaradmin/pkg/admin/subscription.go index 8ddb5845..996ebb4e 100644 --- a/pulsaradmin/pkg/admin/subscription.go +++ b/pulsaradmin/pkg/admin/subscription.go @@ -234,8 +234,9 @@ const ( ) func handleResp(topic utils.TopicName, resp *http.Response) ([]*utils.Message, error) { + msgID := resp.Header.Get("X-Pulsar-Message-ID") - ID, err := utils.ParseMessageID(msgID) + ID, err := utils.ParseMessageIDWithPartitionIndex(msgID, topic.GetPartitionIndex()) if err != nil { return nil, err } diff --git a/pulsaradmin/pkg/admin/subscription_test.go b/pulsaradmin/pkg/admin/subscription_test.go index c4ba717d..92c79c1f 100644 --- a/pulsaradmin/pkg/admin/subscription_test.go +++ b/pulsaradmin/pkg/admin/subscription_test.go @@ -90,6 +90,60 @@ func TestGetMessagesByID(t *testing.T) { } +func TestPeekMessageForPartitionedTopic(t *testing.T) { + ctx := context.Background() + randomName := newTopicName() + topic := "persistent://public/default/" + randomName + topicName, _ := utils.GetTopicName(topic) + subName := "test-sub" + + cfg := &config.Config{} + admin, err := New(cfg) + assert.NoError(t, err) + assert.NotNil(t, admin) + + err = admin.Topics().Create(*topicName, 2) + assert.NoError(t, err) + + err = admin.Subscriptions().Create(*topicName, subName, utils.Earliest) + assert.NoError(t, err) + + client, err := pulsar.NewClient(pulsar.ClientOptions{ + URL: lookupURL, + }) + assert.NoError(t, err) + defer client.Close() + + producer, err := client.CreateProducer(pulsar.ProducerOptions{ + Topic: topic, + DisableBatching: true, + }) + assert.NoError(t, err) + defer producer.Close() + + for i := 0; i < 100; i++ { + producer.SendAsync(ctx, &pulsar.ProducerMessage{ + Payload: []byte(fmt.Sprintf("hello-%d", i)), + }, nil) + } + err = producer.Flush() + if err != nil { + return + } + + for i := 0; i < 2; i++ { + topicWithPartition := fmt.Sprintf("%s-partition-%d", topic, i) + topicName, err := utils.GetTopicName(topicWithPartition) + assert.NoError(t, err) + messages, err := admin.Subscriptions().PeekMessages(*topicName, subName, 10) + assert.NoError(t, err) + assert.NotNil(t, messages) + for _, msg := range messages { + assert.Equal(t, msg.GetMessageID().PartitionIndex, i) + } + } +} + func TestGetMessageByID(t *testing.T) { randomName := newTopicName() topic := "persistent://public/default/" + randomName diff --git a/pulsaradmin/pkg/utils/message_id.go b/pulsaradmin/pkg/utils/message_id.go index d75b613e..f65c031e 100644 --- a/pulsaradmin/pkg/utils/message_id.go +++ b/pulsaradmin/pkg/utils/message_id.go @@ -34,6 +34,15 @@ type MessageID struct { var Latest = MessageID{0x7fff, 0x7fff, -1, -1} var Earliest = MessageID{-1, -1, -1, -1} +func ParseMessageIDWithPartitionIndex(str string, index int) (*MessageID, error) { + id, err := ParseMessageID(str) + if err != nil { + return nil, err + } + id.PartitionIndex = index + return id, nil +} + func ParseMessageID(str string) (*MessageID, error
(pulsar-site) branch main updated: [improve][doc] add golang transaction demo in txn-use.md (#948)
This is an automated email from the ASF dual-hosted git repository. zike pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/pulsar-site.git The following commit(s) were added to refs/heads/main by this push: new df8ff29a7d92 [improve][doc] add golang transaction demo in txn-use.md (#948) df8ff29a7d92 is described below commit df8ff29a7d92ec539fe626a41b0c9277bbbae1b7 Author: zhou zhuohan <843520...@qq.com> AuthorDate: Mon Aug 12 18:49:51 2024 +0800 [improve][doc] add golang transaction demo in txn-use.md (#948) Co-authored-by: ninjazhou --- docs/txn-use.md | 131 +++- versioned_docs/version-3.0.x/txn-use.md | 131 +++- versioned_docs/version-3.1.x/txn-use.md | 131 +++- versioned_docs/version-3.2.x/txn-use.md | 131 +++- versioned_docs/version-3.3.x/txn-use.md | 131 +++- 5 files changed, 650 insertions(+), 5 deletions(-) diff --git a/docs/txn-use.md b/docs/txn-use.md index 6d91283422a5..193a03de4dbb 100644 --- a/docs/txn-use.md +++ b/docs/txn-use.md @@ -54,7 +54,7 @@ To use Pulsar transaction API, complete the following steps. Transaction coordinator metadata setup success ``` -3. Create a Pulsar client and enable transactions. +3. Create a Pulsar client and enable transactions. Since client need to know transaction coordinator from system topic, please make sure your client role has system namespace `pulsar/system` produce/consume permissions. 4. Create producers and consumers. @@ -80,6 +80,13 @@ To use Pulsar transaction API, complete the following steps. **Input** +mdx-code-block + + + + ```java PulsarClient client = PulsarClient.builder() // Step 3: create a Pulsar client and enable transactions. @@ -162,6 +169,128 @@ To use Pulsar transaction API, complete the following steps. } ``` + + + + ```go + // Step 3: create a Pulsar client and enable transactions. + client, err := pulsar.NewClient(pulsar.ClientOptions{ + URL: "", + EnableTransaction: true, + }) + if err != nil { + log.Fatalf("create client fail, err = %v", err) + } + defer client.Close() + // Step 4: create three producers to produce messages to input and output topics. + inputTopic := "inputTopic" + outputTopicOne := "outputTopicOne" + outputTopicTwo := "outputTopicTwo" + subscriptionName := "your-subscription-name" + inputProducer, _ := client.CreateProducer(pulsar.ProducerOptions{ + Topic: inputTopic, + SendTimeout: 0, + }) + defer inputProducer.Close() + outputProducerOne, _ := client.CreateProducer(pulsar.ProducerOptions{ + Topic: outputTopicOne, + SendTimeout: 0, + }) + defer outputProducerOne.Close() + outputProducerTwo, _ := client.CreateProducer(pulsar.ProducerOptions{ + Topic: outputTopicTwo, + SendTimeout: 0, + }) + defer outputProducerTwo.Close() + + // Step 4: create three consumers to consume messages from input and output topics. + inputConsumer, _ := client.Subscribe(pulsar.ConsumerOptions{ + Topic:inputTopic, + SubscriptionName: subscriptionName, + }) + defer inputConsumer.Close() + outputConsumerOne, _ := client.Subscribe(pulsar.ConsumerOptions{ + Topic:outputTopicOne, + SubscriptionName: subscriptionName, + }) + defer outputConsumerOne.Close() + outputConsumerTwo, _ := client.Subscribe(pulsar.ConsumerOptions{ + Topic:outputTopicTwo, + SubscriptionName: subscriptionName, + }) + defer outputConsumerTwo.Close() + + // Step 5: produce messages to input topics. + ctx := context.Background() + count := 2 + for i := 0; i < count; i++ { + inputProducer.Send(ctx, &pulsar.ProducerMessage{ + Payload: []byte(fmt.Sprintf("Hello Pulsar! count : %d", i)), + }) + } + // Step 5: consume messages and produce them to output topics with transactions. + for i := 0; i < count; i++ { + // Step 5: the consumer successfully receives messages. + message, err := inputConsumer.Receive(ctx) + if err != nil { + log.Printf("receive message from %s fail, err = %v", inputTopic, err) + continue + } + // Step 6: create transactions. + // The transaction timeout is specified as 10 seconds. +
svn commit: r70768 - /dev/pulsar/pulsar-client-go-0.13.1-candidate-1/ /release/pulsar/pulsar-client-go-0.13.1/
Author: zike Date: Fri Aug 9 10:11:20 2024 New Revision: 70768 Log: release 0.13.1 Added: release/pulsar/pulsar-client-go-0.13.1/ - copied from r70767, dev/pulsar/pulsar-client-go-0.13.1-candidate-1/ Removed: dev/pulsar/pulsar-client-go-0.13.1-candidate-1/
(pulsar-client-go) branch master updated: Add changelog for 0.13.1 (#1266)
This is an automated email from the ASF dual-hosted git repository. zike pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new 6a2e4617 Add changelog for 0.13.1 (#1266) 6a2e4617 is described below commit 6a2e461754bec9d463c678685645b4d8c82839b6 Author: Zike Yang AuthorDate: Fri Aug 9 18:09:38 2024 +0800 Add changelog for 0.13.1 (#1266) --- CHANGELOG.md | 11 +++ 1 file changed, 11 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index ec9129b9..a07fa0c0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,17 @@ All notable changes to this project will be documented in this file. +[0.13.1] 2024-08-02 + +- [fix] Avoid a data race when flushing with load by Gaylor Bosson in [#1261](https://github.com/apache/pulsar-client-go/pull/1261) +- [fix] Prevent panic when calling Flush on closed producer by Gaylor Bosson in [#1260](https://github.com/apache/pulsar-client-go/pull/1260) +- [fix] Producer close was blocked by Zixuan Liu in [#1249](https://github.com/apache/pulsar-client-go/pull/1249) +- [ci] Validate multiple version builds and use golangci-lint-action by Zixuan Liu in [#1250](https://github.com/apache/pulsar-client-go/pull/1250) +- [fix] Fix pulsar admin revoke subscription permission endpoint by 码斯克 in [#1251](https://github.com/apache/pulsar-client-go/pull/1251) +- [fix] failTimeoutMessages cannot delete outdated messages by Zixuan Liu in [#1247](https://github.com/apache/pulsar-client-go/pull/1247) +- [fix] Oauth2 Client credentials flow use scopes from the keyfile as well by Nikolaj Lund Sørensen in [#1244](https://github.com/apache/pulsar-client-go/pull/1244) +- [fix] Fix the key-based batch can't guarantee the ordering when flushing by Zike Yang in [#1252](https://github.com/apache/pulsar-client-go/pull/1252) + [0.13.0] 2024-07-12 ## Important notice
(pulsar-client-go) annotated tag v0.13.1 updated (e44bd048 -> a9690709)
This is an automated email from the ASF dual-hosted git repository. zike pushed a change to annotated tag v0.13.1 in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git *** WARNING: tag v0.13.1 was modified! *** from e44bd048 (commit) to a9690709 (tag) tagging e44bd0481cc8b3b407c257a3411a97d48bb29844 (commit) replaces v0.13.0 by Zike Yang on Fri Aug 9 18:05:40 2024 +0800 - Log - Release v0.13.1 -BEGIN PGP SIGNATURE- iQJEBAABCgAuFiEE6ItqSN52cCOQDJEjT0AbyNP5+1UFAma16fUQHHppa2VAYXBh Y2hlLm9yZwAKCRBPQBvI0/n7VW9+D/9tHT5EWjUA6/hJSYq33RaCplG42djtUxKE I/+i23/X8HaDcqICuCKiT61PSLM6XptBUInMal99LKQxkKQQTZ+LJBcuULOy648z ZYyuLvuwKSKI26lXMDuzjSeVp02jB3rX1URRUOSgHPG2vmcokun7hvO+8XkPjSwj PaCSdRdLJHihgShHzV7DqYERlmijqkxGIi4pVgkHEVs/b/mCDnER0prL/rJ2DuAQ G4/iLa/sAReNyIN0V97G19mz9ftXC6nFgktTC5FYgaaNbsoQ7Ky/E5e9tdeo2wxh cXsU7QQazHYXqAqsxynxMfDKDu/rV0v9ELBe4aPaFljyBs3w15JX72a8lzmHli9G 8Rr+D2Ju0ljR0ddtTmguXbumrQIG7+5m+Yt2Hy+bmjybrsQHAw8bfHnTZEx1X3gD YtEedO4+V7cPQ1ZoFSF8owXRZ0RyjflcxsZ91wZMMmuh+JPcaj+xT0jRjboNRIeR 7xppXrUnyRrBTAzFGKap0fH+Q27fuNhJNaZQP2DBLEyqCIl4KE79c7jH8P7gyeqY LgWz1yl/hapBqytKviy1X2D9t/ycavVa5zYrwClHi7SIID/1w4tyz14864x7Ti4q AT7/Iz6fsKkWe0FZ9E5mu6ZF24cVomppxSA9EwvJJuRxWgzWtuqYy4isw+8KGW4h RZdQ1brJ2A== =mN/d -END PGP SIGNATURE- --- No new revisions were added by this update. Summary of changes:
(pulsar) branch master updated: [improve][pip] PIP-368: Support lookup based on the lookup properties (#23075)
This is an automated email from the ASF dual-hosted git repository. zike pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new b7440e9023f [improve][pip] PIP-368: Support lookup based on the lookup properties (#23075) b7440e9023f is described below commit b7440e9023fcd497266a848372f61838eff345f5 Author: Zike Yang AuthorDate: Wed Aug 7 10:20:25 2024 +0800 [improve][pip] PIP-368: Support lookup based on the lookup properties (#23075) ### Motivation Currently, the lookup process uses only the topic name as its parameter. However, to enhance this process, it's beneficial for clients to provide additional information. This could be done by introducing the `lookupProperties` field in the client configuration. Clients can then share these properties with the broker during lookup. On the broker side, the broker could also contain some properties that are used for the lookup. We can also support the lookupProperties for the broker. The broker can use these properties to make a better decision on which broker to return. Here is the rack-aware lookup scenario for using the client properties for the lookup: Assuming there are two brokers that broker-0 configures the lookup property "rack" with "A" and broker-1 configures the lookup property "rack" with "B". By using the lookup properties, clients can supply rack information during the lookup, enabling the broker to identify and connect them to the nearest broker within the same rack. If a client that configures the "rack" property with "A" connects to a lookup broker, the customized load manager can determine broker-0 as the owner broker since the broker and the client have the same rack property. ### Modifications Add new configuration `lookupProperties` to the client. While looking up the broker, the client will send the properties to the broker through `CommandLookupTopic` request. The `lookupProperties` will then be added to the `LookupOptions`. The Load Manager implementation can access the `properties` through `LookupOptions` to make a better decision on which broker to return. The properties are used only when the protocol is the binary protocol, starting with `pulsar://` or `pulsar+ssl://`, or if the `loadManagerClassName` in the broker is a class that implements the `ExtensibleLoadManager` interface. To support configuring the `lookupProperties` on the broker side, introduce a new broker configuration `lookupPropertyPrefix`. Any broker configuration properties that start with the `lookupPropertyPrefix` will be included into the `BrokerLookupData` and be persisted in the metadata store. The broker can use these properties during the lookup. In this way, to support the rack-aware lookup scenario mentioned in the "Motivation" part, the client can set the rack information in the client `lookupProperties`. Similarly, the broker can also set the rack information in the broker configuration like `lookup.rack`. The `lookup.rack` will be stored in the `BrokerLookupData`. A customized load manager can then be implemented. For each lookup request, it will go through the `BrokerLookupData` for all brokers and select the broker in the same rack to return. --- pip/pip-368.md | 185 + 1 file changed, 185 insertions(+) diff --git a/pip/pip-368.md b/pip/pip-368.md new file mode 100644 index 000..06bba2c1276 --- /dev/null +++ b/pip/pip-368.md @@ -0,0 +1,185 @@ +# PIP-368: Support lookup based on the lookup properties + +# Background knowledge + +## How Pulsar Lookup Works + +Before producing or consuming messages, a Pulsar client must first find the broker responsible for the topic. This +happens through the lookup service. The client sends a `CommandLookupTopic` request with the topic name to the broker +lookup service. + +On the broker side, the broker will register itself to the metadata store using a distributed lock with the value +of [`BrokerLookupData`](https://github.com/apache/pulsar/blob/7fe92ac43cfd2f2de5576a023498aac8b46c7ac8/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLookupData.java#L34-L44) +when starting. The lookup service will first choose the owner broker. And then retrieve the `BrokerLookupData` of the +owner broker and finally return to the client. The client then interacts with this broker to produce or consume +messages. + +Users can customize the lookup process by setting a custom load manager in the `loadManagerClassName` configuration. + +# Motivation + +Currently, the lookup process uses only the topic name as its parameter. However, to enhance t
svn commit: r70660 - in /dev/pulsar/pulsar-client-go-0.13.1-candidate-1: ./ apache-pulsar-client-go-0.13.1-src.tar.gz apache-pulsar-client-go-0.13.1-src.tar.gz.asc apache-pulsar-client-go-0.13.1-src.t
Author: zike Date: Fri Aug 2 09:41:50 2024 New Revision: 70660 Log: Staging artifacts and signature for Pulsar Client Go release 0.13.1-candidate-1 Added: dev/pulsar/pulsar-client-go-0.13.1-candidate-1/ dev/pulsar/pulsar-client-go-0.13.1-candidate-1/apache-pulsar-client-go-0.13.1-src.tar.gz (with props) dev/pulsar/pulsar-client-go-0.13.1-candidate-1/apache-pulsar-client-go-0.13.1-src.tar.gz.asc dev/pulsar/pulsar-client-go-0.13.1-candidate-1/apache-pulsar-client-go-0.13.1-src.tar.gz.sha512 Added: dev/pulsar/pulsar-client-go-0.13.1-candidate-1/apache-pulsar-client-go-0.13.1-src.tar.gz == Binary file - no diff available. Propchange: dev/pulsar/pulsar-client-go-0.13.1-candidate-1/apache-pulsar-client-go-0.13.1-src.tar.gz -- svn:mime-type = application/octet-stream Added: dev/pulsar/pulsar-client-go-0.13.1-candidate-1/apache-pulsar-client-go-0.13.1-src.tar.gz.asc == --- dev/pulsar/pulsar-client-go-0.13.1-candidate-1/apache-pulsar-client-go-0.13.1-src.tar.gz.asc (added) +++ dev/pulsar/pulsar-client-go-0.13.1-candidate-1/apache-pulsar-client-go-0.13.1-src.tar.gz.asc Fri Aug 2 09:41:50 2024 @@ -0,0 +1,16 @@ +-BEGIN PGP SIGNATURE- + +iQIzBAABCgAdFiEE6ItqSN52cCOQDJEjT0AbyNP5+1UFAmasqZAACgkQT0AbyNP5 ++1VZxg/9GZRjFdr9OaX3y0rVHGqW+HIcHmZoyhmVeSvoWQq1tvQnO/eDB3gbjs3b +h4VpjZV2PtuFTKEtKBpxasHyFnudbfVH8OkFaHEwvFBUwzI0tkRD902VCtq7mlkl +vyx/UirBFAcN3TuMzayjbazaG6GKKe6pHi/dAAvOgzaBgSpo+Lzyv1uHDHxqlAHk +cA8y9Ge6tEDGoSUfLdrp069gQMJaRhlb7YoDqyL+7dU+mE55qc3aP7KR9dOIHdEI +o+MSbGjaw/aoFVkTUJBZNXlx+9MmfjoYhmU2kHCUpwLb3/dldAzkbbJ6sfMM3VQT +vqGhxEn24ydLfljobE1wJ4m9IXv8mp8YILycqB11ulp31pl12jOgspEXX1cLynaW +vSvlQkr3oS+RIZdSMz79CoTJoHunZYb4nIvGNF9dkqvzwgwVJ67WklV7zOctrFgj +LwMddLWtwe1MIsi2/VpTGI3ztca58pip7TCJokOD/QlZPvIzAEHEQzUnP8sJVRgz +IyAvhgAyZMpuJj58QaO8tkGGrJ7sFoKaGrstIdi7fj0svY7lRH1YFMLa5BuEIvZf +1mq6eVMy177Hyhjn4QGu104PFRRkKc750ZuYFJaVaPMrnPQIy6RwbMqWov7v9M8d +SDiVptY+H18nrdSCrlwdgOq/gJPqu+aoeeML7OzTTjo0kwdgQjk= +=uHbd +-END PGP SIGNATURE- Added: dev/pulsar/pulsar-client-go-0.13.1-candidate-1/apache-pulsar-client-go-0.13.1-src.tar.gz.sha512 == --- dev/pulsar/pulsar-client-go-0.13.1-candidate-1/apache-pulsar-client-go-0.13.1-src.tar.gz.sha512 (added) +++ dev/pulsar/pulsar-client-go-0.13.1-candidate-1/apache-pulsar-client-go-0.13.1-src.tar.gz.sha512 Fri Aug 2 09:41:50 2024 @@ -0,0 +1 @@ +92efe7474f7af8164b9974996595d0fb92af465ca1e7da7bad6510b0c081d3db5327995512b4d6f50f0bd7848bf85a2755349656ad5ae43c382c2de2dd7a6fde apache-pulsar-client-go-0.13.1-src.tar.gz
(pulsar-client-go) annotated tag v0.13.1-candidate-1 updated (e44bd048 -> 979706d4)
This is an automated email from the ASF dual-hosted git repository. zike pushed a change to annotated tag v0.13.1-candidate-1 in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git *** WARNING: tag v0.13.1-candidate-1 was modified! *** from e44bd048 (commit) to 979706d4 (tag) tagging e44bd0481cc8b3b407c257a3411a97d48bb29844 (commit) replaces v0.13.0 by Zike Yang on Fri Aug 2 17:39:05 2024 +0800 - Log - Release v0.13.0-candidate-1 -BEGIN PGP SIGNATURE- iQJEBAABCgAuFiEE6ItqSN52cCOQDJEjT0AbyNP5+1UFAmasqTkQHHppa2VAYXBh Y2hlLm9yZwAKCRBPQBvI0/n7Vdd5D/9T6Y4Ub8mmAQpRflWT1z347AjzBduullbN Aj0ugqi8ByddAHGaLxu+NP7cPs2/qUmOXONmYTUxVhE3r3T3iGs6T/4TboaOrljT vSTeZP5l4Pfgq+TEyXLL3j2JgUyzBOlQFWmKcHX8jbZStNKE3sWWPq28DzGoSpm4 HWIpOVPHn6VZ4NgG3VY5pmyY2rU3odCyLWUk/r+rJ8qfLBgX7IKkb81Q0W6LBC2n nCXptLcpARRDeW4RY4mXNXoQwRSQ+mYxuexQBFXwQZT/L5aDS9DQCFEhsnQdJHGF 7Qb+xpzmUXtWlWzjnLC1lc7fsqvC9ynk9TCTPMnZeclb/cnPkVMvz5pnBV91AV1i 9t1bx+DlpwK2LROYf1zy/l0RQLe0LHZ02DwUkY2Kn8ZHYoA2ykGWPKHOolAyVW4/ 5nmtHWzimadUrLSmlGeIa87HsUEiy3vQz3jo/GayM84reSysLARbjjVKn+M8ouQ1 UiSQ3NKN+QnArfdNCw2yigRb7KM4JWlZHVdllUwozXG22RLspkuGuVWIzMS/Y0jy yw4HBP0R5w4eKEATM+tLvFiX4+Eu13c9/ATmOwU9qcUTfAx78kgcL96E4lISLOo4 U8kY+gohUq1CtbpzvEo0d/BlnTWVDBzXzerQpTWYB+ROXqjnl7OROYq7x6iSbQy1 NzbFDYqU7Q== =2TM/ -END PGP SIGNATURE- --- No new revisions were added by this update. Summary of changes:
(pulsar-client-go) branch master updated: [Improve] Add InitialSubscriptionName for DLQPolicy (#1264)
This is an automated email from the ASF dual-hosted git repository. zike pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new dad98f16 [Improve] Add InitialSubscriptionName for DLQPolicy (#1264) dad98f16 is described below commit dad98f16886e319291b93089a47e70835f08de0f Author: crossoverJie AuthorDate: Thu Aug 1 12:04:24 2024 +0800 [Improve] Add InitialSubscriptionName for DLQPolicy (#1264) Fixes #1239 ### Modifications Add `InitialSubscriptionName` for DLQPolicy. --- pulsar/consumer.go | 6 pulsar/consumer_test.go | 85 pulsar/dlq_router.go | 1 + pulsar/producer.go | 6 pulsar/producer_partition.go | 1 + 5 files changed, 99 insertions(+) diff --git a/pulsar/consumer.go b/pulsar/consumer.go index 31f89a54..bf2eafbf 100644 --- a/pulsar/consumer.go +++ b/pulsar/consumer.go @@ -78,6 +78,12 @@ type DLQPolicy struct { // RetryLetterTopic specifies the name of the topic where the retry messages will be sent. RetryLetterTopic string + + // InitialSubscriptionName Name of the initial subscription name of the dead letter topic. + // If this field is not set, the initial subscription for the dead letter topic will not be created. + // If this field is set but the broker's `allowAutoSubscriptionCreation` is disabled, the DLQ producer + // will fail to be created. + InitialSubscriptionName string } // AckGroupingOptions controls how to group ACK requests diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index 2ecdf8eb..04439cbc 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -1539,6 +1539,91 @@ func DLQWithProducerOptions(t *testing.T, prodOpt *ProducerOptions) { assert.Error(t, err) assert.Nil(t, msg) } +func TestDeadLetterTopicWithInitialSubscription(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: lookupURL, + }) + + assert.Nil(t, err) + defer client.Close() + + topic := "persistent://public/default/" + newTopicName() + dlqSub, sub, consumerName := "init-sub", "my-sub", "my-consumer" + dlqTopic := fmt.Sprintf("%s-%s-DLQ", topic, sub) + ctx := context.Background() + + // create consumer + maxRedeliveryCount, sendMessages := 1, 100 + + consumer, err := client.Subscribe(ConsumerOptions{ + Topic: topic, + SubscriptionName:sub, + NackRedeliveryDelay: 1 * time.Second, + Type:Shared, + DLQ: &DLQPolicy{ + MaxDeliveries: uint32(maxRedeliveryCount), + DeadLetterTopic: dlqTopic, + InitialSubscriptionName: dlqSub, + }, + Name: consumerName, + ReceiverQueueSize: sendMessages, + }) + assert.Nil(t, err) + defer consumer.Close() + + // create producer + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topic, + }) + assert.Nil(t, err) + defer producer.Close() + + // send messages + for i := 0; i < sendMessages; i++ { + if _, err := producer.Send(ctx, &ProducerMessage{ + Payload: []byte(fmt.Sprintf("hello-%d", i)), + }); err != nil { + log.Fatal(err) + } + } + + // nack all messages + for i := 0; i < sendMessages*(maxRedeliveryCount+1); i++ { + ctx, canc := context.WithTimeout(context.Background(), 3*time.Second) + defer canc() + msg, _ := consumer.Receive(ctx) + if msg == nil { + break + } + consumer.Nack(msg) + } + + // create dlq consumer + dlqConsumer, err := client.Subscribe(ConsumerOptions{ + Topic:dlqTopic, + SubscriptionName: dlqSub, + }) + assert.Nil(t, err) + defer dlqConsumer.Close() + + for i := 0; i < sendMessages; i++ { + ctx, canc := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer canc() + msg, err := dlqConsumer.Receive(ctx) + assert.Nil(t, err) + assert.NotNil(t, msg) + err = dlqConsumer.Ack(msg) + assert.Nil(t, err) + } + + // No more messages on the DLQ + ctx, canc := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer canc() + msg, err := dlqConsumer.Receive(ctx) + assert.Err
(pulsar-client-go) 05/06: [Issue 1259][producer] Prevent panic when calling Flush on closed producer (#1260)
This is an automated email from the ASF dual-hosted git repository. zike pushed a commit to branch branch-0.13.0 in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git commit 53fc9381499a0f8d88c97b10ad4b4670539cbc4d Author: Gaylor Bosson AuthorDate: Mon Jul 29 08:09:00 2024 +0200 [Issue 1259][producer] Prevent panic when calling Flush on closed producer (#1260) (cherry picked from commit fb805c0aea3506ba9df87bb066221469e35005a0) --- pulsar/producer_partition.go | 4 pulsar/producer_test.go | 15 +++ 2 files changed, 19 insertions(+) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index cd89862a..5c038aa5 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -1494,6 +1494,10 @@ func (p *partitionProducer) Flush() error { } func (p *partitionProducer) FlushWithCtx(ctx context.Context) error { + if p.getProducerState() != producerReady { + return ErrProducerClosed + } + flushReq := &flushRequest{ doneCh: make(chan struct{}), err:nil, diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go index 0e38d637..5b23182d 100644 --- a/pulsar/producer_test.go +++ b/pulsar/producer_test.go @@ -523,6 +523,21 @@ func TestFlushInPartitionedProducer(t *testing.T) { assert.Equal(t, msgCount, numOfMessages/2) } +func TestProducerReturnsErrorOnFlushWhenClosed(t *testing.T) { + client, err := NewClient(ClientOptions{URL: serviceURL}) + assert.NoError(t, err) + defer client.Close() + + producer, err := client.CreateProducer(ProducerOptions{Topic: newTopicName()}) + assert.NoError(t, err) + assert.NotNil(t, producer) + + producer.Close() + + err = producer.FlushWithCtx(context.Background()) + assert.Error(t, err) +} + func TestRoundRobinRouterPartitionedProducer(t *testing.T) { topicName := "public/default/partition-testRoundRobinRouterPartitionedProducer" numberOfPartitions := 5
(pulsar-client-go) branch branch-0.13.0 updated (3cbb3537 -> e44bd048)
This is an automated email from the ASF dual-hosted git repository. zike pushed a change to branch branch-0.13.0 in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git from 3cbb3537 [Fix] Oauth2 Client credentials flow use scopes from the keyfile as well (#1244) new 13a78684 fix: failTimeoutMessages cannot delete outdated messages (#1247) new 8d3177d6 fix pulsar admin revoke subscription permission endpoint (#1251) new d18918c1 ci: validate multiple version builds and use golangci-lint-action (#1250) new b6df55c3 fix: producer close was blocked (#1249) new 53fc9381 [Issue 1259][producer] Prevent panic when calling Flush on closed producer (#1260) new e44bd048 [fix] Avoid a data race when flushing with load (#1261) The 6 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .github/workflows/ci.yml| 16 ++- Dockerfile | 3 + Makefile| 4 +- go.mod | 75 +--- go.sum | 205 pulsar/producer_partition.go| 104 ++-- pulsar/producer_test.go | 64 ++ pulsaradmin/pkg/admin/namespace.go | 12 +- pulsaradmin/pkg/admin/namespace_test.go | 28 + 9 files changed, 425 insertions(+), 86 deletions(-)
(pulsar-client-go) 06/06: [fix] Avoid a data race when flushing with load (#1261)
This is an automated email from the ASF dual-hosted git repository. zike pushed a commit to branch branch-0.13.0 in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git commit e44bd0481cc8b3b407c257a3411a97d48bb29844 Author: Gaylor Bosson AuthorDate: Tue Jul 30 14:25:42 2024 +0200 [fix] Avoid a data race when flushing with load (#1261) Fixes #1258 ### Motivation While flushing, the data channel is switched if a new allocated one which can cause the loss of messages because the length can be zero which would stop the procedure and at the same time a new message can be sent to the channel. ### Modifications Instead of allocating a new channel, it empties the existing one up to the length of the buffer of the channel before proceeding with the flush. (cherry picked from commit 8dd4ed19b98ecf9287cbcfc20e2de9a12b197e3f) --- pulsar/producer_partition.go | 33 - 1 file changed, 20 insertions(+), 13 deletions(-) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 5c038aa5..f5fd493b 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -338,10 +338,6 @@ func (p *partitionProducer) grabCnx(assignedBrokerURL string) error { p.schemaCache.Put(p.schemaInfo, schemaVersion) } - if err != nil { - return err - } - if !p.options.DisableBatching && p.batchBuilder == nil { provider, err := GetBatcherBuilderProvider(p.options.BatcherBuilderType) if err != nil { @@ -1022,15 +1018,7 @@ func (p *partitionProducer) internalFlushCurrentBatches() { } func (p *partitionProducer) internalFlush(fr *flushRequest) { - // clear all the messages which have sent to dataChan before flush - if len(p.dataChan) != 0 { - oldDataChan := p.dataChan - p.dataChan = make(chan *sendRequest, p.options.MaxPendingMessages) - for len(oldDataChan) != 0 { - pendingData := <-oldDataChan - p.internalSend(pendingData) - } - } + p.clearPendingSendRequests() if !p.options.DisableBatching { p.internalFlushCurrentBatch() @@ -1061,6 +1049,25 @@ func (p *partitionProducer) internalFlush(fr *flushRequest) { } } +// clearPendingSendRequests makes sure to push forward previous sending requests +// by emptying the data channel. +func (p *partitionProducer) clearPendingSendRequests() { + sizeBeforeFlushing := len(p.dataChan) + + // Bound the for loop to the current length of the channel to ensure that it + // will eventually stop as we only want to ensure that existing messages are + // flushed. + for i := 0; i < sizeBeforeFlushing; i++ { + select { + case pendingData := <-p.dataChan: + p.internalSend(pendingData) + + default: + return + } + } +} + func (p *partitionProducer) Send(ctx context.Context, msg *ProducerMessage) (MessageID, error) { var err error var msgID MessageID
(pulsar-client-go) 02/06: fix pulsar admin revoke subscription permission endpoint (#1251)
This is an automated email from the ASF dual-hosted git repository. zike pushed a commit to branch branch-0.13.0 in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git commit 8d3177d6aae08119fbc4b8b52c211cb38a228ce7 Author: 码斯克 <736090...@qq.com> AuthorDate: Mon Jul 22 18:37:00 2024 +0800 fix pulsar admin revoke subscription permission endpoint (#1251) * fix pulsar admin revoke subscription permission endpoint * feat: support get subscription permissions and add test case * feat: remove fmt package - Co-authored-by: haotao chen (cherry picked from commit df6a15c8548271f521d9615871899bd8d8678ed1) --- pulsaradmin/pkg/admin/namespace.go | 12 +++- pulsaradmin/pkg/admin/namespace_test.go | 28 2 files changed, 39 insertions(+), 1 deletion(-) diff --git a/pulsaradmin/pkg/admin/namespace.go b/pulsaradmin/pkg/admin/namespace.go index 782ae3ae..efe458ee 100644 --- a/pulsaradmin/pkg/admin/namespace.go +++ b/pulsaradmin/pkg/admin/namespace.go @@ -200,6 +200,9 @@ type Namespaces interface { // RevokeSubPermission revoke permissions on a subscription's admin-api access RevokeSubPermission(namespace utils.NameSpaceName, sName, role string) error + // GetSubPermissions returns subscription permissions on a namespace + GetSubPermissions(namespace utils.NameSpaceName) (map[string][]string, error) + // SetSubscriptionAuthMode sets the given subscription auth mode on all topics on a namespace SetSubscriptionAuthMode(namespace utils.NameSpaceName, mode utils.SubscriptionAuthMode) error @@ -750,10 +753,17 @@ func (n *namespaces) GrantSubPermission(namespace utils.NameSpaceName, sName str func (n *namespaces) RevokeSubPermission(namespace utils.NameSpaceName, sName, role string) error { endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "permissions", - "subscription", sName, role) + sName, role) return n.pulsar.Client.Delete(endpoint) } +func (n *namespaces) GetSubPermissions(namespace utils.NameSpaceName) (map[string][]string, error) { + endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "permissions", "subscription") + var permissions map[string][]string + err := n.pulsar.Client.Get(endpoint, &permissions) + return permissions, err +} + func (n *namespaces) SetSubscriptionAuthMode(namespace utils.NameSpaceName, mode utils.SubscriptionAuthMode) error { endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "subscriptionAuthMode") return n.pulsar.Client.Post(endpoint, mode.String()) diff --git a/pulsaradmin/pkg/admin/namespace_test.go b/pulsaradmin/pkg/admin/namespace_test.go index f934a968..941f6127 100644 --- a/pulsaradmin/pkg/admin/namespace_test.go +++ b/pulsaradmin/pkg/admin/namespace_test.go @@ -173,3 +173,31 @@ func TestGetTopicAutoCreation(t *testing.T) { } assert.Equal(t, expected, *topicAutoCreation) } + +func TestRevokeSubPermission(t *testing.T) { + config := &config.Config{} + admin, err := New(config) + require.NoError(t, err) + require.NotNil(t, admin) + + namespace, err := utils.GetNamespaceName("public/default") + require.NoError(t, err) + require.NotNil(t, namespace) + + sub := "subscription" + roles := []string{"user"} + + // grant subscription permission and get it + err = admin.Namespaces().GrantSubPermission(*namespace, sub, roles) + require.NoError(t, err) + permissions, err := admin.Namespaces().GetSubPermissions(*namespace) + require.NoError(t, err) + assert.Equal(t, roles, permissions[sub]) + + // revoke subscription permission and get it + err = admin.Namespaces().RevokeSubPermission(*namespace, sub, roles[0]) + require.NoError(t, err) + permissions, err = admin.Namespaces().GetSubPermissions(*namespace) + require.NoError(t, err) + assert.Equal(t, 0, len(permissions[sub])) +}
(pulsar-client-go) 01/06: fix: failTimeoutMessages cannot delete outdated messages (#1247)
This is an automated email from the ASF dual-hosted git repository. zike pushed a commit to branch branch-0.13.0 in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git commit 13a78684ee4c6df78c852bc65539581a2caf1e36 Author: Zixuan Liu AuthorDate: Mon Jul 22 18:32:07 2024 +0800 fix: failTimeoutMessages cannot delete outdated messages (#1247) * fix: failTimeoutMessages cannot delete outdated messages * Fix slice pass (cherry picked from commit a42cc24f8ff4c00f4d2f901bf9c823b5b2c653e6) --- pulsar/producer_partition.go | 36 +++- 1 file changed, 15 insertions(+), 21 deletions(-) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 1b2fae72..78f1f3cc 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -372,8 +372,7 @@ func (p *partitionProducer) grabCnx(assignedBrokerURL string) error { continue } pi := item.(*pendingItem) - // when resending pending batches, we update the sendAt timestamp and put to the back of queue - // to avoid pending item been removed by failTimeoutMessages and cause race condition + // when resending pending batches, we update the sendAt timestamp to record the metric. pi.Lock() pi.sentAt = time.Now() pi.Unlock() @@ -814,19 +813,14 @@ func (p *partitionProducer) internalSingleSend( return } - p.pendingQueue.Put(&pendingItem{ - sentAt: time.Now(), - buffer: buffer, - sequenceID: sid, - sendRequests: []interface{}{sr}, - }) - p._getConn().WriteData(buffer) + p.writeData(buffer, sid, []interface{}{sr}) } type pendingItem struct { sync.Mutex bufferinternal.Buffer sequenceIDuint64 + createdAt time.Time sentAttime.Time sendRequests []interface{} isDonebool @@ -868,13 +862,19 @@ func (p *partitionProducer) internalFlushCurrentBatch() { return } + p.writeData(batchData, sequenceID, callbacks) +} + +func (p *partitionProducer) writeData(buffer internal.Buffer, sequenceID uint64, callbacks []interface{}) { + now := time.Now() p.pendingQueue.Put(&pendingItem{ - sentAt: time.Now(), - buffer: batchData, + createdAt:now, + sentAt: now, + buffer: buffer, sequenceID: sequenceID, sendRequests: callbacks, }) - p._getConn().WriteData(batchData) + p._getConn().WriteData(buffer) } func (p *partitionProducer) failTimeoutMessages() { @@ -898,7 +898,7 @@ func (p *partitionProducer) failTimeoutMessages() { continue } oldestItem := item.(*pendingItem) - if nextWaiting := diff(oldestItem.sentAt); nextWaiting > 0 { + if nextWaiting := diff(oldestItem.createdAt); nextWaiting > 0 { // none of these pending messages have timed out, wait and retry t.Reset(nextWaiting) continue @@ -930,7 +930,7 @@ func (p *partitionProducer) failTimeoutMessages() { pi := m.(*pendingItem) pi.Lock() defer pi.Unlock() - if nextWaiting := diff(pi.sentAt); nextWaiting > 0 { + if nextWaiting := diff(pi.createdAt); nextWaiting > 0 { // current and subsequent items not timeout yet, stop iterating tickerNeedWaiting = nextWaiting return false @@ -995,13 +995,7 @@ func (p *partitionProducer) internalFlushCurrentBatches() { if b.BatchData == nil { continue } - p.pendingQueue.Put(&pendingItem{ - sentAt: time.Now(), - buffer: b.BatchData, - sequenceID: b.SequenceID, - sendRequests: b.Callbacks, - }) - p._getConn().WriteData(b.BatchData) + p.writeData(b.BatchData, b.SequenceID, b.Callbacks) } }
(pulsar-client-go) 03/06: ci: validate multiple version builds and use golangci-lint-action (#1250)
This is an automated email from the ASF dual-hosted git repository. zike pushed a commit to branch branch-0.13.0 in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git commit d18918c10f17ba5e044c5bc38f614ce0d3b27d41 Author: Zixuan Liu AuthorDate: Sun Jul 14 12:03:08 2024 +0800 ci: validate multiple version builds and use golangci-lint-action (#1250) * ci: validate multiple version builds and use golangci-lint-actio * Fix lint (cherry picked from commit 2ff2c2cdafe71d72a5d3b69680f3086301510f4b) --- .github/workflows/ci.yml | 16 +--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 3a6731f9..e2eb9fa0 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -20,25 +20,35 @@ on: [pull_request] jobs: build: runs-on: ubuntu-latest +strategy: + matrix: +go-version: [ '1.20', '1.21', '1.22' ] steps: - uses: actions/checkout@v3 - - uses: actions/setup-go@v3 + - uses: actions/setup-go@v5 +with: + go-version: ${{ matrix.go-version }} - run: make build lint: runs-on: ubuntu-latest steps: - uses: actions/checkout@v3 + - uses: actions/setup-go@v5 +with: + go-version: '1.20' - name: Check license header run: docker run --rm -v $(pwd):/github/workspace ghcr.io/korandoru/hawkeye-native:v3 check - name: Run golangci-lint -run: make lint +uses: golangci/golangci-lint-action@v6 +with: + version: v1.51.2 integration-tests: runs-on: ubuntu-latest strategy: matrix: -go-version: ['1.20', '1.21.0', '1.22.0'] +go-version: [ '1.20', '1.21', '1.22' ] steps: - uses: actions/checkout@v3 - name: clean docker cache
(pulsar-client-go) branch master updated: [fix] Avoid a data race when flushing with load (#1261)
This is an automated email from the ASF dual-hosted git repository. zike pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new 8dd4ed19 [fix] Avoid a data race when flushing with load (#1261) 8dd4ed19 is described below commit 8dd4ed19b98ecf9287cbcfc20e2de9a12b197e3f Author: Gaylor Bosson AuthorDate: Tue Jul 30 14:25:42 2024 +0200 [fix] Avoid a data race when flushing with load (#1261) Fixes #1258 ### Motivation While flushing, the data channel is switched if a new allocated one which can cause the loss of messages because the length can be zero which would stop the procedure and at the same time a new message can be sent to the channel. ### Modifications Instead of allocating a new channel, it empties the existing one up to the length of the buffer of the channel before proceeding with the flush. --- pulsar/producer_partition.go | 33 - 1 file changed, 20 insertions(+), 13 deletions(-) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 5c038aa5..f5fd493b 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -338,10 +338,6 @@ func (p *partitionProducer) grabCnx(assignedBrokerURL string) error { p.schemaCache.Put(p.schemaInfo, schemaVersion) } - if err != nil { - return err - } - if !p.options.DisableBatching && p.batchBuilder == nil { provider, err := GetBatcherBuilderProvider(p.options.BatcherBuilderType) if err != nil { @@ -1022,15 +1018,7 @@ func (p *partitionProducer) internalFlushCurrentBatches() { } func (p *partitionProducer) internalFlush(fr *flushRequest) { - // clear all the messages which have sent to dataChan before flush - if len(p.dataChan) != 0 { - oldDataChan := p.dataChan - p.dataChan = make(chan *sendRequest, p.options.MaxPendingMessages) - for len(oldDataChan) != 0 { - pendingData := <-oldDataChan - p.internalSend(pendingData) - } - } + p.clearPendingSendRequests() if !p.options.DisableBatching { p.internalFlushCurrentBatch() @@ -1061,6 +1049,25 @@ func (p *partitionProducer) internalFlush(fr *flushRequest) { } } +// clearPendingSendRequests makes sure to push forward previous sending requests +// by emptying the data channel. +func (p *partitionProducer) clearPendingSendRequests() { + sizeBeforeFlushing := len(p.dataChan) + + // Bound the for loop to the current length of the channel to ensure that it + // will eventually stop as we only want to ensure that existing messages are + // flushed. + for i := 0; i < sizeBeforeFlushing; i++ { + select { + case pendingData := <-p.dataChan: + p.internalSend(pendingData) + + default: + return + } + } +} + func (p *partitionProducer) Send(ctx context.Context, msg *ProducerMessage) (MessageID, error) { var err error var msgID MessageID
(pulsar-client-go) branch master updated: [feat] Add support for subscription expiration time namespace settings (#1254)
This is an automated email from the ASF dual-hosted git repository. zike pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new c74460dd [feat] Add support for subscription expiration time namespace settings (#1254) c74460dd is described below commit c74460ddc43a0bd48beea527926f7c74f7ddf28a Author: Kai AuthorDate: Fri Jul 26 20:52:05 2024 -0700 [feat] Add support for subscription expiration time namespace settings (#1254) Fixes #1253 ### Motivation Adds support for the get / set / delete `subscriptionExpirationTime` namespace settings. ### Modifications Created new functions in `namespace.go` that implement support for `subscriptionExpirationTime` settings --- pulsaradmin/pkg/admin/namespace.go | 29 + pulsaradmin/pkg/admin/namespace_test.go | 76 + 2 files changed, 105 insertions(+) diff --git a/pulsaradmin/pkg/admin/namespace.go b/pulsaradmin/pkg/admin/namespace.go index efe458ee..57c0297a 100644 --- a/pulsaradmin/pkg/admin/namespace.go +++ b/pulsaradmin/pkg/admin/namespace.go @@ -279,6 +279,16 @@ type Namespaces interface { // SetInactiveTopicPolicies sets the inactive topic policies on a namespace SetInactiveTopicPolicies(namespace utils.NameSpaceName, data utils.InactiveTopicPolicies) error + + // GetSubscriptionExpirationTime gets the subscription expiration time on a namespace. Returns -1 if not set + GetSubscriptionExpirationTime(namespace utils.NameSpaceName) (int, error) + + // SetSubscriptionExpirationTime sets the subscription expiration time on a namespace + SetSubscriptionExpirationTime(namespace utils.NameSpaceName, expirationTimeInMinutes int) error + + // RemoveSubscriptionExpirationTime removes subscription expiration time from a namespace, + // defaulting to broker settings + RemoveSubscriptionExpirationTime(namespace utils.NameSpaceName) error } type namespaces struct { @@ -893,3 +903,22 @@ func (n *namespaces) SetInactiveTopicPolicies(namespace utils.NameSpaceName, dat endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "inactiveTopicPolicies") return n.pulsar.Client.Post(endpoint, data) } + +func (n *namespaces) GetSubscriptionExpirationTime(namespace utils.NameSpaceName) (int, error) { + var result = -1 + + endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "subscriptionExpirationTime") + err := n.pulsar.Client.Get(endpoint, &result) + return result, err +} + +func (n *namespaces) SetSubscriptionExpirationTime(namespace utils.NameSpaceName, + subscriptionExpirationTimeInMinutes int) error { + endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "subscriptionExpirationTime") + return n.pulsar.Client.Post(endpoint, &subscriptionExpirationTimeInMinutes) +} + +func (n *namespaces) RemoveSubscriptionExpirationTime(namespace utils.NameSpaceName) error { + endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "subscriptionExpirationTime") + return n.pulsar.Client.Delete(endpoint) +} diff --git a/pulsaradmin/pkg/admin/namespace_test.go b/pulsaradmin/pkg/admin/namespace_test.go index 941f6127..0b4000ec 100644 --- a/pulsaradmin/pkg/admin/namespace_test.go +++ b/pulsaradmin/pkg/admin/namespace_test.go @@ -201,3 +201,79 @@ func TestRevokeSubPermission(t *testing.T) { require.NoError(t, err) assert.Equal(t, 0, len(permissions[sub])) } + +func TestNamespaces_SetSubscriptionExpirationTime(t *testing.T) { + config := &config.Config{} + admin, err := New(config) + require.NoError(t, err) + require.NotNil(t, admin) + + tests := []struct { + name string + namespace string + subscriptionExpirationTime int + errReason string + }{ + { + name: "Set valid subscription expiration time", + namespace: "public/default", + subscriptionExpirationTime: 60, + errReason: "", + }, + { + name: "Set invalid subscription expiration time", + namespace: "public/default", + subscriptionExpirationTime: -60, + errReason: "Invalid value for subscription expiration time", + }, + { + name: &
(pulsar-client-go) branch branch-0.13.0 updated: [Fix] Oauth2 Client credentials flow use scopes from the keyfile as well (#1244)
This is an automated email from the ASF dual-hosted git repository. zike pushed a commit to branch branch-0.13.0 in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/branch-0.13.0 by this push: new 3cbb3537 [Fix] Oauth2 Client credentials flow use scopes from the keyfile as well (#1244) 3cbb3537 is described below commit 3cbb3537598bba9b777b4488916c156de372e354 Author: Nikolaj Lund Sørensen AuthorDate: Wed Jul 24 12:51:19 2024 +0200 [Fix] Oauth2 Client credentials flow use scopes from the keyfile as well (#1244) ### Motivation As the issue shows when using Pulsarctl which works with a context as configuration style, Oauth2 is used under the hood. However the library does not expose any way to inject the context configuration `scope` value, and then relies on reading that from the `keyFile` for `client_credentials flow`. However that is not being utilized in the current code as the scope value is not read from from the file. *Explain here the context, and why you're making that change. What is the problem you're trying to solve.* To allow the usage of oauth2 with a keyfile in Pulsarctl 3+ which it doesn't right now ### Modifications Alters so reading the keyFile for client credentials also returns the `scope` value. The value is then split on spaces, and added to a temporary slice. After that the value of any additionalScopes that was already in the options is then added to the temp slice as well. In the end the additionalScopes property on the options is set to the temp slice containing values from previous additionalScopes and the ones from the keyFile. (cherry picked from commit c3b0633a2f5fcc5b560b504d04fdef0e82f6b72a) --- oauth2/client_credentials_flow.go | 18 +++--- oauth2/client_credentials_flow_test.go | 6 +- oauth2/client_credentials_provider.go | 1 + oauth2/client_credentials_provider_test.go | 2 ++ 4 files changed, 23 insertions(+), 4 deletions(-) diff --git a/oauth2/client_credentials_flow.go b/oauth2/client_credentials_flow.go index 2252144a..9a2643a9 100644 --- a/oauth2/client_credentials_flow.go +++ b/oauth2/client_credentials_flow.go @@ -20,6 +20,8 @@ package oauth2 import ( "net/http" + "strings" + "github.com/apache/pulsar-client-go/oauth2/clock" "github.com/pkg/errors" @@ -68,7 +70,6 @@ func newClientCredentialsFlow( // NewDefaultClientCredentialsFlow provides an easy way to build up a default // client credentials flow with all the correct configuration. func NewDefaultClientCredentialsFlow(options ClientCredentialsFlowOptions) (*ClientCredentialsFlow, error) { - credsProvider := NewClientCredentialsProviderFromKeyFile(options.KeyFile) keyFile, err := credsProvider.GetClientCredentials() if err != nil { @@ -81,7 +82,6 @@ func NewDefaultClientCredentialsFlow(options ClientCredentialsFlowOptions) (*Cli } tokenRetriever := NewTokenRetriever(&http.Client{}) - return newClientCredentialsFlow( options, keyFile, @@ -94,13 +94,25 @@ var _ Flow = &ClientCredentialsFlow{} func (c *ClientCredentialsFlow) Authorize(audience string) (*AuthorizationGrant, error) { var err error + + // Merge the scopes of the options AdditionalScopes with the scopes read from the keyFile config + var scopesToAdd []string + if len(c.options.AdditionalScopes) > 0 { + scopesToAdd = append(scopesToAdd, c.options.AdditionalScopes...) + } + + if c.keyfile.Scope != "" { + scopesSplit := strings.Split(c.keyfile.Scope, " ") + scopesToAdd = append(scopesToAdd, scopesSplit...) + } + grant := &AuthorizationGrant{ Type: GrantTypeClientCredentials, Audience: audience, ClientID: c.keyfile.ClientID, ClientCredentials: c.keyfile, TokenEndpoint: c.oidcWellKnownEndpoints.TokenEndpoint, - Scopes:c.options.AdditionalScopes, + Scopes:scopesToAdd, } // test the credentials and obtain an initial access token diff --git a/oauth2/client_credentials_flow_test.go b/oauth2/client_credentials_flow_test.go index 987ae73d..6d9db354 100644 --- a/oauth2/client_credentials_flow_test.go +++ b/oauth2/client_credentials_flow_test.go @@ -47,6 +47,7 @@ var clientCredentials = KeyFile{ ClientSecret: "test_clientSecret", ClientEmail: "test_clientEmail", IssuerURL:"http://issuer";, + Scope:"test_scope", } var _ = Describe("ClientCredentialsFlow", func()
(pulsar-client-go) branch master updated: [Fix] Oauth2 Client credentials flow use scopes from the keyfile as well (#1244)
This is an automated email from the ASF dual-hosted git repository. zike pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new c3b0633a [Fix] Oauth2 Client credentials flow use scopes from the keyfile as well (#1244) c3b0633a is described below commit c3b0633a2f5fcc5b560b504d04fdef0e82f6b72a Author: Nikolaj Lund Sørensen AuthorDate: Wed Jul 24 12:51:19 2024 +0200 [Fix] Oauth2 Client credentials flow use scopes from the keyfile as well (#1244) ### Motivation As the issue shows when using Pulsarctl which works with a context as configuration style, Oauth2 is used under the hood. However the library does not expose any way to inject the context configuration `scope` value, and then relies on reading that from the `keyFile` for `client_credentials flow`. However that is not being utilized in the current code as the scope value is not read from from the file. *Explain here the context, and why you're making that change. What is the problem you're trying to solve.* To allow the usage of oauth2 with a keyfile in Pulsarctl 3+ which it doesn't right now ### Modifications Alters so reading the keyFile for client credentials also returns the `scope` value. The value is then split on spaces, and added to a temporary slice. After that the value of any additionalScopes that was already in the options is then added to the temp slice as well. In the end the additionalScopes property on the options is set to the temp slice containing values from previous additionalScopes and the ones from the keyFile. --- oauth2/client_credentials_flow.go | 18 +++--- oauth2/client_credentials_flow_test.go | 6 +- oauth2/client_credentials_provider.go | 1 + oauth2/client_credentials_provider_test.go | 2 ++ 4 files changed, 23 insertions(+), 4 deletions(-) diff --git a/oauth2/client_credentials_flow.go b/oauth2/client_credentials_flow.go index 2252144a..9a2643a9 100644 --- a/oauth2/client_credentials_flow.go +++ b/oauth2/client_credentials_flow.go @@ -20,6 +20,8 @@ package oauth2 import ( "net/http" + "strings" + "github.com/apache/pulsar-client-go/oauth2/clock" "github.com/pkg/errors" @@ -68,7 +70,6 @@ func newClientCredentialsFlow( // NewDefaultClientCredentialsFlow provides an easy way to build up a default // client credentials flow with all the correct configuration. func NewDefaultClientCredentialsFlow(options ClientCredentialsFlowOptions) (*ClientCredentialsFlow, error) { - credsProvider := NewClientCredentialsProviderFromKeyFile(options.KeyFile) keyFile, err := credsProvider.GetClientCredentials() if err != nil { @@ -81,7 +82,6 @@ func NewDefaultClientCredentialsFlow(options ClientCredentialsFlowOptions) (*Cli } tokenRetriever := NewTokenRetriever(&http.Client{}) - return newClientCredentialsFlow( options, keyFile, @@ -94,13 +94,25 @@ var _ Flow = &ClientCredentialsFlow{} func (c *ClientCredentialsFlow) Authorize(audience string) (*AuthorizationGrant, error) { var err error + + // Merge the scopes of the options AdditionalScopes with the scopes read from the keyFile config + var scopesToAdd []string + if len(c.options.AdditionalScopes) > 0 { + scopesToAdd = append(scopesToAdd, c.options.AdditionalScopes...) + } + + if c.keyfile.Scope != "" { + scopesSplit := strings.Split(c.keyfile.Scope, " ") + scopesToAdd = append(scopesToAdd, scopesSplit...) + } + grant := &AuthorizationGrant{ Type: GrantTypeClientCredentials, Audience: audience, ClientID: c.keyfile.ClientID, ClientCredentials: c.keyfile, TokenEndpoint: c.oidcWellKnownEndpoints.TokenEndpoint, - Scopes:c.options.AdditionalScopes, + Scopes:scopesToAdd, } // test the credentials and obtain an initial access token diff --git a/oauth2/client_credentials_flow_test.go b/oauth2/client_credentials_flow_test.go index 987ae73d..6d9db354 100644 --- a/oauth2/client_credentials_flow_test.go +++ b/oauth2/client_credentials_flow_test.go @@ -47,6 +47,7 @@ var clientCredentials = KeyFile{ ClientSecret: "test_clientSecret", ClientEmail: "test_clientEmail", IssuerURL:"http://issuer";, + Scope:"test_scope", } var _ = Describe("ClientCredentialsFlow", func() { @@ -64,9 +65,11 @@ var _ = Describe("ClientCredentialsFlow", func() {
(pulsar-client-go) branch branch-0.13.0 updated: [Fix] Fix the key-based batch can't guarantee the ordering when flushing (#1252)
This is an automated email from the ASF dual-hosted git repository. zike pushed a commit to branch branch-0.13.0 in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/branch-0.13.0 by this push: new 73fd3743 [Fix] Fix the key-based batch can't guarantee the ordering when flushing (#1252) 73fd3743 is described below commit 73fd3743428e8455a8ad4ec2ba4db7a2a1f03822 Author: Zike Yang AuthorDate: Fri Jul 19 21:00:43 2024 +0800 [Fix] Fix the key-based batch can't guarantee the ordering when flushing (#1252) ### Motivation When using key-based producing, the key-based batcher cannot guarantee the sequence ID order when flushing messages. This leads to frequent disconnections between the client and the broker, resulting in poor performance. Here is the related log: ``` time="2024-07-19T03:38:07Z" level=warning msg="Received send error from server: [PersistenceError] : [Cannot determine whether the message is a duplicate at this time]" time="2024-07-19T03:38:07Z" level=warning msg="Connection was closed" cnx="xxx" producerID=1 producer_name=xxx topic="xxx" time="2024-07-19T03:38:07Z" level=warning msg="Failed to write on connection" error="use of closed network connection" local_addr="xxx" remote_addr="xxx" ``` The broker receives unordered messages and returns a SendError to the client. This causes the client to reconnect to the broker. ### Modifications - Refactor the BatchBuilder interface. - Sort batches before flushing in the key-based batcher builder. (cherry picked from commit e2fd4c9737794851bb741f717264961b28da3812) --- pulsar/internal/batch_builder.go| 36 ++- pulsar/internal/key_based_batch_builder.go | 46 +- pulsar/internal/key_based_batch_builder_test.go | 80 + pulsar/producer_partition.go| 31 +- 4 files changed, 133 insertions(+), 60 deletions(-) diff --git a/pulsar/internal/batch_builder.go b/pulsar/internal/batch_builder.go index 6df3a618..1074be82 100644 --- a/pulsar/internal/batch_builder.go +++ b/pulsar/internal/batch_builder.go @@ -57,13 +57,11 @@ type BatchBuilder interface { ) bool // Flush all the messages buffered in the client and wait until all messages have been successfully persisted. - Flush() (batchData Buffer, sequenceID uint64, callbacks []interface{}, err error) + Flush() *FlushBatch - // Flush all the messages buffered in multiple batches and wait until all + // FlushBatches all the messages buffered in multiple batches and wait until all // messages have been successfully persisted. - FlushBatches() ( - batchData []Buffer, sequenceID []uint64, callbacks [][]interface{}, errors []error, - ) + FlushBatches() []*FlushBatch // Return the batch container batch message in multiple batches. IsMultiBatches() bool @@ -72,6 +70,13 @@ type BatchBuilder interface { Close() error } +type FlushBatch struct { + BatchData Buffer + SequenceID uint64 + Callbacks []interface{} + Error error +} + // batchContainer wraps the objects needed to a batch. // batchContainer implement BatchBuilder as a single batch container. type batchContainer struct { @@ -250,12 +255,10 @@ func (bc *batchContainer) reset() { } // Flush all the messages buffered in the client and wait until all messages have been successfully persisted. -func (bc *batchContainer) Flush() ( - batchData Buffer, sequenceID uint64, callbacks []interface{}, err error, -) { +func (bc *batchContainer) Flush() *FlushBatch { if bc.numMessages == 0 { // No-Op for empty batch - return nil, 0, nil, nil + return nil } bc.log.Debug("BatchBuilder flush: messages: ", bc.numMessages) @@ -271,6 +274,8 @@ func (bc *batchContainer) Flush() ( buffer = NewBuffer(int(uncompressedSize * 3 / 2)) } + sequenceID := uint64(0) + var err error if err = serializeMessage( buffer, bc.cmdSend, bc.msgMetadata, bc.buffer, bc.compressionProvider, bc.encryptor, bc.maxMessageSize, true, @@ -278,15 +283,18 @@ func (bc *batchContainer) Flush() ( sequenceID = bc.cmdSend.Send.GetSequenceId() } - callbacks = bc.callbacks + callbacks := bc.callbacks bc.reset() - return buffer, sequenceID, callbacks, err + return &FlushBatch{ + BatchData: buffer, + SequenceID: sequenceID, + Callbacks: callbacks, + Error: err, + } } // FlushBatches only
(pulsar-client-go) branch master updated (e2fd4c97 -> 0ac542cb)
This is an automated email from the ASF dual-hosted git repository. zike pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git from e2fd4c97 [Fix] Fix the key-based batch can't guarantee the ordering when flushing (#1252) add 0ac542cb Add changelog for 0.13.0 (#1245) No new revisions were added by this update. Summary of changes: CHANGELOG.md | 68 1 file changed, 68 insertions(+)
svn commit: r70463 - /release/pulsar/pulsar-client-go-0.12.1/
Author: zike Date: Mon Jul 22 03:40:31 2024 New Revision: 70463 Log: Remove old release pulsar-client-go-0.12.1 Removed: release/pulsar/pulsar-client-go-0.12.1/
svn commit: r70462 - /release/pulsar/pulsar-client-go-0.11.1/
Author: zike Date: Mon Jul 22 03:40:11 2024 New Revision: 70462 Log: Remove old release pulsar-client-go-0.11.1 Removed: release/pulsar/pulsar-client-go-0.11.1/
svn commit: r70461 - /dev/pulsar/pulsar-client-go-0.13.0-candidate-2/ /release/pulsar/pulsar-client-go-0.13.0/
Author: zike Date: Mon Jul 22 03:37:44 2024 New Revision: 70461 Log: Release go client 0.13.0 Added: release/pulsar/pulsar-client-go-0.13.0/ - copied from r70460, dev/pulsar/pulsar-client-go-0.13.0-candidate-2/ Removed: dev/pulsar/pulsar-client-go-0.13.0-candidate-2/
(pulsar-client-go) annotated tag v0.13.0 updated (81eca356 -> 6682bcb8)
This is an automated email from the ASF dual-hosted git repository. zike pushed a change to annotated tag v0.13.0 in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git *** WARNING: tag v0.13.0 was modified! *** from 81eca356 (commit) to 6682bcb8 (tag) tagging 81eca356ec1486199e95ea8f4def005cc6b6e83d (commit) replaces v0.13.0-candidate-1 by Zike Yang on Mon Jul 22 10:18:37 2024 +0800 - Log - Release v0.13.0 -BEGIN PGP SIGNATURE- iQJEBAABCgAuFiEE6ItqSN52cCOQDJEjT0AbyNP5+1UFAmadwX0QHHppa2VAYXBh Y2hlLm9yZwAKCRBPQBvI0/n7VfQrD/4j1+F4/LtAYct8qpxPg73wpBpdTQqjHcRb Lub0c2gURVAboS/Ktk+Lz+RKNv28gNr07FcvVDO6pG5KE1g5dYuXf+Pcrb21TWpK ba28HKzUwfxP0n92w71ZEO788w5nJlCxf/vnu8TCCZMsNBva3mcktfp3NoPCJExy 3PYY5Xkkuyn9qn9gbNsgtPAHPedg4wAprYPWlmM91GKBTl2eEOUKdAZ7TaINdxXg N4kaXPrt9TrFaaYzAYpW71gUXMW9f5QhV/gvfJUk6Qg45lN3SXvhNwxnO/yhLt2I PJMMG+irt2RjDT0EcvNlXtO3ISHbajA+/9IyuKprTEFKvQFgDtDZKtjJ3nCeUjUQ EC//Us4hr4wZCV7LS99RKFfMYQL9ohdbu5HUTrp6CLsYGQPmVh880/N6im71Zpa1 wMIdYQKOhtry7BS/EDQGzDg3phIWYQtnnfhf/ZMRed+DZF91y65vJN6W/xfsR1js 0V4h0v9dFaDW+4FZQA3IK28TL2hJqvf9Z2itnz8SHxw9yuy/chelp+dEOip8XdA9 38ygjtRYhr/dZkBCV9vknugVI95pOKNURWdxRitK3XSps4xBx72vvFTqp1MDn4LY 4OAzD1kIzkmwahEm9zisO8+16OqZsthJXEhNyaKG5qGCjzRk+1uNae/9XPoGpF9K Hewp5oGqSQ== =6QR9 -END PGP SIGNATURE- --- No new revisions were added by this update. Summary of changes:
svn commit: r70308 - in /dev/pulsar/pulsar-client-go-0.13.0-candidate-2: ./ apache-pulsar-client-go-0.13.0-src.tar.gz apache-pulsar-client-go-0.13.0-src.tar.gz.asc apache-pulsar-client-go-0.13.0-src.t
Author: zike Date: Mon Jul 15 11:10:49 2024 New Revision: 70308 Log: Staging artifacts and signature for Pulsar Client Go release 0.13.0-candidate-2 Added: dev/pulsar/pulsar-client-go-0.13.0-candidate-2/ dev/pulsar/pulsar-client-go-0.13.0-candidate-2/apache-pulsar-client-go-0.13.0-src.tar.gz (with props) dev/pulsar/pulsar-client-go-0.13.0-candidate-2/apache-pulsar-client-go-0.13.0-src.tar.gz.asc dev/pulsar/pulsar-client-go-0.13.0-candidate-2/apache-pulsar-client-go-0.13.0-src.tar.gz.sha512 Added: dev/pulsar/pulsar-client-go-0.13.0-candidate-2/apache-pulsar-client-go-0.13.0-src.tar.gz == Binary file - no diff available. Propchange: dev/pulsar/pulsar-client-go-0.13.0-candidate-2/apache-pulsar-client-go-0.13.0-src.tar.gz -- svn:mime-type = application/octet-stream Added: dev/pulsar/pulsar-client-go-0.13.0-candidate-2/apache-pulsar-client-go-0.13.0-src.tar.gz.asc == --- dev/pulsar/pulsar-client-go-0.13.0-candidate-2/apache-pulsar-client-go-0.13.0-src.tar.gz.asc (added) +++ dev/pulsar/pulsar-client-go-0.13.0-candidate-2/apache-pulsar-client-go-0.13.0-src.tar.gz.asc Mon Jul 15 11:10:49 2024 @@ -0,0 +1,16 @@ +-BEGIN PGP SIGNATURE- + +iQIzBAABCgAdFiEE6ItqSN52cCOQDJEjT0AbyNP5+1UFAmaVA2UACgkQT0AbyNP5 ++1WKhRAAsgC5gDWYeS9KEpZhn37A6keO4yluFtqrDJVG0T9H9kX/H0NswFVtN2yd +NPmYKMgEffRY5zXddd+i8BFgG76/kMF/Nd32OeltErwiXNQ523WITCphtKR0JF5H +I6IaueoEHyeAuT+moY53Adc0Si0TUAFctk1fcCSVOTDAnF+AJvvB0dq3xYlmuoFm +KlWJpnfu10vf4JR9inmfyOCl4FBdSpu7y6+PKCtV62l1ErXDzqZOj2xFaqACt3vg +VRgakDWvvZnsxlCqrRJL6hXlx1ap30wpxtT16Iry5a669Tiac+YbO3g4Gmw1jDtM +GYV35OGzuPoExUToR68JXPd9EFhpQpNREruzsZC+9Fxi1IJHbai+IhMsA8Uv89g/ +QIdmu6su/w+Aubt6Nv97JgHqlawB9l3gK/o2cWn3wLfkpoqD/vFwsqRTERjWa19d +I2eZ6wyTPjaUww4G0ivz39YUKr+Ozqo54BjqEjHqRSvahHCj9Vwolo8YY3Lj3nO0 +RboM77Vp1c1G4SDwhXvxRB8bbQVD6lsKpPXxeEiy4+eHf+JTRfHGikL1bZXwt+ge +iUBBjp/aRnQpjKGx6cCL3tdkqjMcmv8yxJs/ajtf0GHoj9TIdhTXeoTr7iUnXxrd +tvvZ7Ml8vqZ5AeTPefSCMLG/bW7H/Li85ukFS+yclqIhbAyhP0I= +=cO1l +-END PGP SIGNATURE- Added: dev/pulsar/pulsar-client-go-0.13.0-candidate-2/apache-pulsar-client-go-0.13.0-src.tar.gz.sha512 == --- dev/pulsar/pulsar-client-go-0.13.0-candidate-2/apache-pulsar-client-go-0.13.0-src.tar.gz.sha512 (added) +++ dev/pulsar/pulsar-client-go-0.13.0-candidate-2/apache-pulsar-client-go-0.13.0-src.tar.gz.sha512 Mon Jul 15 11:10:49 2024 @@ -0,0 +1 @@ +9a52fb25cbb6d86651a0d56c1d6e17826810b91f3ba6299f632630f5b5a1d85c6a0842e36aaa2da6fc50d4e9406fe6d7b557368f9d99d876345c987f51d554fb apache-pulsar-client-go-0.13.0-src.tar.gz
(pulsar-client-go) annotated tag v0.13.0-candidate-2 updated (81eca356 -> 3bbe5c72)
This is an automated email from the ASF dual-hosted git repository. zike pushed a change to annotated tag v0.13.0-candidate-2 in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git *** WARNING: tag v0.13.0-candidate-2 was modified! *** from 81eca356 (commit) to 3bbe5c72 (tag) tagging 81eca356ec1486199e95ea8f4def005cc6b6e83d (commit) replaces v0.13.0-candidate-1 by Zike Yang on Mon Jul 15 18:55:30 2024 +0800 - Log - Release v0.13.0-candidate-2 -BEGIN PGP SIGNATURE- iQJEBAABCgAuFiEE6ItqSN52cCOQDJEjT0AbyNP5+1UFAmaVACIQHHppa2VAYXBh Y2hlLm9yZwAKCRBPQBvI0/n7VX35D/9X76qP0V//qQLqHSUvcRq1n/2oiltA5ACM EoyoP2TYwtw7F+13yXKxpLtfMts5Flz2eH/bFMfZ4Om/lA6J8HkH4CGZ6Fl0JcfS Gj0XHkuFCyihJGaNamxTRONus1aLISmst/GzpWmnXXgg7NpqpdKpEY2IbDMv1fTv xn41Z4V/Dk+rXpOUu3zFJ6vqePuFtdjMFF23GcS8Pkb2x1QVhWnWiX/4cZmwXdGg pqVr65Bh0eYw+jaansG2BjywSgyUGmJWNnMndOySIqHsfOVlQaRHrgl4tapQv11b BvkekXNHcj6Hwu9MBPLLQyajrSGhw9WrX2OUyKAqgJp6HW+WRXMxY3Y0Kg6lhJ7k qHXJ4tywGVMiAjFtK7hg1BfO1+Q4wK0A7hrTaLYSya7KTeOiFFBIyyjQ8T/XTMzE olmVVzVi2pAji6fYI5gYUzCTlJUtS9pASqy4O1h2JWASUjvZtFYgpE6YhfJjImYd ycofCRUrGZ4Z4H0j1wXg64qoUaxHR7hdTxEIm+st83+FJ3QFvjdl9yO/P/aIzIOt 9eC1xsum0Sy7dVJ8Rxx399/Y4tuOU2cMgS5gPu6M27D3rKMMJoM6Mxkk+rV+ArZs XoqNpYQKIWCQbmog8ZuhSaxV7HEN3qTWt0U6Ie1QMv/jM6/s1iizw23GxFsfDDzR QUQmQ97vHg== =Y5r0 -END PGP SIGNATURE- --- No new revisions were added by this update. Summary of changes:
(pulsar-client-go) branch branch-0.13.0 updated: fix: fix producer connection (#1243)
This is an automated email from the ASF dual-hosted git repository. zike pushed a commit to branch branch-0.13.0 in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/branch-0.13.0 by this push: new 81eca356 fix: fix producer connection (#1243) 81eca356 is described below commit 81eca356ec1486199e95ea8f4def005cc6b6e83d Author: Zixuan Liu AuthorDate: Fri Jul 12 19:56:22 2024 +0800 fix: fix producer connection (#1243) * fix: fix producer connection * Fix test * Fix nil pointer * Fix GetConnection err * Fix cnx (cherry picked from commit 29f2779123c6979c95b6235f16e6fbde33ca681f) --- pulsar/internal/rpc_client.go | 32 pulsar/producer_partition.go | 20 pulsar/producer_test.go | 32 3 files changed, 52 insertions(+), 32 deletions(-) diff --git a/pulsar/internal/rpc_client.go b/pulsar/internal/rpc_client.go index fc923196..d2e3895e 100644 --- a/pulsar/internal/rpc_client.go +++ b/pulsar/internal/rpc_client.go @@ -150,12 +150,18 @@ func (c *rpcClient) RequestToHost(serviceNameResolver *ServiceNameResolver, requ func (c *rpcClient) Request(logicalAddr *url.URL, physicalAddr *url.URL, requestID uint64, cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error) { - c.metrics.RPCRequestCount.Inc() cnx, err := c.pool.GetConnection(logicalAddr, physicalAddr) if err != nil { return nil, err } + return c.RequestOnCnx(cnx, requestID, cmdType, message) +} + +func (c *rpcClient) RequestOnCnx(cnx Connection, requestID uint64, cmdType pb.BaseCommand_Type, + message proto.Message) (*RPCResult, error) { + c.metrics.RPCRequestCount.Inc() + ch := make(chan result, 1) cnx.SendRequest(requestID, baseCommand(cmdType, message), func(response *pb.BaseCommand, err error) { @@ -171,7 +177,7 @@ func (c *rpcClient) Request(logicalAddr *url.URL, physicalAddr *url.URL, request case res := <-ch: // Ignoring producer not ready response. // Continue to wait for the producer to create successfully - if res.error == nil && *res.RPCResult.Response.Type == pb.BaseCommand_PRODUCER_SUCCESS { + if res.error == nil && res.Response != nil && *res.RPCResult.Response.Type == pb.BaseCommand_PRODUCER_SUCCESS { if !res.RPCResult.Response.ProducerSuccess.GetProducerReady() { timeoutCh = nil break @@ -184,28 +190,6 @@ func (c *rpcClient) Request(logicalAddr *url.URL, physicalAddr *url.URL, request } } -func (c *rpcClient) RequestOnCnx(cnx Connection, requestID uint64, cmdType pb.BaseCommand_Type, - message proto.Message) (*RPCResult, error) { - c.metrics.RPCRequestCount.Inc() - - ch := make(chan result, 1) - - cnx.SendRequest(requestID, baseCommand(cmdType, message), func(response *pb.BaseCommand, err error) { - ch <- result{&RPCResult{ - Cnx: cnx, - Response: response, - }, err} - close(ch) - }) - - select { - case res := <-ch: - return res.RPCResult, res.error - case <-time.After(c.requestTimeout): - return nil, ErrRequestTimeOut - } -} - func (c *rpcClient) RequestOnCnxNoWait(cnx Connection, cmdType pb.BaseCommand_Type, message proto.Message) error { c.metrics.RPCRequestCount.Inc() return cnx.SendRequestNoWait(baseCommand(cmdType, message)) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index f3749bc6..e0981303 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -284,20 +284,24 @@ func (p *partitionProducer) grabCnx(assignedBrokerURL string) error { cnx, err := p.client.cnxPool.GetConnection(lr.LogicalAddr, lr.PhysicalAddr) // registering the producer first in case broker sends commands in the middle - if err == nil { - p._setConn(cnx) - err = p._getConn().RegisterListener(p.producerID, p) - if err != nil { - p.log.WithError(err).Errorf("Failed to register listener: {%d}", p.producerID) - } + if err != nil { + p.log.Error("Failed to get connection") + return err + } + + p._setConn(cnx) + err = p._getConn().RegisterListener(p.producerID, p) + if err != nil { + p.log.WithError(err).Errorf("Failed to register listener: {%d}", p.producerID) } - res, err := p.client.rpcClient
(pulsar-client-go) branch master updated: ci: add merge limitation (#1248)
This is an automated email from the ASF dual-hosted git repository. zike pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new 627999c2 ci: add merge limitation (#1248) 627999c2 is described below commit 627999c2c93a6e7ce8493e7e459d0611466b8f5f Author: Zixuan Liu AuthorDate: Mon Jul 15 15:33:56 2024 +0800 ci: add merge limitation (#1248) ### Motivation Improve PR merge. ### Modifications Add `required_pull_request_reviews` and `required_conversation_resolution` to the `.asf.yaml`. --- .asf.yaml | 8 1 file changed, 8 insertions(+) diff --git a/.asf.yaml b/.asf.yaml index 246ec818..9f1252e1 100644 --- a/.asf.yaml +++ b/.asf.yaml @@ -42,6 +42,14 @@ github: # disable rebase button: rebase: false + protected_branches: +master: + required_pull_request_reviews: +require_code_owner_reviews: true +required_approving_review_count: 1 + + # Requires all conversations on code to be resolved before a pull request can be merged. + required_conversation_resolution: true notifications: commits: commits@pulsar.apache.org
svn commit: r70244 - in /dev/pulsar/pulsar-client-go-0.13.0-candidate-1: ./ apache-pulsar-client-go-0.13.0-src.tar.gz apache-pulsar-client-go-0.13.0-src.tar.gz.asc apache-pulsar-client-go-0.13.0-src.t
Author: zike Date: Fri Jul 12 01:55:54 2024 New Revision: 70244 Log: Staging artifacts and signature for Pulsar Client Go release 0.13.0-candidate-1 Added: dev/pulsar/pulsar-client-go-0.13.0-candidate-1/ dev/pulsar/pulsar-client-go-0.13.0-candidate-1/apache-pulsar-client-go-0.13.0-src.tar.gz (with props) dev/pulsar/pulsar-client-go-0.13.0-candidate-1/apache-pulsar-client-go-0.13.0-src.tar.gz.asc dev/pulsar/pulsar-client-go-0.13.0-candidate-1/apache-pulsar-client-go-0.13.0-src.tar.gz.sha512 Added: dev/pulsar/pulsar-client-go-0.13.0-candidate-1/apache-pulsar-client-go-0.13.0-src.tar.gz == Binary file - no diff available. Propchange: dev/pulsar/pulsar-client-go-0.13.0-candidate-1/apache-pulsar-client-go-0.13.0-src.tar.gz -- svn:mime-type = application/octet-stream Added: dev/pulsar/pulsar-client-go-0.13.0-candidate-1/apache-pulsar-client-go-0.13.0-src.tar.gz.asc == --- dev/pulsar/pulsar-client-go-0.13.0-candidate-1/apache-pulsar-client-go-0.13.0-src.tar.gz.asc (added) +++ dev/pulsar/pulsar-client-go-0.13.0-candidate-1/apache-pulsar-client-go-0.13.0-src.tar.gz.asc Fri Jul 12 01:55:54 2024 @@ -0,0 +1,16 @@ +-BEGIN PGP SIGNATURE- + +iQIzBAABCgAdFiEE6ItqSN52cCOQDJEjT0AbyNP5+1UFAmaQjFsACgkQT0AbyNP5 ++1WesBAAqpaswuzfkXNBrdOhF21YGM+fWjl1N/Ft4ehI2VaSrRD/aMPptJxQVkxw +gi5xhR67T7t/uEitpQEl77x4neUR/63BTrKphwMyXhId2MrnDFHGTCl0B9iieB+3 +rIRPMtArKkCIN6wNITnxLkHTEVPFqnee7tFiHQObzrwLHBF+hGKqhMiRHVAulvh/ +NaqtMtM7Cp4Ot2HUOAUaQIK30zFqCplf9VosecG0j9HbIueArv+wPegU/whFWzka +ctb3oalcCFekjksDMJogbY+JQHhUFPua767W8GjXTlyGAnN0CJg9H13y0B2tVyG2 +D3z3gh3fHP+OQe3vonzHukcdG/t+yR1W7lig0pEsal/ItViKDNp2i6d7Vo3Li/IQ +URZtNOyvmiMpnCbOhYf2ahQKRn5CAqdlQLTkAI2TO/N3JbH2GeiiGzqPbemTyI3s +OIS7jIugvn5C2fUCQLkyFgiUuFKSpX5jbgRRxHi48itUHUcifrTv42geApH1pSdR +19CJsMmmEz2wleT/OkkuI/z+R1yEUpIRPBZkXAdPILuBHNoOyFy1Ovlr+aZkN++F +gX6OyH75tSrG+auqVpgqcB2B0LNC5kvO/OE9qYwsqJTgNIJB5nLHIbgOs2L0XhlM +r3ouHX4rNJTN7UJMX2kZkBkzcreDjeVZRiUPlaaoz1hfI3sVD34= +=uC/h +-END PGP SIGNATURE- Added: dev/pulsar/pulsar-client-go-0.13.0-candidate-1/apache-pulsar-client-go-0.13.0-src.tar.gz.sha512 == --- dev/pulsar/pulsar-client-go-0.13.0-candidate-1/apache-pulsar-client-go-0.13.0-src.tar.gz.sha512 (added) +++ dev/pulsar/pulsar-client-go-0.13.0-candidate-1/apache-pulsar-client-go-0.13.0-src.tar.gz.sha512 Fri Jul 12 01:55:54 2024 @@ -0,0 +1 @@ +05cf45eea4e2543763f4de281530df6377dbfd014ea57d043d944cdb09ea84ffbbbdda7ce86ce892f9ae0f88174179684d94fde29a91ca2d286c3c6c3e248545 apache-pulsar-client-go-0.13.0-src.tar.gz
(pulsar-client-go) annotated tag v0.13.0-candidate-1 updated (50dce7ea -> 83982253)
This is an automated email from the ASF dual-hosted git repository. zike pushed a change to annotated tag v0.13.0-candidate-1 in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git *** WARNING: tag v0.13.0-candidate-1 was modified! *** from 50dce7ea (commit) to 83982253 (tag) tagging 50dce7ea7c2d7aff64c65ccbc2ef696a25cc9afc (commit) replaces v0.12.0-candidate-2 by Zike Yang on Fri Jul 12 09:51:57 2024 +0800 - Log - Release v0.13.0-candidate-1 -BEGIN PGP SIGNATURE- iQJEBAABCgAuFiEE6ItqSN52cCOQDJEjT0AbyNP5+1UFAmaQjD0QHHppa2VAYXBh Y2hlLm9yZwAKCRBPQBvI0/n7VfiZD/0eeiWW6n6DbE8Da5WwYi5ZRj3c4nHU7ACu xUtUKpF9pGhOfSkJqLpJCc4pHPFgTceNVtBvuM8lrbjjuTajdtUdEjfaSsXkL3bO yedXPAdfvwctezqYwiXJ7bRXyJhB+pIwfuMbyLmqt3D7gZmOPNG4iYAc0+UemNog 6/BYFFHYXNo6XZU2pnt/RcjGpEZWZj/3p1fCp094giReYn/UoBW8WXfyFvjYE7EG lgaupr2ZTGbAicdsr2QqmRieBT6XOP6DCYqXhqHEnBGd6db0AcwVgudR+gvXjg4v M0vjmKuH1vHT4ayv8PSF42en1gYiL3Sx6M11t0EyasVk71jDVFzs/4Qyjy0hr2hp 6gPSIYVb5tOxfw6oQNd1noHTZv/cUFjBhSr6iA4RTron8YgUWP7zYLUcjsgCXiVX 7lqYHMBrMQQhqT/iI7Vnk8iR2jnrkjbYlQEtuRXfyPjN/hcEceXvtbU8Z0fHhzSW jhPSVMxu+ahhctHGC5fa0G8guGh/QzZo4AFqq2sdF5zto6+pNG1ucqPA1lujC/Kd jlS8JxFLFCxSi6PyVVVwFJUrSUMSVOE9v1gqPCU9ejXzGIaoNT/Lot2GaDhrQNy2 LWOANpxIuYCtUy0XcvdpIM/+GXl/CmR/ZtcEwwwfPoTeAoVYfbJMKWIewiFqcCkx QMkbdvbEZw== =9COw -END PGP SIGNATURE- --- No new revisions were added by this update. Summary of changes:
(pulsar-client-go) branch branch-0.13.0 created (now 50dce7ea)
This is an automated email from the ASF dual-hosted git repository. zike pushed a change to branch branch-0.13.0 in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git at 50dce7ea Fix transaction coordinator client cann't reconnect to the broker (#1237) No new revisions were added by this update.
(pulsar-client-go) branch master updated: [improve] return `ErrMaxConcurrentOpsReached` when too many concurrent ops in transaction coordinator client (#1242)
This is an automated email from the ASF dual-hosted git repository. zike pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new e7a771fe [improve] return `ErrMaxConcurrentOpsReached` when too many concurrent ops in transaction coordinator client (#1242) e7a771fe is described below commit e7a771fe8bab8b2790620848e18feabcde1056e7 Author: Zike Yang AuthorDate: Thu Jul 11 16:08:05 2024 +0800 [improve] return `ErrMaxConcurrentOpsReached` when too many concurrent ops in transaction coordinator client (#1242) ### Motivation Currently, the client will return an UnknownError when there are too many concurrent ops in transaction coordinator client ### Modifications - Add new error `ErrMaxConcurrentOpsReached` - Return `ErrMaxConcurrentOpsReached` when too many concurrent ops in transaction coordinator client --- pulsar/error.go | 4 pulsar/transaction_coordinator_client.go | 3 ++- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/pulsar/error.go b/pulsar/error.go index ccadb724..1a9345b0 100644 --- a/pulsar/error.go +++ b/pulsar/error.go @@ -121,6 +121,10 @@ const ( // fenced. Applications are now supposed to close it and create a // new producer ProducerFenced + // MaxConcurrentOperationsReached indicates that the maximum number of concurrent operations + // has been reached. This means that no additional operations can be started until some + // of the current operations complete. + MaxConcurrentOperationsReached // TransactionCoordinatorNotEnabled indicates that the transaction coordinator is not enabled. // This error is returned when an operation that requires the transaction coordinator is attempted // but the transaction coordinator feature is not enabled in the system or the transaction coordinator diff --git a/pulsar/transaction_coordinator_client.go b/pulsar/transaction_coordinator_client.go index d402d7ec..c4b7a6a2 100644 --- a/pulsar/transaction_coordinator_client.go +++ b/pulsar/transaction_coordinator_client.go @@ -41,6 +41,7 @@ type transactionCoordinatorClient struct { // where the TC located. const TransactionCoordinatorAssign = "persistent://pulsar/system/transaction_coordinator_assign" +var ErrMaxConcurrentOpsReached = newError(MaxConcurrentOperationsReached, "Max concurrent operations reached") var ErrTransactionCoordinatorNotEnabled = newError(TransactionCoordinatorNotEnabled, "The broker doesn't enable "+ "the transaction coordinator, or the transaction coordinator has not initialized") @@ -212,7 +213,7 @@ func getTCAssignTopicName(partition uint64) string { func (tc *transactionCoordinatorClient) canSendRequest() error { if !tc.semaphore.Acquire(context.Background()) { - return newError(UnknownError, "Failed to acquire semaphore") + return ErrMaxConcurrentOpsReached } return nil }
(pulsar-client-go) branch master updated: [feat] added a slog wrapper of the logger interface (#1234)
This is an automated email from the ASF dual-hosted git repository. zike pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new 1152cfc2 [feat] added a slog wrapper of the logger interface (#1234) 1152cfc2 is described below commit 1152cfc2e3c2024117ecff93a46d90e7eab0fa15 Author: Ivan Penchev <30929349+ivan-penc...@users.noreply.github.com> AuthorDate: Fri Jul 5 11:55:40 2024 +0200 [feat] added a slog wrapper of the logger interface (#1234) ### Motivation This commit supports to use `log/slog` package from the standard library to control the level and output type of the logs. In order for us to not have to import logrus as a direct dependency for part of our testing suit, it would be nice if we can use `slog` package instead, and wrap that in the provided by `pulsar/log` interfaces. This ties in a bit with issue #1078 because it opens the door for users who are already working with log/slog in their projects. Plus, it's a gives more time for the Pulsar team to evaluate incorporating slog into the SDK. ### Modifications One additional file `/pulsar/log/wrapper_slog.go` is added. One additional function in the `pulsar/log` package, `NewLoggerWithSlog` , is exposed. --- README.md| 2 +- go.mod | 2 +- pulsar/client_impl_with_slog_test.go | 49 + pulsar/log/wrapper_slog.go | 105 pulsar/log/wrapper_slog_test.go | 186 +++ 5 files changed, 342 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index c77cdea7..028e13a5 100644 --- a/README.md +++ b/README.md @@ -152,7 +152,7 @@ Run the tests: Run the tests with specific versions of GOLANG and PULSAR: -make test GOLANG_VERSION=1.20 PULSAR_VERSION=2.10.0 +make test GO_VERSION=1.20 PULSAR_VERSION=2.10.0 ## Contributing diff --git a/go.mod b/go.mod index bbbf9bd6..e3f95ba8 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/apache/pulsar-client-go -go 1.18 +go 1.20 require ( github.com/99designs/keyring v1.2.1 diff --git a/pulsar/client_impl_with_slog_test.go b/pulsar/client_impl_with_slog_test.go new file mode 100644 index ..1882cd8d --- /dev/null +++ b/pulsar/client_impl_with_slog_test.go @@ -0,0 +1,49 @@ +/// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//go:build go1.21 + +package pulsar + +import ( + "log/slog" + "os" + "testing" + + "github.com/apache/pulsar-client-go/pulsar/log" + "github.com/stretchr/testify/assert" +) + +func TestClientWithSlog(t *testing.T) { + sLogger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug})) + + client, err := NewClient(ClientOptions{ + URL:serviceURL, + Logger: log.NewLoggerWithSlog(sLogger), + }) + assert.NotNil(t, client) + assert.Nil(t, err) + + producer, err := client.CreateProducer(ProducerOptions{ + Topic: newTopicName(), + }) + assert.NotNil(t, producer) + assert.Nil(t, err) + + producer.Close() + client.Close() +} diff --git a/pulsar/log/wrapper_slog.go b/pulsar/log/wrapper_slog.go new file mode 100644 index ..b9728400 --- /dev/null +++ b/pulsar/log/wrapper_slog.go @@ -0,0 +1,105 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an
(pulsar-site) branch go-txn deleted (was 866d1b283083)
This is an automated email from the ASF dual-hosted git repository. zike pushed a change to branch go-txn in repository https://gitbox.apache.org/repos/asf/pulsar-site.git was 866d1b283083 [feat][doc] Mark transaction support for go client feature matrix The revisions that were on this branch are still contained in other references; therefore, this change does not discard any commits from the repository.
(pulsar-site) branch main updated: [feat][doc] Mark transaction support for go client feature matrix (#932)
This is an automated email from the ASF dual-hosted git repository. zike pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/pulsar-site.git The following commit(s) were added to refs/heads/main by this push: new d7e70edc9467 [feat][doc] Mark transaction support for go client feature matrix (#932) d7e70edc9467 is described below commit d7e70edc94676ae236d0425d249e47e04bfb3322 Author: Zike Yang AuthorDate: Tue Jul 2 15:43:18 2024 +0800 [feat][doc] Mark transaction support for go client feature matrix (#932) ## Motivation The go client has already supported the transaction. https://github.com/apache/pulsar-client-go/pull/1002 But the client feature matrix doesn't reflect this. This PR marks the transaction support for go client feature matrix --- data/matrix.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/data/matrix.js b/data/matrix.js index 9dfeaa68a0a0..c18ce820ed97 100644 --- a/data/matrix.js +++ b/data/matrix.js @@ -368,7 +368,7 @@ module.exports = { Feature: "Transaction", Java: 2, "C++": 0, - Go: 0, + Go: 2, Python: 0, Nodejs: 0, "C#/DotPulsar": 0, @@ -750,7 +750,7 @@ module.exports = { Feature: "Transaction", Java: 2, "C++": 0, - Go: 0, + Go: 2, Python: 0, Nodejs: 0, "C#/DotPulsar": 0,
(pulsar-client-go) branch master updated: [improve] Reuse function checkMsgIDPartition (#1232)
This is an automated email from the ASF dual-hosted git repository. zike pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new 827d6827 [improve] Reuse function checkMsgIDPartition (#1232) 827d6827 is described below commit 827d68273830d1038983aeb2750189a408d49cd5 Author: crossoverJie AuthorDate: Mon Jul 1 18:31:46 2024 +0800 [improve] Reuse function checkMsgIDPartition (#1232) ### Modifications Reuse function checkMsgIDPartition. --- pulsar/consumer_impl.go | 11 ++- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index 291471cd..679054a0 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -856,17 +856,10 @@ func toProtoInitialPosition(p SubscriptionInitialPosition) pb.CommandSubscribe_I } func (c *consumer) messageID(msgID MessageID) *trackingMessageID { - mid := toTrackingMessageID(msgID) - - partition := int(mid.partitionIdx) - // did we receive a valid partition index? - if partition < 0 || partition >= len(c.consumers) { - c.log.Warnf("invalid partition index %d expected a partition between [0-%d]", - partition, len(c.consumers)) + if err := c.checkMsgIDPartition(msgID); err != nil { return nil } - - return mid + return toTrackingMessageID(msgID) } func addMessageCryptoIfMissing(client *client, options *ConsumerOptions, topics interface{}) error {
(pulsar-site) 01/01: [feat][doc] Mark transaction support for go client feature matrix
This is an automated email from the ASF dual-hosted git repository. zike pushed a commit to branch go-txn in repository https://gitbox.apache.org/repos/asf/pulsar-site.git commit 866d1b2830836f8351f58b459c222dcac170a32a Author: Zike Yang AuthorDate: Mon Jul 1 11:32:48 2024 +0800 [feat][doc] Mark transaction support for go client feature matrix --- data/matrix.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/data/matrix.js b/data/matrix.js index 9dfeaa68a0a0..c18ce820ed97 100644 --- a/data/matrix.js +++ b/data/matrix.js @@ -368,7 +368,7 @@ module.exports = { Feature: "Transaction", Java: 2, "C++": 0, - Go: 0, + Go: 2, Python: 0, Nodejs: 0, "C#/DotPulsar": 0, @@ -750,7 +750,7 @@ module.exports = { Feature: "Transaction", Java: 2, "C++": 0, - Go: 0, + Go: 2, Python: 0, Nodejs: 0, "C#/DotPulsar": 0,
(pulsar-site) branch go-txn created (now 866d1b283083)
This is an automated email from the ASF dual-hosted git repository. zike pushed a change to branch go-txn in repository https://gitbox.apache.org/repos/asf/pulsar-site.git at 866d1b283083 [feat][doc] Mark transaction support for go client feature matrix This branch includes the following new commits: new 866d1b283083 [feat][doc] Mark transaction support for go client feature matrix The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference.
(pulsar) branch master updated: [fix][doc] Fix the doc for the message redelivery backoff (#22855)
This is an automated email from the ASF dual-hosted git repository. zike pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 20eb3c61ec2 [fix][doc] Fix the doc for the message redelivery backoff (#22855) 20eb3c61ec2 is described below commit 20eb3c61ec2fdfb38b6d2830f14aa04ccf5383de Author: Zike Yang AuthorDate: Wed Jun 26 10:10:54 2024 +0800 [fix][doc] Fix the doc for the message redelivery backoff (#22855) ## Motivation The document states that `ackTimeoutRedeliveryBackoff` cannot be used with `consumer.negativeAcknowledge(MessageId messageId)`. However, this is confusing. The `ackTimeoutRedeliveryBackoff` should not relate to the nack. ## Modification - Fix the doc for the `ackTimeoutRedeliveryBackoff` - Improve the doc for `negativeAckRedeliveryBackoff` and `ackTimeoutRedeliveryBackoff` --- .../apache/pulsar/client/api/ConsumerBuilder.java | 73 -- 1 file changed, 54 insertions(+), 19 deletions(-) diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java index 6f3c3be9727..c7919fa473f 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java @@ -791,31 +791,66 @@ public interface ConsumerBuilder extends Cloneable { ConsumerBuilder messagePayloadProcessor(MessagePayloadProcessor payloadProcessor); /** - * negativeAckRedeliveryBackoff doesn't work with `consumer.negativeAcknowledge(MessageId messageId)` - * because we are unable to get the redelivery count from the message ID. + * negativeAckRedeliveryBackoff sets the redelivery backoff policy for messages that are negatively acknowledged + * using + * `consumer.negativeAcknowledge(Message message)` but not with `consumer.negativeAcknowledge(MessageId + * messageId)`. + * This setting allows specifying a backoff policy for messages that are negatively acknowledged, + * enabling more flexible control over the delay before such messages are redelivered. + * + * This configuration accepts a {@link RedeliveryBackoff} object that defines the backoff policy. + * The policy can be either a fixed delay or an exponential backoff. An exponential backoff policy + * is beneficial in scenarios where increasing the delay between consecutive redeliveries can help + * mitigate issues like temporary resource constraints or processing bottlenecks. + * + * Note: This backoff policy does not apply when using `consumer.negativeAcknowledge(MessageId messageId)` + * because the redelivery count cannot be determined from just the message ID. It is recommended to use + * `consumer.negativeAcknowledge(Message message)` if you want to leverage the redelivery backoff policy. + * + * Example usage: + * {@code + * client.newConsumer() + * .negativeAckRedeliveryBackoff(ExponentialRedeliveryBackoff.builder() + * .minDelayMs(1000) // Set minimum delay to 1 second + * .maxDelayMs(6) // Set maximum delay to 60 seconds + * .build()) + * .subscribe(); + * } * - * Example: - * - * client.newConsumer().negativeAckRedeliveryBackoff(ExponentialRedeliveryBackoff.builder() - * .minNackTimeMs(1000) - * .maxNackTimeMs(60 * 1000) - * .build()).subscribe(); - * + * @param negativeAckRedeliveryBackoff the backoff policy to use for negatively acknowledged messages + * @return the consumer builder instance */ ConsumerBuilder negativeAckRedeliveryBackoff(RedeliveryBackoff negativeAckRedeliveryBackoff); + /** - * redeliveryBackoff doesn't work with `consumer.negativeAcknowledge(MessageId messageId)` - * because we are unable to get the redelivery count from the message ID. + * Sets the redelivery backoff policy for messages that are redelivered due to acknowledgement timeout. + * This setting allows you to specify a backoff policy for messages that are not acknowledged within + * the specified ack timeout. By using a backoff policy, you can control the delay before a message + * is redelivered, potentially improving consumer performance by avoiding immediate redelivery of + * messages that might still be processing. * - * Example: - * - * client.newConsumer().ackTimeout(10, TimeUnit.SECOND) - * .ackTimeoutRedeliveryBackoff(ExponentialRedeliveryBackoff.builder() - * .minNackTimeMs(1000) - * .maxNackTimeMs(60 * 1000) - * .build()).subscribe(); - * +
(pulsar-client-go) branch master updated: [Issue 1223] Support ZeroQueueConsumer (#1225)
This is an automated email from the ASF dual-hosted git repository. zike pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new aa090bed [Issue 1223] Support ZeroQueueConsumer (#1225) aa090bed is described below commit aa090bedcddd5b67ef5f91d4e15ec2caf453d540 Author: crossoverJie AuthorDate: Wed Jun 26 09:59:35 2024 +0800 [Issue 1223] Support ZeroQueueConsumer (#1225) Fixes #1223 ### Motivation Support ZeroQueueConsumer, refer to Java [ZeroQueueConsumerImpl](https://github.com/apache/pulsar/blob/8c50a6c2e91c81dbf187ce5e66cb39e2758a741e/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java#L42) ### Modifications - The consumer add a new optional parameter `EnableZeroQueueConsumer` - Add a new `zeroQueueConsumer` --- pulsar/consumer.go | 5 + pulsar/consumer_impl.go| 114 + pulsar/consumer_zero_queue.go | 289 ++ pulsar/consumer_zero_queue_test.go | 490 + pulsar/reader_impl.go | 8 +- 5 files changed, 860 insertions(+), 46 deletions(-) diff --git a/pulsar/consumer.go b/pulsar/consumer.go index 0b70a163..31f89a54 100644 --- a/pulsar/consumer.go +++ b/pulsar/consumer.go @@ -160,6 +160,11 @@ type ConsumerOptions struct { // Default value is `1000` messages and should be good for most use cases. ReceiverQueueSize int + // EnableZeroQueueConsumer, if enabled, the ReceiverQueueSize will be 0. + // Notice: only non-partitioned topic is supported. + // Default is false. + EnableZeroQueueConsumer bool + // EnableAutoScaledReceiverQueueSize, if enabled, the consumer receive queue will be auto-scaled // by the consumer actual throughput. The ReceiverQueueSize will be the maximum size which consumer // receive queue can be scaled. diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index 71ac3232..291471cd 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -22,9 +22,12 @@ import ( "fmt" "math/rand" "strconv" + "strings" "sync" "time" + "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils" + "github.com/apache/pulsar-client-go/pulsar/crypto" "github.com/apache/pulsar-client-go/pulsar/internal" pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto" @@ -81,6 +84,10 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) { options.ReceiverQueueSize = defaultReceiverQueueSize } + if options.EnableZeroQueueConsumer { + options.ReceiverQueueSize = 0 + } + if options.Interceptors == nil { options.Interceptors = defaultConsumerInterceptors } @@ -236,7 +243,24 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) { } func newInternalConsumer(client *client, options ConsumerOptions, topic string, - messageCh chan ConsumerMessage, dlq *dlqRouter, rlq *retryRouter, disableForceTopicCreation bool) (*consumer, error) { + messageCh chan ConsumerMessage, dlq *dlqRouter, rlq *retryRouter, disableForceTopicCreation bool) (Consumer, error) { + partitions, err := client.TopicPartitions(topic) + if err != nil { + return nil, err + } + + if len(partitions) > 1 && options.EnableZeroQueueConsumer { + return nil, pkgerrors.New("ZeroQueueConsumer is not supported for partitioned topics") + } + + if len(partitions) == 1 && options.EnableZeroQueueConsumer && + strings.Contains(partitions[0], utils.PARTITIONEDTOPICSUFFIX) { + return nil, pkgerrors.New("ZeroQueueConsumer is not supported for partitioned topics") + } + + if len(partitions) == 1 && options.EnableZeroQueueConsumer { + return newZeroConsumer(client, options, topic, messageCh, dlq, rlq, disableForceTopicCreation) + } consumer := &consumer{ topic: topic, @@ -253,7 +277,7 @@ func newInternalConsumer(client *client, options ConsumerOptions, topic string, metrics: client.metrics.GetLeveledMetrics(topic), } - err := consumer.internalTopicSubscribeToPartitions() + err = consumer.internalTopicSubscribeToPartitions() if err != nil { return nil, err } @@ -343,10 +367,6 @@ func (c *consumer) internalTopicSubscribeToPartitions() error { consumer *partitionConsumer
(pulsar-client-go) branch master updated: [Improve] Add admin topic api CreateWithProperties (#1226)
This is an automated email from the ASF dual-hosted git repository. zike pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new a1ce5e6d [Improve] Add admin topic api CreateWithProperties (#1226) a1ce5e6d is described below commit a1ce5e6d48fd00d07d45958a5c7482751f0cbb68 Author: crossoverJie AuthorDate: Wed Jun 12 17:53:12 2024 +0800 [Improve] Add admin topic api CreateWithProperties (#1226) ### Motivation To keep consistent with the [Java client](https://github.com/apache/pulsar/pull/12818). ### Modifications - Add admin topic api CreateWithProperties - Add admin topic api GetProperties --- pulsaradmin/pkg/admin/topic.go | 37 ++--- pulsaradmin/pkg/admin/topic_test.go | 33 + pulsaradmin/pkg/rest/client.go | 22 -- 3 files changed, 87 insertions(+), 5 deletions(-) diff --git a/pulsaradmin/pkg/admin/topic.go b/pulsaradmin/pkg/admin/topic.go index 7badc634..82cfc87e 100644 --- a/pulsaradmin/pkg/admin/topic.go +++ b/pulsaradmin/pkg/admin/topic.go @@ -21,6 +21,8 @@ import ( "fmt" "strconv" + "github.com/apache/pulsar-client-go/pulsaradmin/pkg/rest" + "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils" ) @@ -35,6 +37,20 @@ type Topics interface { //when setting to 0, it will create a non-partitioned topic Create(topic utils.TopicName, partitions int) error + // CreateWithProperties Create a partitioned or non-partitioned topic + // + // @param topic + //topicName struct + // @param partitions + //number of topic partitions, + //when setting to 0, it will create a non-partitioned topic + // @param meta + //topic properties + CreateWithProperties(topic utils.TopicName, partitions int, meta map[string]string) error + + // GetProperties returns the properties of a topic + GetProperties(topic utils.TopicName) (map[string]string, error) + // Delete a topic, this function can delete both partitioned or non-partitioned topic // // @param topic @@ -392,14 +408,29 @@ func (c *pulsarClient) Topics() Topics { } func (t *topics) Create(topic utils.TopicName, partitions int) error { + return t.CreateWithProperties(topic, partitions, nil) +} +func (t *topics) CreateWithProperties(topic utils.TopicName, partitions int, meta map[string]string) error { endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "partitions") - data := &partitions if partitions == 0 { endpoint = t.pulsar.endpoint(t.basePath, topic.GetRestPath()) - data = nil + return t.pulsar.Client.Put(endpoint, meta) + } + data := struct { + Meta map[string]string `json:"properties"` + Partitions int `json:"partitions"` + }{ + Meta: meta, + Partitions: partitions, } + return t.pulsar.Client.PutWithCustomMediaType(endpoint, &data, nil, nil, rest.PartitionedTopicMetaJSON) +} - return t.pulsar.Client.Put(endpoint, data) +func (t *topics) GetProperties(topic utils.TopicName) (map[string]string, error) { + endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "properties") + var properties map[string]string + err := t.pulsar.Client.Get(endpoint, &properties) + return properties, err } func (t *topics) Delete(topic utils.TopicName, force bool, nonPartitioned bool) error { diff --git a/pulsaradmin/pkg/admin/topic_test.go b/pulsaradmin/pkg/admin/topic_test.go index a3609ff6..fced0542 100644 --- a/pulsaradmin/pkg/admin/topic_test.go +++ b/pulsaradmin/pkg/admin/topic_test.go @@ -65,6 +65,39 @@ func TestCreateTopic(t *testing.T) { t.Error("Couldn't find topic: " + topic) } +func TestTopics_CreateWithProperties(t *testing.T) { + topic := newTopicName() + cfg := &config.Config{} + admin, err := New(cfg) + assert.NoError(t, err) + assert.NotNil(t, admin) + + // Create non-partition topic + topicName, err := utils.GetTopicName(topic) + assert.NoError(t, err) + err = admin.Topics().CreateWithProperties(*topicName, 0, map[string]string{ + "key1": "value1", + }) + assert.NoError(t, err) + + properties, err := admin.Topics().GetProperties(*topicName) + assert.NoError(t, err) + assert.Equal(t, properties["key1"], "value1") + + // Create partition topic + topic = newTopicName() + topicN
(pulsar-client-go) branch master updated: [Improve] PIP-313 Add GetLastMessageIDs API (#1221)
This is an automated email from the ASF dual-hosted git repository. zike pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new 2d513231 [Improve] PIP-313 Add GetLastMessageIDs API (#1221) 2d513231 is described below commit 2d51323103a65ba522dc73f56f052cc81d663245 Author: crossoverJie AuthorDate: Tue May 28 18:45:15 2024 +0800 [Improve] PIP-313 Add GetLastMessageIDs API (#1221) ### Motivation To keep consistent with the Java client. Releted PR: https://github.com/apache/pulsar/pull/20040 ### Modifications - Add `GetLastMessageIDs`api for consumer. --- pulsar/consumer.go | 5 + pulsar/consumer_impl.go| 13 +++ pulsar/consumer_multitopic.go | 12 +++ pulsar/consumer_multitopic_test.go | 81 ++ pulsar/consumer_regex.go | 12 +++ pulsar/consumer_regex_test.go | 68 pulsar/consumer_test.go| 120 + pulsar/impl_message.go | 37 +++ .../pulsartracing/consumer_interceptor_test.go | 5 + pulsar/message.go | 6 ++ 10 files changed, 359 insertions(+) diff --git a/pulsar/consumer.go b/pulsar/consumer.go index 3161af51..0b70a163 100644 --- a/pulsar/consumer.go +++ b/pulsar/consumer.go @@ -278,6 +278,11 @@ type Consumer interface { // where more than one consumer are currently connected. UnsubscribeForce() error + // GetLastMessageIDs get all the last message id of the topics the consumer subscribed. + // + // The list of MessageID instances of all the topics that the consumer subscribed + GetLastMessageIDs() ([]TopicMessageID, error) + // Receive a single message. // This calls blocks until a message is available. Receive(context.Context) (Message, error) diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index b22eeaaf..71ac3232 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -472,6 +472,19 @@ func (c *consumer) unsubscribe(force bool) error { return nil } +func (c *consumer) GetLastMessageIDs() ([]TopicMessageID, error) { + ids := make([]TopicMessageID, 0) + for _, pc := range c.consumers { + id, err := pc.getLastMessageID() + tm := &topicMessageID{topic: pc.topic, track: id} + if err != nil { + return nil, err + } + ids = append(ids, tm) + } + return ids, nil +} + func (c *consumer) Receive(ctx context.Context) (message Message, err error) { for { select { diff --git a/pulsar/consumer_multitopic.go b/pulsar/consumer_multitopic.go index eaf42ad7..020e38a0 100644 --- a/pulsar/consumer_multitopic.go +++ b/pulsar/consumer_multitopic.go @@ -109,6 +109,18 @@ func (c *multiTopicConsumer) UnsubscribeForce() error { return errs } +func (c *multiTopicConsumer) GetLastMessageIDs() ([]TopicMessageID, error) { + ids := make([]TopicMessageID, 0) + for _, c := range c.consumers { + id, err := c.GetLastMessageIDs() + if err != nil { + return nil, err + } + ids = append(ids, id...) + } + return ids, nil +} + func (c *multiTopicConsumer) Receive(ctx context.Context) (message Message, err error) { for { select { diff --git a/pulsar/consumer_multitopic_test.go b/pulsar/consumer_multitopic_test.go index 58ad0957..7c6b898a 100644 --- a/pulsar/consumer_multitopic_test.go +++ b/pulsar/consumer_multitopic_test.go @@ -22,6 +22,10 @@ import ( "strings" "testing" + "github.com/apache/pulsar-client-go/pulsaradmin" + "github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config" + "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils" + "github.com/stretchr/testify/assert" ) @@ -137,3 +141,80 @@ func TestMultiTopicConsumerForceUnsubscribe(t *testing.T) { err = consumer.UnsubscribeForce() assert.Error(t, err) } + +func TestMultiTopicGetLastMessageIDs(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: lookupURL, + }) + + assert.Nil(t, err) + defer client.Close() + + topic1Partition, topic2Partition, topic3Partition := 1, 2, 3 + + topic1 := newTopicName() + err = createPartitionedTopic(topic1, topic1Partition) + assert.Nil(t, err) + + topic2 := newTopicName() + err = createPartitionedTopic(topic2, topic2Partition) + assert.Nil(t, err) + + topic3 := newTopicNam
(pulsar) branch master updated: [improve][client] Deprecate `MessageIdUtils.getOffset` and `MessageIdUtils.getMessageId` (#22747)
This is an automated email from the ASF dual-hosted git repository. zike pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 878a4129a2f [improve][client] Deprecate `MessageIdUtils.getOffset` and `MessageIdUtils.getMessageId` (#22747) 878a4129a2f is described below commit 878a4129a2fdaa133e935b2fe4aca8fc4f0292ad Author: Zike Yang AuthorDate: Tue May 21 10:08:19 2024 +0800 [improve][client] Deprecate `MessageIdUtils.getOffset` and `MessageIdUtils.getMessageId` (#22747) ### Motivation After discussing [here](https://github.com/apache/pulsar/pull/22698#discussion_r1597445741), the pulsar client shouldn't expose the `offset` term to users. ### Modifications - Deprecate `MessageIdUtils.getOffset` and `MessageIdUtils.getMessageId` - For connectors, use `FunctionCommon.getOffset` and `FunctionCommon.getMessageId` --- .../java/org/apache/pulsar/client/util/MessageIdUtils.java | 2 ++ pulsar-io/kafka-connect-adaptor/pom.xml | 6 ++ .../pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java | 4 ++-- .../apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java | 12 ++-- 4 files changed, 16 insertions(+), 8 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/util/MessageIdUtils.java b/pulsar-client/src/main/java/org/apache/pulsar/client/util/MessageIdUtils.java index 60cdad8e772..459a31ee720 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/util/MessageIdUtils.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/util/MessageIdUtils.java @@ -22,6 +22,7 @@ import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.impl.MessageIdImpl; public class MessageIdUtils { +@Deprecated public static final long getOffset(MessageId messageId) { MessageIdImpl msgId = (MessageIdImpl) messageId; long ledgerId = msgId.getLedgerId(); @@ -34,6 +35,7 @@ public class MessageIdUtils { return offset; } +@Deprecated public static final MessageId getMessageId(long offset) { // Demultiplex ledgerId and entryId from offset long ledgerId = offset >>> 28; diff --git a/pulsar-io/kafka-connect-adaptor/pom.xml b/pulsar-io/kafka-connect-adaptor/pom.xml index dcb040840c7..0be2a68dc1b 100644 --- a/pulsar-io/kafka-connect-adaptor/pom.xml +++ b/pulsar-io/kafka-connect-adaptor/pom.xml @@ -46,6 +46,12 @@ compile + + org.apache.pulsar + pulsar-functions-utils + ${project.version} + + com.fasterxml.jackson.core jackson-databind diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java index 7a908b553a8..760799e0daa 100644 --- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java +++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaSinkTaskContext.java @@ -40,7 +40,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.connect.sink.SinkTaskContext; import org.apache.kafka.connect.storage.OffsetBackingStore; import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.client.util.MessageIdUtils; +import org.apache.pulsar.functions.utils.FunctionCommon; import org.apache.pulsar.io.core.SinkContext; @Slf4j @@ -150,7 +150,7 @@ public class PulsarKafkaSinkTaskContext implements SinkTaskContext { try { ctx.seek(desanitizeTopicName.apply(topicPartition.topic()), topicPartition.partition(), -MessageIdUtils.getMessageId(offset)); +FunctionCommon.getMessageId(offset)); } catch (PulsarClientException e) { log.error("Failed to seek topic {} partition {} offset {}", topicPartition.topic(), topicPartition.partition(), offset, e); diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java index 1100b13b425..1bcd2442001 100644 --- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java +++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java @@ -64,12 +64,12 @@ import org.apache.pulsar.client.impl.schema.JSONSchema; import org.apache.pulsar.client.impl.schema.SchemaInfoImpl; import org.apache.pulsar.client.impl.schema.generic.Ge
(pulsar-client-go) branch master updated: [Issue 1218][Reader] Reader Next returns on closed consumer (#1219)
This is an automated email from the ASF dual-hosted git repository. zike pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new a09ae322 [Issue 1218][Reader] Reader Next returns on closed consumer (#1219) a09ae322 is described below commit a09ae3225334fb9587e5087e3adb2f944e89635d Author: Gaylor Bosson AuthorDate: Mon May 20 12:43:04 2024 +0200 [Issue 1218][Reader] Reader Next returns on closed consumer (#1219) Fixes #1218 ### Motivation Calling Next on a reader already closed will block forever unless the context is canceled. Similarly, the call will not return if a different go routine closes the reader. ### Modifications Next now listens for the close channel of the consumer to return an error when it closes. Signed-off-by: Gaylor Bosson --- pulsar/reader_impl.go | 2 ++ pulsar/reader_test.go | 24 pulsar/table_view_impl.go | 11 --- 3 files changed, 34 insertions(+), 3 deletions(-) diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go index bf91c67f..4f4e2aa9 100644 --- a/pulsar/reader_impl.go +++ b/pulsar/reader_impl.go @@ -171,6 +171,8 @@ func (r *reader) Next(ctx context.Context) (Message, error) { return nil, err } return cm.Message, nil + case <-r.c.closeCh: + return nil, newError(ConsumerClosed, "consumer closed") case <-ctx.Done(): return nil, ctx.Err() } diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go index a9c45ba8..d00346fc 100644 --- a/pulsar/reader_test.go +++ b/pulsar/reader_test.go @@ -1035,3 +1035,27 @@ func TestReaderHasNextRetryFailed(t *testing.T) { } } + +func TestReaderNextReturnsOnClosedConsumer(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: serviceURL, + OperationTimeout: 2 * time.Second, + }) + assert.NoError(t, err) + topic := newTopicName() + reader, err := client.CreateReader(ReaderOptions{ + Topic: topic, + StartMessageID: EarliestMessageID(), + }) + assert.Nil(t, err) + + reader.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + var e *Error + _, err = reader.Next(ctx) + assert.ErrorAs(t, err, &e) + assert.Equal(t, ConsumerClosed, e.Result()) +} diff --git a/pulsar/table_view_impl.go b/pulsar/table_view_impl.go index 17e0b90f..60b66e33 100644 --- a/pulsar/table_view_impl.go +++ b/pulsar/table_view_impl.go @@ -129,7 +129,9 @@ func (tv *TableViewImpl) partitionUpdateCheck() error { if err != nil { tv.logger.Errorf("read next message failed for %s: %w", partition, err) } - tv.handleMessage(msg) + if msg != nil { + tv.handleMessage(msg) + } } ctx, cancelFunc := context.WithCancel(context.Background()) tv.cancelRaders[partition] = cancelReader{ @@ -268,9 +270,12 @@ func (tv *TableViewImpl) watchReaderForNewMessages(ctx context.Context, reader R if err != nil { tv.logger.Errorf("read next message failed for %s: %w", reader.Topic(), err) } - if errors.Is(err, context.Canceled) { + var e *Error + if (errors.As(err, &e) && e.Result() == ConsumerClosed) || errors.Is(err, context.Canceled) { return } - tv.handleMessage(msg) + if msg != nil { + tv.handleMessage(msg) + } } }
(pulsar-client-go) branch master updated (8e908733 -> 43d8cfc4)
This is an automated email from the ASF dual-hosted git repository. zike pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git from 8e908733 [fix] Return an error when AckCumulative on a Shared/KeyShared subscription (#1217) add 43d8cfc4 [cleanup] Remove AvroCodec from JSONSchema (#1216) No new revisions were added by this update. Summary of changes: pulsar/schema.go | 1 - 1 file changed, 1 deletion(-)
(pulsar-client-go) branch master updated: [fix] Return an error when AckCumulative on a Shared/KeyShared subscription (#1217)
This is an automated email from the ASF dual-hosted git repository. zike pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new 8e908733 [fix] Return an error when AckCumulative on a Shared/KeyShared subscription (#1217) 8e908733 is described below commit 8e9087339d0d004231d3f8fc5f823e561840182b Author: Zike Yang AuthorDate: Tue May 14 17:20:31 2024 +0800 [fix] Return an error when AckCumulative on a Shared/KeyShared subscription (#1217) ### Motivation The consumer should return error when AckCumulative on a Shared/KeyShared subscription --- pulsar/consumer_partition.go | 14 +- pulsar/consumer_test.go | 35 +++ 2 files changed, 48 insertions(+), 1 deletion(-) diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index dc01e692..f752afbc 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -20,7 +20,6 @@ package pulsar import ( "container/list" "encoding/hex" - "errors" "fmt" "math" "strings" @@ -36,6 +35,7 @@ import ( pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto" "github.com/apache/pulsar-client-go/pulsar/log" "github.com/bits-and-blooms/bitset" + "github.com/pkg/errors" uAtomic "go.uber.org/atomic" ) @@ -50,6 +50,10 @@ const ( consumerClosed ) +var ( + ErrInvalidAck = errors.New("invalid ack") +) + func (s consumerState) String() string { switch s { case consumerInit: @@ -686,12 +690,20 @@ func (pc *partitionConsumer) AckIDWithResponseCumulative(msgID MessageID) error return pc.internalAckIDCumulative(msgID, true) } +func (pc *partitionConsumer) isAllowAckCumulative() bool { + return pc.options.subscriptionType != Shared && pc.options.subscriptionType != KeyShared +} + func (pc *partitionConsumer) internalAckIDCumulative(msgID MessageID, withResponse bool) error { if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing { pc.log.WithField("state", state).Error("Failed to ack by closing or closed consumer") return errors.New("consumer state is closed") } + if !pc.isAllowAckCumulative() { + return errors.Wrap(ErrInvalidAck, "cumulative ack is not allowed for the Shared/KeyShared subscription type") + } + // chunk message id will be converted to tracking message id trackingID := toTrackingMessageID(msgID) if trackingID == nil { diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index 4120ba4b..00f48cae 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -4392,3 +4392,38 @@ func TestMultiConsumerMemoryLimit(t *testing.T) { return assert.Equal(t, pc2PrevQueueSize/2, pc2.currentQueueSize.Load()) }) } + +func TestConsumerAckCumulativeOnSharedSubShouldFailed(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: lookupURL, + }) + assert.Nil(t, err) + defer client.Close() + + topic := newTopicName() + consumer, err := client.Subscribe(ConsumerOptions{ + Topic:topic, + SubscriptionName: "my-sub", + Type: Shared, + }) + assert.Nil(t, err) + defer consumer.Close() + + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topic, + }) + assert.Nil(t, err) + defer producer.Close() + + _, err = producer.Send(context.Background(), &ProducerMessage{ + Payload: []byte("hello"), + }) + assert.Nil(t, err) + + msg, err := consumer.Receive(context.Background()) + assert.Nil(t, err) + + err = consumer.AckIDCumulative(msg.ID()) + assert.NotNil(t, err) + assert.ErrorIs(t, err, ErrInvalidAck) +}
(pulsar-client-go) branch master updated: [Improve] Supplement schema admin api (#1215)
This is an automated email from the ASF dual-hosted git repository. zike pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new 49fce726 [Improve] Supplement schema admin api (#1215) 49fce726 is described below commit 49fce72614c37fd8644c6305eb3405fcf48d4f52 Author: crossoverJie AuthorDate: Fri May 10 22:08:57 2024 +0800 [Improve] Supplement schema admin api (#1215) ### Motivation To keep consistent with the Java client. ### Modifications - CreateSchemaBySchemaInfo - GetVersionBySchemaInfo - GetVersionByPayload - TestCompatibilityWithSchemaInfo - TestCompatibilityWithPostSchemaPayload --- pulsaradmin/pkg/admin/schema.go | 59 pulsaradmin/pkg/admin/schema_test.go | 45 +++ pulsaradmin/pkg/utils/schema_util.go | 23 ++ 3 files changed, 127 insertions(+) diff --git a/pulsaradmin/pkg/admin/schema.go b/pulsaradmin/pkg/admin/schema.go index d97cd7ad..7190bd99 100644 --- a/pulsaradmin/pkg/admin/schema.go +++ b/pulsaradmin/pkg/admin/schema.go @@ -43,6 +43,22 @@ type Schema interface { // CreateSchemaByPayload creates a schema for a given topic CreateSchemaByPayload(topic string, schemaPayload utils.PostSchemaPayload) error + + // CreateSchemaBySchemaInfo creates a schema for a given topic + CreateSchemaBySchemaInfo(topic string, schemaInfo utils.SchemaInfo) error + + // GetVersionBySchemaInfo gets the version of a schema + GetVersionBySchemaInfo(topic string, schemaInfo utils.SchemaInfo) (int64, error) + + // GetVersionByPayload gets the version of a schema + GetVersionByPayload(topic string, schemaPayload utils.PostSchemaPayload) (int64, error) + + // TestCompatibilityWithSchemaInfo tests compatibility with a schema + TestCompatibilityWithSchemaInfo(topic string, schemaInfo utils.SchemaInfo) (*utils.IsCompatibility, error) + + // TestCompatibilityWithPostSchemaPayload tests compatibility with a schema + TestCompatibilityWithPostSchemaPayload(topic string, + schemaPayload utils.PostSchemaPayload) (*utils.IsCompatibility, error) } type schemas struct { @@ -148,3 +164,46 @@ func (s *schemas) CreateSchemaByPayload(topic string, schemaPayload utils.PostSc return s.pulsar.Client.Post(endpoint, &schemaPayload) } + +func (s *schemas) CreateSchemaBySchemaInfo(topic string, schemaInfo utils.SchemaInfo) error { + schemaPayload := utils.ConvertSchemaInfoToPostSchemaPayload(schemaInfo) + return s.CreateSchemaByPayload(topic, schemaPayload) +} + +func (s *schemas) GetVersionBySchemaInfo(topic string, schemaInfo utils.SchemaInfo) (int64, error) { + schemaPayload := utils.ConvertSchemaInfoToPostSchemaPayload(schemaInfo) + return s.GetVersionByPayload(topic, schemaPayload) +} + +func (s *schemas) GetVersionByPayload(topic string, schemaPayload utils.PostSchemaPayload) (int64, error) { + topicName, err := utils.GetTopicName(topic) + if err != nil { + return 0, err + } + version := struct { + Version int64 `json:"version"` + }{} + endpoint := s.pulsar.endpoint(s.basePath, topicName.GetTenant(), topicName.GetNamespace(), + topicName.GetLocalName(), "version") + err = s.pulsar.Client.PostWithObj(endpoint, &schemaPayload, &version) + return version.Version, err +} + +func (s *schemas) TestCompatibilityWithSchemaInfo(topic string, + schemaInfo utils.SchemaInfo) (*utils.IsCompatibility, error) { + schemaPayload := utils.ConvertSchemaInfoToPostSchemaPayload(schemaInfo) + return s.TestCompatibilityWithPostSchemaPayload(topic, schemaPayload) +} + +func (s *schemas) TestCompatibilityWithPostSchemaPayload(topic string, + schemaPayload utils.PostSchemaPayload) (*utils.IsCompatibility, error) { + topicName, err := utils.GetTopicName(topic) + if err != nil { + return nil, err + } + var isCompatibility utils.IsCompatibility + endpoint := s.pulsar.endpoint(s.basePath, topicName.GetTenant(), topicName.GetNamespace(), + topicName.GetLocalName(), "compatibility") + err = s.pulsar.Client.PostWithObj(endpoint, &schemaPayload, &isCompatibility) + return &isCompatibility, err +} diff --git a/pulsaradmin/pkg/admin/schema_test.go b/pulsaradmin/pkg/admin/schema_test.go index 17c1a54d..3560559e 100644 --- a/pulsaradmin/pkg/admin/schema_test.go +++ b/pulsaradmin/pkg/admin/schema_test.go @@ -77,3 +77,48 @@ func TestSchemas_ForceDeleteSchema(t *testing.T) { assert.Errorf(t, err, "Schema not found") } + +func TestSchemas_CreateSchemaBySchemaInfo(t *testing.T) { + cfg := &config.C
(pulsar-client-go) branch master updated: [Improve] Add admin api ForceDeleteSchema (#1213)
This is an automated email from the ASF dual-hosted git repository. zike pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new a086d325 [Improve] Add admin api ForceDeleteSchema (#1213) a086d325 is described below commit a086d3257cfbe1226ca9727211de449857b178e4 Author: crossoverJie AuthorDate: Tue May 7 23:13:47 2024 +0800 [Improve] Add admin api ForceDeleteSchema (#1213) ### Motivation To keep consistent with the Java client. ### Modifications Add admin api ForceDeleteSchema --- pulsaradmin/pkg/admin/schema.go | 16 +++- pulsaradmin/pkg/admin/schema_test.go | 79 2 files changed, 93 insertions(+), 2 deletions(-) diff --git a/pulsaradmin/pkg/admin/schema.go b/pulsaradmin/pkg/admin/schema.go index 14655268..d97cd7ad 100644 --- a/pulsaradmin/pkg/admin/schema.go +++ b/pulsaradmin/pkg/admin/schema.go @@ -38,6 +38,9 @@ type Schema interface { // DeleteSchema deletes the schema associated with a given topic DeleteSchema(topic string) error + // ForceDeleteSchema force deletes the schema associated with a given topic + ForceDeleteSchema(topic string) error + // CreateSchemaByPayload creates a schema for a given topic CreateSchemaByPayload(topic string, schemaPayload utils.PostSchemaPayload) error } @@ -112,6 +115,14 @@ func (s *schemas) GetSchemaInfoByVersion(topic string, version int64) (*utils.Sc } func (s *schemas) DeleteSchema(topic string) error { + return s.delete(topic, false) +} + +func (s *schemas) ForceDeleteSchema(topic string) error { + return s.delete(topic, true) +} + +func (s *schemas) delete(topic string, force bool) error { topicName, err := utils.GetTopicName(topic) if err != nil { return err @@ -120,9 +131,10 @@ func (s *schemas) DeleteSchema(topic string) error { endpoint := s.pulsar.endpoint(s.basePath, topicName.GetTenant(), topicName.GetNamespace(), topicName.GetLocalName(), "schema") - fmt.Println(endpoint) + queryParams := make(map[string]string) + queryParams["force"] = strconv.FormatBool(force) - return s.pulsar.Client.Delete(endpoint) + return s.pulsar.Client.DeleteWithQueryParams(endpoint, queryParams) } func (s *schemas) CreateSchemaByPayload(topic string, schemaPayload utils.PostSchemaPayload) error { diff --git a/pulsaradmin/pkg/admin/schema_test.go b/pulsaradmin/pkg/admin/schema_test.go new file mode 100644 index ..17c1a54d --- /dev/null +++ b/pulsaradmin/pkg/admin/schema_test.go @@ -0,0 +1,79 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package admin + +import ( + "fmt" + "testing" + "time" + + "github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config" + "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils" + "github.com/stretchr/testify/assert" +) + +func TestSchemas_DeleteSchema(t *testing.T) { + cfg := &config.Config{} + admin, err := New(cfg) + assert.NoError(t, err) + assert.NotNil(t, admin) + + schemaPayload := utils.PostSchemaPayload{ + SchemaType: "STRING", + Schema: "", + } + topic := fmt.Sprintf("my-topic-%v", time.Now().Nanosecond()) + err = admin.Schemas().CreateSchemaByPayload(topic, schemaPayload) + assert.NoError(t, err) + + info, err := admin.Schemas().GetSchemaInfo(topic) + assert.NoError(t, err) + assert.Equal(t, schemaPayload.SchemaType, info.Type) + + err = admin.Schemas().DeleteSchema(topic) + assert.NoError(t, err) + + _, err = admin.Schemas().GetSchemaInfo(topic) + assert.Errorf(t, err, "Schema not found") + +} +func TestSchemas_ForceDeleteSchema(t *testing.T) { + cfg := &config.Config{} + admin, err := New(cfg) + assert.NoError(t, err) + assert.NotNil(t, admi
(pulsar-client-go) branch master updated: [Improve] Add admin api GetListActiveBrokers (#1212)
This is an automated email from the ASF dual-hosted git repository. zike pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new 8fa0878c [Improve] Add admin api GetListActiveBrokers (#1212) 8fa0878c is described below commit 8fa0878c88ee4b6e002c7764a10c79012b14be90 Author: crossoverJie AuthorDate: Tue May 7 11:13:11 2024 +0800 [Improve] Add admin api GetListActiveBrokers (#1212) ### Motivation To keep consistent with the [Java client](https://github.com/apache/pulsar/pull/14702). ### Modifications Add admin api GetListActiveBrokers --- pulsaradmin/pkg/admin/brokers.go | 13 + pulsaradmin/pkg/admin/brokers_test.go | 15 +++ 2 files changed, 28 insertions(+) diff --git a/pulsaradmin/pkg/admin/brokers.go b/pulsaradmin/pkg/admin/brokers.go index 650fab8e..7dcea800 100644 --- a/pulsaradmin/pkg/admin/brokers.go +++ b/pulsaradmin/pkg/admin/brokers.go @@ -27,6 +27,9 @@ import ( // Brokers is admin interface for brokers management type Brokers interface { + + // GetListActiveBrokers Get the list of active brokers in the local cluster. + GetListActiveBrokers() ([]string, error) // GetActiveBrokers returns the list of active brokers in the cluster. GetActiveBrokers(cluster string) ([]string, error) @@ -86,6 +89,16 @@ func (b *broker) GetActiveBrokers(cluster string) ([]string, error) { return res, nil } +func (b *broker) GetListActiveBrokers() ([]string, error) { + endpoint := b.pulsar.endpoint(b.basePath) + var res []string + err := b.pulsar.Client.Get(endpoint, &res) + if err != nil { + return nil, err + } + return res, nil +} + func (b *broker) GetDynamicConfigurationNames() ([]string, error) { endpoint := b.pulsar.endpoint(b.basePath, "/configuration/") var res []string diff --git a/pulsaradmin/pkg/admin/brokers_test.go b/pulsaradmin/pkg/admin/brokers_test.go index 97679759..3ae9e4ae 100644 --- a/pulsaradmin/pkg/admin/brokers_test.go +++ b/pulsaradmin/pkg/admin/brokers_test.go @@ -58,3 +58,18 @@ func TestGetLeaderBroker(t *testing.T) { assert.NotEmpty(t, leaderBroker.ServiceURL) assert.NotEmpty(t, leaderBroker.BrokerID) } + +func TestGetAllActiveBrokers(t *testing.T) { + readFile, err := os.ReadFile("../../../integration-tests/tokens/admin-token") + assert.NoError(t, err) + cfg := &config.Config{ + Token: string(readFile), + } + admin, err := New(cfg) + assert.NoError(t, err) + assert.NotNil(t, admin) + + brokers, err := admin.Brokers().GetListActiveBrokers() + assert.NoError(t, err) + assert.NotEmpty(t, brokers) +}
(pulsar-client-go) branch master updated: [improve] Use physical address information in connection pool key (#1206)
This is an automated email from the ASF dual-hosted git repository. zike pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new 26e80855 [improve] Use physical address information in connection pool key (#1206) 26e80855 is described below commit 26e80855bb9848d41781840368e53d74f7df1102 Author: Dragos Misca AuthorDate: Thu Apr 18 06:08:33 2024 -0700 [improve] Use physical address information in connection pool key (#1206) ### Motivation Migrate https://github.com/apache/pulsar/pull/22085/ and (parts of) https://github.com/apache/pulsar-client-cpp/pull/411/ over to the Go client. Context for this idea [here](https://github.com/apache/pulsar/pull/22085/files#r1497008116). Golang client support for blue-green migration needs the connection pool to differentiate between connections with the same logical address, but different physical addresses. Otherwise, the wrong connection might be used by the client, in effect pointing to the old cluster, instead of the new one. ### Modifications The connection pool maintains a map of connections, keyed by their logical address and a random connection id. This PR proposes including the physical address in the key also, therefore allowing the upper layer to differentiate between connections with identical logical addresses, but different physical addresses. In addition to this change, the test setup had to be fixed to address breakages in `TestRetryWithMultipleHosts` and `TestReaderWithMultiHosts`. All tests in the repository are using a local standalone setup currently. This unusual configuration has broker lookup operations reply with flag `proxyThroughServiceUrl=true` ([ref](https://github.com/apache/pulsar/blob/e7c2a75473b545134a3b292ae0e87a79d65cb756/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java#L3 [...] | Logical Address | Physical Address | Notes | | --- | | - | | reachable-broker | reachable-broker | Valid | | unreachable-broker | unreachable-broker | Valid, but currently unusable | | reachable-broker | unreachable-broker | *Invalid entry* | To address the issue: - Switch the test setup to a more common cluster configuration. File `integration-tests/clustered/docker-compose.yml` instructs how this setup should look like. - Migrate the tests to separate files and test suites. New test files `pulsar/client_impl_clustered_test.go` and `pulsar/reader_clustered_test.go` contain Go tag `clustered`, allowing them to be ignored during the standalone test runs by virtue of the Go build process. - Add script `run-ci-clustered.sh`, specifying the "clustered" tests to run. - Changes in the `Makefile` add targets `make test_clustered` `make test_standalone` to run the respective test suites independently, while allowing `make test` to run all the tests, as before. - `Dockerfile` and `run-ci.sh` are modified to run the Go build process in the container build, such that it does not need to be run again in the new `run-ci-clustered.sh` script. The image is locally consumed by the tests only and is not published, so there is no risk of contaminating users. --- Dockerfile | 13 ++ Makefile | 10 +- integration-tests/clustered/docker-compose.yml | 167 + pulsar/client_impl_clustered_test.go | 89 + pulsar/client_impl_test.go | 55 pulsar/internal/connection_pool.go | 7 +- pulsar/reader_clustered_test.go| 88 + pulsar/reader_test.go | 52 scripts/{run-ci.sh => run-ci-clustered.sh} | 17 +-- scripts/run-ci.sh | 10 -- 10 files changed, 371 insertions(+), 137 deletions(-) diff --git a/Dockerfile b/Dockerfile index 818a106a..51e35f0a 100644 --- a/Dockerfile +++ b/Dockerfile @@ -42,3 +42,16 @@ COPY integration-tests/conf/.htpasswd \ COPY . /pulsar/pulsar-client-go ENV PULSAR_EXTRA_OPTS="-Dpulsar.auth.basic.conf=/pulsar/conf/.htpasswd" + +WORKDIR /pulsar/pulsar-client-go + +ENV GOPATH=/pulsar/go +ENV GOCACHE=/tmp/go-cache + +# Install dependencies +RUN go mod download + +# Basic compilation +RUN go build ./pulsar +RUN go build ./pulsaradmin +RUN go build -o bin/pulsar-perf ./perf diff --git a/Makefile b/Makefile index 62e44166..df4d539d 100644 --- a/Makefile +++ b/Makefile @@ -44,9 +44,17 @@ container: --build-arg PULSAR_IMAGE="${PULSAR_IMAGE}" \ --build-arg ARCH="${CONTAINER_ARCH}" . -test: container +test: container test_standalone test_clustered + +test_standalone: container docker run -i ${IMAGE_NAME} bash -c
(pulsar-client-go) branch master updated: [fix][build] Build test container image using current hardware platform (#1205)
This is an automated email from the ASF dual-hosted git repository. zike pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new c3e94e24 [fix][build] Build test container image using current hardware platform (#1205) c3e94e24 is described below commit c3e94e243a730ae22d59bf9d330c4539733b7eef Author: Dragos Misca AuthorDate: Tue Apr 16 19:29:08 2024 -0700 [fix][build] Build test container image using current hardware platform (#1205) ### Motivation Test container image is built for `amd64` platforms only. This makes it difficult to test and run on others, notably Apple Silicon chips running `arm64`. ### Modifications The source of the problem is using the hardcoded `amd64` Go binary distribution during the image build. On Apple Silicon, running `make test` yields the following error: ``` ... docker run -i pulsar-client-go-test:latest bash -c "cd /pulsar/pulsar-client-go && ./scripts/run-ci.sh" + export GOPATH=/pulsar/go + GOPATH=/pulsar/go + export GOCACHE=/tmp/go-cache + GOCACHE=/tmp/go-cache + go mod download rosetta error: failed to open elf at /lib64/ld-linux-x86-64.so.2 ./scripts/run-ci.sh: line 26: 7 Trace/breakpoint trap go mod download make: *** [test] Error 133 ``` Sourcing the local platform name from `uname -m` and using it to download the corresponding Go distribution solves the issue: ``` ... docker run -i pulsar-client-go-test:latest bash -c "cd /pulsar/pulsar-client-go && ./scripts/run-ci.sh" + export GOPATH=/pulsar/go + GOPATH=/pulsar/go + export GOCACHE=/tmp/go-cache + GOCACHE=/tmp/go-cache + go mod download + go build ./pulsar + go build -o bin/pulsar-perf ./perf + scripts/pulsar-test-service-start.sh ... ``` --- Dockerfile | 3 ++- Makefile | 7 +-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/Dockerfile b/Dockerfile index 62d22bf0..818a106a 100644 --- a/Dockerfile +++ b/Dockerfile @@ -21,8 +21,9 @@ ARG PULSAR_IMAGE=apachepulsar/pulsar:latest FROM $PULSAR_IMAGE USER root ARG GO_VERSION=1.18 +ARG ARCH=amd64 -RUN curl -L https://dl.google.com/go/go${GO_VERSION}.linux-amd64.tar.gz -o golang.tar.gz && \ +RUN curl -L https://dl.google.com/go/go${GO_VERSION}.linux-${ARCH}.tar.gz -o golang.tar.gz && \ mkdir -p /pulsar/go && tar -C /pulsar -xzf golang.tar.gz ENV PATH /pulsar/go/bin:$PATH diff --git a/Makefile b/Makefile index 527d53ac..62e44166 100644 --- a/Makefile +++ b/Makefile @@ -21,6 +21,7 @@ IMAGE_NAME = pulsar-client-go-test:latest PULSAR_VERSION ?= 3.2.0 PULSAR_IMAGE = apachepulsar/pulsar:$(PULSAR_VERSION) GO_VERSION ?= 1.18 +CONTAINER_ARCH ?= $(shell uname -m | sed s/x86_64/amd64/) # Golang standard bin directory. GOPATH ?= $(shell go env GOPATH) @@ -38,8 +39,10 @@ bin/golangci-lint: GOBIN=$(shell pwd)/bin go install github.com/golangci/golangci-lint/cmd/golangci-lint@v1.51.2 container: - docker build -t ${IMAGE_NAME} --build-arg GO_VERSION="${GO_VERSION}" \ - --build-arg PULSAR_IMAGE="${PULSAR_IMAGE}" . + docker build -t ${IMAGE_NAME} \ + --build-arg GO_VERSION="${GO_VERSION}" \ + --build-arg PULSAR_IMAGE="${PULSAR_IMAGE}" \ + --build-arg ARCH="${CONTAINER_ARCH}" . test: container docker run -i ${IMAGE_NAME} bash -c "cd /pulsar/pulsar-client-go && ./scripts/run-ci.sh"
(pulsar-client-go) branch master updated: chore(deps): bump google.golang.org/protobuf from 1.30.0 to 1.33.0 (#1198)
This is an automated email from the ASF dual-hosted git repository. zike pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new 393f80b4 chore(deps): bump google.golang.org/protobuf from 1.30.0 to 1.33.0 (#1198) 393f80b4 is described below commit 393f80b4b93faa36936380b643426026a2b2cd02 Author: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> AuthorDate: Thu Mar 21 22:24:44 2024 +0800 chore(deps): bump google.golang.org/protobuf from 1.30.0 to 1.33.0 (#1198) Bumps google.golang.org/protobuf from 1.30.0 to 1.33.0. --- updated-dependencies: - dependency-name: google.golang.org/protobuf dependency-type: direct:production ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- go.mod | 2 +- go.sum | 14 ++ 2 files changed, 3 insertions(+), 13 deletions(-) diff --git a/go.mod b/go.mod index 3257d510..51164871 100644 --- a/go.mod +++ b/go.mod @@ -27,7 +27,7 @@ require ( golang.org/x/mod v0.8.0 golang.org/x/oauth2 v0.0.0-20210402161424-2e8d93401602 golang.org/x/time v0.0.0-20191024005414-555d28b269f0 - google.golang.org/protobuf v1.30.0 + google.golang.org/protobuf v1.33.0 ) require ( diff --git a/go.sum b/go.sum index 50a1ba3e..0a768a0e 100644 --- a/go.sum +++ b/go.sum @@ -321,8 +321,6 @@ golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzB golang.org/x/mod v0.1.1-0.20191107180719-034126e5016b/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= -golang.org/x/mod v0.5.1 h1:OJxoQ/rynoF0dcCdI7cLPktw/hR2cueqYfjm43oqK38= -golang.org/x/mod v0.5.1/go.mod h1:5OXOZSfqPIIbmVBIIKWRFfZjPR0E5r58TLhUjH0a2Ro= golang.org/x/mod v0.8.0 h1:LUYupSeNrTNCGzR/hVBk2NHZO4hXcVaW1k4Qx7rjPx8= golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -357,8 +355,6 @@ golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81R golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20210726213435-c6fcb2dbf985/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= -golang.org/x/net v0.0.0-20220225172249-27dd8689420f h1:oA4XRj0qtSt8Yo1Zms0CUlsT3KG69V2UGQWPBxujDmc= -golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -420,13 +416,9 @@ golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210819135213-f52c844e1c1c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220204135822-1c1b9b1eba6a h1:ppl5mZgokTT8uPkmYOyEUmPTr3ypaKkg5eFOGrAmxxE= -golang.org/x/sys v0.0.0-20220204135822-1c1b9b1eba6a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= -golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 h1:JGgROgKl9N8DuW20oFS5gxc+lE67/N3FcwmBPMe7ArY= -golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.13.0 h1:bb+I9cTfFazGW51MZqBVmZy7+JEJMouUHTUSKVQLBek= golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -435,8 +427,6 @@ golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3 golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/te
(pulsar-client-go) branch master updated: [fix] Change the wrong `SourceInstanceStatusData` in SinkInstanceStatus (#1199)
This is an automated email from the ASF dual-hosted git repository. zike pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new 3693b369 [fix] Change the wrong `SourceInstanceStatusData` in SinkInstanceStatus (#1199) 3693b369 is described below commit 3693b3695e2f072f9506b64b5e3000e5f107070d Author: jiangpengcheng AuthorDate: Thu Mar 21 15:39:08 2024 +0800 [fix] Change the wrong `SourceInstanceStatusData` in SinkInstanceStatus (#1199) --- pulsaradmin/pkg/utils/sink_status.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsaradmin/pkg/utils/sink_status.go b/pulsaradmin/pkg/utils/sink_status.go index 6cdb091f..a2651959 100644 --- a/pulsaradmin/pkg/utils/sink_status.go +++ b/pulsaradmin/pkg/utils/sink_status.go @@ -28,8 +28,8 @@ type SinkStatus struct { } type SinkInstanceStatus struct { - InstanceID int `json:"instanceId"` - Status SourceInstanceStatusData `json:"status"` + InstanceID int`json:"instanceId"` + Status SinkInstanceStatusData `json:"status"` } type SinkInstanceStatusData struct {
(pulsar-client-go) branch master updated: Add change log for 0.12.1 (#1189)
This is an automated email from the ASF dual-hosted git repository. zike pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new 3935fd7b Add change log for 0.12.1 (#1189) 3935fd7b is described below commit 3935fd7b1efe0afe7643fdcc5687559a4d00a40b Author: Zike Yang AuthorDate: Sat Mar 9 13:33:30 2024 +0800 Add change log for 0.12.1 (#1189) ### Motivation Add 0.12.1 change log --- CHANGELOG.md | 9 + 1 file changed, 9 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7a4c81cb..f25de209 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,15 @@ All notable changes to this project will be documented in this file. +[0.12.1] 2024-02-29 + +- [fix] Fix Infinite Loop in Reader's `HasNext` Function by @RobertIndie in [#1182](https://github.com/apache/pulsar-client-go/pull/1182) +- [fix] Fix available permits in MessageReceived by @panszobe in [#1181](https://github.com/apache/pulsar-client-go/pull/1181) +- [feat] Support partitioned topic reader by @RobertIndie in [#1178](https://github.com/apache/pulsar-client-go/pull/1178) +- [fix] Fix BytesSchema by @petermnhull in [#1173](https://github.com/apache/pulsar-client-go/pull/1173) +- [fix] Respect context cancellation in Flush by @jayshrivastava in [#1165](https://github.com/apache/pulsar-client-go/pull/1165) +- [fix] Fix SIGSEGV with zstd compression enabled by @RobertIndie in [#1164](https://github.com/apache/pulsar-client-go/pull/1164) + [0.12.0] 2024-01-10 ## What's Changed
(pulsar-client-go) branch master updated: [Improve]Change base image to apachepulsar/pulsar (#1195)
This is an automated email from the ASF dual-hosted git repository. zike pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new 4f213796 [Improve]Change base image to apachepulsar/pulsar (#1195) 4f213796 is described below commit 4f213796979cdf6ffb3477f595b25c4544d76811 Author: crossoverJie AuthorDate: Fri Mar 8 23:53:30 2024 +0800 [Improve]Change base image to apachepulsar/pulsar (#1195) ### Motivation There are related discussion records [here](https://github.com/apache/pulsar-client-go/pull/1037). It is recommended to switch the base image to `apachepulsar/pulsar`. ### Modifications Change base image to `apachepulsar/pulsar`. --- .github/workflows/ci.yml | 2 +- Dockerfile | 13 - Makefile | 3 +-- scripts/run-ci.sh| 3 ++- 4 files changed, 12 insertions(+), 9 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 9ad0b3f2..7b770051 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -38,7 +38,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: -go-version: ['1.18', '1.19', '1.20', '1.21'] +go-version: ['1.18', '1.19', '1.20', '1.21.0'] steps: - uses: actions/checkout@v3 - name: clean docker cache diff --git a/Dockerfile b/Dockerfile index 1c137fc0..62d22bf0 100644 --- a/Dockerfile +++ b/Dockerfile @@ -18,14 +18,17 @@ # Explicit version of Pulsar and Golang images should be # set via the Makefile or CLI ARG PULSAR_IMAGE=apachepulsar/pulsar:latest -ARG GOLANG_IMAGE=golang:latest +FROM $PULSAR_IMAGE +USER root +ARG GO_VERSION=1.18 -FROM $PULSAR_IMAGE as pulsar -FROM $GOLANG_IMAGE +RUN curl -L https://dl.google.com/go/go${GO_VERSION}.linux-amd64.tar.gz -o golang.tar.gz && \ +mkdir -p /pulsar/go && tar -C /pulsar -xzf golang.tar.gz -RUN apt-get update && apt-get install -y openjdk-17-jre ca-certificates +ENV PATH /pulsar/go/bin:$PATH + +RUN apt-get update && apt-get install -y git && apt-get install -y gcc -COPY --from=pulsar /pulsar /pulsar ### Add pulsar config COPY integration-tests/certs /pulsar/certs diff --git a/Makefile b/Makefile index d0442372..527d53ac 100644 --- a/Makefile +++ b/Makefile @@ -21,7 +21,6 @@ IMAGE_NAME = pulsar-client-go-test:latest PULSAR_VERSION ?= 3.2.0 PULSAR_IMAGE = apachepulsar/pulsar:$(PULSAR_VERSION) GO_VERSION ?= 1.18 -GOLANG_IMAGE = golang:$(GO_VERSION) # Golang standard bin directory. GOPATH ?= $(shell go env GOPATH) @@ -39,7 +38,7 @@ bin/golangci-lint: GOBIN=$(shell pwd)/bin go install github.com/golangci/golangci-lint/cmd/golangci-lint@v1.51.2 container: - docker build -t ${IMAGE_NAME} --build-arg GOLANG_IMAGE="${GOLANG_IMAGE}" \ + docker build -t ${IMAGE_NAME} --build-arg GO_VERSION="${GO_VERSION}" \ --build-arg PULSAR_IMAGE="${PULSAR_IMAGE}" . test: container diff --git a/scripts/run-ci.sh b/scripts/run-ci.sh index cc4f6a1e..83246a39 100755 --- a/scripts/run-ci.sh +++ b/scripts/run-ci.sh @@ -19,7 +19,8 @@ set -e -x -export GOPATH=/ +export GOPATH=/pulsar/go +export GOCACHE=/tmp/go-cache # Install dependencies go mod download
(pulsar-client-go) branch master updated: [Improve] getMessagesById gets all messages (#1194)
This is an automated email from the ASF dual-hosted git repository. zike pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new 143fa232 [Improve] getMessagesById gets all messages (#1194) 143fa232 is described below commit 143fa23228621414784cefae347286e797e4ef84 Author: crossoverJie AuthorDate: Fri Mar 8 23:52:51 2024 +0800 [Improve] getMessagesById gets all messages (#1194) ### Motivation To keep consistent with the Java client. Releted PR: https://github.com/apache/pulsar/pull/21918 ### Modifications Add `getMessagesById` interface. - Co-authored-by: Zike Yang --- pulsaradmin/pkg/admin/subscription.go | 23 +++-- pulsaradmin/pkg/admin/subscription_test.go | 130 + 2 files changed, 147 insertions(+), 6 deletions(-) diff --git a/pulsaradmin/pkg/admin/subscription.go b/pulsaradmin/pkg/admin/subscription.go index 456de46c..8ddb5845 100644 --- a/pulsaradmin/pkg/admin/subscription.go +++ b/pulsaradmin/pkg/admin/subscription.go @@ -72,8 +72,11 @@ type Subscriptions interface { // PeekMessages peeks messages from a topic subscription PeekMessages(utils.TopicName, string, int) ([]*utils.Message, error) - // GetMessageByID gets message by its ledgerID and entryID + // Deprecated: Use GetMessagesByID() instead GetMessageByID(topic utils.TopicName, ledgerID, entryID int64) (*utils.Message, error) + + // GetMessagesByID gets messages by its ledgerID and entryID + GetMessagesByID(topic utils.TopicName, ledgerID, entryID int64) ([]*utils.Message, error) } type subscriptions struct { @@ -187,6 +190,18 @@ func (s *subscriptions) peekNthMessage(topic utils.TopicName, sName string, pos } func (s *subscriptions) GetMessageByID(topic utils.TopicName, ledgerID, entryID int64) (*utils.Message, error) { + messages, err := s.GetMessagesByID(topic, ledgerID, entryID) + if err != nil { + return nil, err + } + + if len(messages) == 0 { + return nil, nil + } + return messages[0], nil +} + +func (s *subscriptions) GetMessagesByID(topic utils.TopicName, ledgerID, entryID int64) ([]*utils.Message, error) { ledgerIDStr := strconv.FormatInt(ledgerID, 10) entryIDStr := strconv.FormatInt(entryID, 10) @@ -201,11 +216,7 @@ func (s *subscriptions) GetMessageByID(topic utils.TopicName, ledgerID, entryID if err != nil { return nil, err } - - if len(messages) == 0 { - return nil, nil - } - return messages[0], nil + return messages, nil } // safeRespClose is used to close a response body diff --git a/pulsaradmin/pkg/admin/subscription_test.go b/pulsaradmin/pkg/admin/subscription_test.go new file mode 100644 index ..c4ba717d --- /dev/null +++ b/pulsaradmin/pkg/admin/subscription_test.go @@ -0,0 +1,130 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package admin + +import ( + "context" + "fmt" + "sync" + "testing" + "time" + + "github.com/apache/pulsar-client-go/pulsar" + "github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config" + "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils" + "github.com/stretchr/testify/assert" +) + +func TestGetMessagesByID(t *testing.T) { + + randomName := newTopicName() + topic := "persistent://public/default/" + randomName + + cfg := &config.Config{} + admin, err := New(cfg) + assert.NoError(t, err) + assert.NotNil(t, admin) + + ctx := context.Background() + + client, err := pulsar.NewClient(pulsar.ClientOptions{ + URL: lookupURL, + }) + + assert.Nil(t, err) + defer client.Close() + + // create producer + numberMessages := 10 + batchingMaxMessages := numberMessages / 2 + producer, err := c
(pulsar-client-go) annotated tag v0.12.1 updated (a029f2d7 -> ecd9a20e)
This is an automated email from the ASF dual-hosted git repository. zike pushed a change to annotated tag v0.12.1 in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git *** WARNING: tag v0.12.1 was modified! *** from a029f2d7 (commit) to ecd9a20e (tag) tagging a029f2d7e392fa37511ac44c3276832705ded08b (commit) replaces v0.12.0 by Zike Yang on Fri Mar 8 18:20:03 2024 +0800 - Log - Release v0.12.1 -BEGIN PGP SIGNATURE- iQJEBAABCgAuFiEE6ItqSN52cCOQDJEjT0AbyNP5+1UFAmXq5lMQHHppa2VAYXBh Y2hlLm9yZwAKCRBPQBvI0/n7VWuEEACyhzqyz5x0zc/7wG4Frlt9Z/Z0qt4vzZDs tnv3BSNLqop571IN7PdOyk0oBKiC+t39PJXfByEtObQTdiQaazdhcDv6POKchCuW eCI65CxfFRd0c9DyauR2lRULi+NFdOOzkejZFv27iAnTEe5CFY8OknEoIXcbCAWV XjPVXwFm05hWupxN+38FdTRBA6oCeGn7aNHgFRVJ6M/1wnwxLp9Ph4C+IJZ45aq9 1PpTiCjcxjKYAt0iW6ukXUrPMwREI2wjOBXm+hceCAk61euoXsVgqhomUizea2hK je5X1YaeUJ1VfOtqCjA1FUhzEJ2CKTDYSWflbKistKfKU2USv8ZWLyivja4kkQW6 A1Q5zxgxY+3kTLDZm4jJ1m9O6TVezujEAkX4C9L56mBw3A0quhPjaNYXDgHyNTyX 4SUa3DzJDxAYKxjefhSBNtAh85ho72dj6BjKMLMZ1LLcHYFcAD8tyw0dKfiSYY2n mC2JPc7oMPxnVThF9S8XepUzfAz2q+jgmpjYUM7YKYrbq3FTGYfOtq3wg5O/UXEs 3yUHlDsi33Rlp4qm7krt/HjmiqXg/MCtJU5x0a+LYaNQ4VkPB7W35q0JHF/KkARL GlBuuterPWnqC8ljDjbKW7ONx/AupPhxsyVLb5nFFuaQ5wetjRXwANLGskCIBuzY yErF39RvIw== =Xy+K -END PGP SIGNATURE- --- No new revisions were added by this update. Summary of changes:
(pulsar-client-go) branch master updated: [Improve] Add optional parameters for getPartitionedStats (#1193)
This is an automated email from the ASF dual-hosted git repository. zike pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new c4f47abd [Improve] Add optional parameters for getPartitionedStats (#1193) c4f47abd is described below commit c4f47abd7c82402e7d4e95b55b24ebfef4bdb029 Author: crossoverJie AuthorDate: Tue Mar 5 18:40:40 2024 +0800 [Improve] Add optional parameters for getPartitionedStats (#1193) ### Motivation To keep consistent with the Java client. Releted PR: https://github.com/apache/pulsar/pull/21611 ### Modifications Add `GetStatsOptions` params. --- Makefile| 2 +- pulsar/consumer_test.go | 12 +-- pulsaradmin/pkg/admin/topic.go | 34 pulsaradmin/pkg/admin/topic_test.go | 160 pulsaradmin/pkg/utils/data.go | 8 ++ 5 files changed, 209 insertions(+), 7 deletions(-) diff --git a/Makefile b/Makefile index 4eb590b0..d0442372 100644 --- a/Makefile +++ b/Makefile @@ -18,7 +18,7 @@ # IMAGE_NAME = pulsar-client-go-test:latest -PULSAR_VERSION ?= 2.10.3 +PULSAR_VERSION ?= 3.2.0 PULSAR_IMAGE = apachepulsar/pulsar:$(PULSAR_VERSION) GO_VERSION ?= 1.18 GOLANG_IMAGE = golang:$(GO_VERSION) diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index d66e2376..4a3b532d 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -2219,6 +2219,12 @@ func TestConsumerAddTopicPartitions(t *testing.T) { assert.Nil(t, err) defer producer.Close() + // Increase number of partitions to 10 + makeHTTPCall(t, http.MethodPost, testURL, "10") + + // Wait for the producer/consumers to pick up the change + time.Sleep(1 * time.Second) + consumer, err := client.Subscribe(ConsumerOptions{ Topic: topic, SubscriptionName:"my-sub", @@ -2227,12 +2233,6 @@ func TestConsumerAddTopicPartitions(t *testing.T) { assert.Nil(t, err) defer consumer.Close() - // Increase number of partitions to 10 - makeHTTPCall(t, http.MethodPost, testURL, "10") - - // Wait for the producer/consumers to pick up the change - time.Sleep(1 * time.Second) - // Publish messages ensuring that they will go to all the partitions ctx := context.Background() for i := 0; i < 10; i++ { diff --git a/pulsaradmin/pkg/admin/topic.go b/pulsaradmin/pkg/admin/topic.go index c27b..e6057413 100644 --- a/pulsaradmin/pkg/admin/topic.go +++ b/pulsaradmin/pkg/admin/topic.go @@ -75,6 +75,9 @@ type Topics interface { // All the rates are computed over a 1 minute window and are relative the last completed 1 minute period GetStats(utils.TopicName) (utils.TopicStats, error) + // GetStatsWithOption returns the stats for the topic + GetStatsWithOption(utils.TopicName, utils.GetStatsOptions) (utils.TopicStats, error) + // GetInternalStats returns the internal stats for the topic. GetInternalStats(utils.TopicName) (utils.PersistentTopicInternalStats, error) @@ -82,6 +85,9 @@ type Topics interface { // All the rates are computed over a 1 minute window and are relative the last completed 1 minute period GetPartitionedStats(utils.TopicName, bool) (utils.PartitionedTopicStats, error) + // GetPartitionedStatsWithOption returns the stats for the partitioned topic + GetPartitionedStatsWithOption(utils.TopicName, bool, utils.GetStatsOptions) (utils.PartitionedTopicStats, error) + // Terminate the topic and prevent any more messages being published on it Terminate(utils.TopicName) (utils.MessageID, error) @@ -395,6 +401,19 @@ func (t *topics) GetStats(topic utils.TopicName) (utils.TopicStats, error) { err := t.pulsar.Client.Get(endpoint, &stats) return stats, err } +func (t *topics) GetStatsWithOption(topic utils.TopicName, option utils.GetStatsOptions) (utils.TopicStats, error) { + var stats utils.TopicStats + endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "stats") + params := map[string]string{ + "getPreciseBacklog": strconv.FormatBool(option.GetPreciseBacklog), + "subscriptionBacklogSize": strconv.FormatBool(option.SubscriptionBacklogSize), + "getEarliestTimeInBacklog": strconv.FormatBool(option.GetEarliestTimeInBacklog), + "excludePublishers": strconv.FormatBool(option.ExcludePublishers), + "excludeConsumers": strconv.FormatBool(option.ExcludeConsumers), + } + _, err := t.pulsar.Client.GetWithQueryParams(endpoint, &stats, params, true) + return stats, er
svn commit: r67630 - in /dev/pulsar/pulsar-client-go-0.12.1-candidate-1: ./ apache-pulsar-client-go-0.12.1-src.tar.gz apache-pulsar-client-go-0.12.1-src.tar.gz.asc apache-pulsar-client-go-0.12.1-src.t
Author: zike Date: Thu Feb 29 02:23:04 2024 New Revision: 67630 Log: Staging artifacts and signature for Pulsar Client Go release 0.12.1-candidate-1 Added: dev/pulsar/pulsar-client-go-0.12.1-candidate-1/ dev/pulsar/pulsar-client-go-0.12.1-candidate-1/apache-pulsar-client-go-0.12.1-src.tar.gz (with props) dev/pulsar/pulsar-client-go-0.12.1-candidate-1/apache-pulsar-client-go-0.12.1-src.tar.gz.asc dev/pulsar/pulsar-client-go-0.12.1-candidate-1/apache-pulsar-client-go-0.12.1-src.tar.gz.sha512 Added: dev/pulsar/pulsar-client-go-0.12.1-candidate-1/apache-pulsar-client-go-0.12.1-src.tar.gz == Binary file - no diff available. Propchange: dev/pulsar/pulsar-client-go-0.12.1-candidate-1/apache-pulsar-client-go-0.12.1-src.tar.gz -- svn:mime-type = application/octet-stream Added: dev/pulsar/pulsar-client-go-0.12.1-candidate-1/apache-pulsar-client-go-0.12.1-src.tar.gz.asc == --- dev/pulsar/pulsar-client-go-0.12.1-candidate-1/apache-pulsar-client-go-0.12.1-src.tar.gz.asc (added) +++ dev/pulsar/pulsar-client-go-0.12.1-candidate-1/apache-pulsar-client-go-0.12.1-src.tar.gz.asc Thu Feb 29 02:23:04 2024 @@ -0,0 +1,16 @@ +-BEGIN PGP SIGNATURE- + +iQIzBAABCgAdFiEE6ItqSN52cCOQDJEjT0AbyNP5+1UFAmXf6e8ACgkQT0AbyNP5 ++1XIWBAAkpJFKZ+Hjtx8c7bafVbsCOt+HsjAEU6KDuNJVMvlYPwlTizgpMW0nezf +529Er2VY0ofrTu+m5qbHlpodNr+bKAQ/lS9VYx7tmKFgwUlEmP4+txYw9ggl/aEd +Xi95vMc2CmHvp2tU8n9wu/42oCOnraPJoS9AW7MugkwSQJdjzsVWuTr6iYsSjcou +OST7ZTNbqdmFrQr7fxO6f0mFm2hV6LNDHQ0ZXRheN/dmj23bXG/VSpR4I5R4gphC +2kz7Lw/YrMKE4NdLx+SRO00qfLVNuJ+FFzl5rnIiWFl6ezz4Czz91K9p1MKCngYU +5EIcmZF7WUloqNTbUI8i5UUm2756UcpDCuT3ffk9g11VeN4r0TzzOayRujnWckxF +5Fsuk+v3ZtVKCNMhU3B0EZTtqs6vYVoJHKynuVa3LuzvEI4QWMCj7pCKMLBybFfH +vQHCrZgd2b9491eqNfUnfpLlWqVHqXUvEbtLl3Suh8FW6rz8SDILD3uN0CcJrFS4 +dLosHCAsCXcugwecLmcpe9Jz8VAkxhQYkNcwXENyM8EFDC3V9kWIRug6AVeZrcC1 +tp3VP6fW/XNvY4+I55zPqO1i6eg2BRnv60YHkpjJGB8rc+oazIQLJWFQ3ib+TKS6 +nrWlar+apGfj8Y3HMqNtjP6i4K9+eYj3BwzN491+EyyzaqEOQrk= +=CSb1 +-END PGP SIGNATURE- Added: dev/pulsar/pulsar-client-go-0.12.1-candidate-1/apache-pulsar-client-go-0.12.1-src.tar.gz.sha512 == --- dev/pulsar/pulsar-client-go-0.12.1-candidate-1/apache-pulsar-client-go-0.12.1-src.tar.gz.sha512 (added) +++ dev/pulsar/pulsar-client-go-0.12.1-candidate-1/apache-pulsar-client-go-0.12.1-src.tar.gz.sha512 Thu Feb 29 02:23:04 2024 @@ -0,0 +1 @@ +691a301d099a602baa30bb7ec22dc33b8503d5ad5aa5fe43f51aab188099e4781844070de11584fd82d40227a28fe763ed8bd2cfa93b99d73ebfd34c6061e02d apache-pulsar-client-go-0.12.1-src.tar.gz
(pulsar-client-go) annotated tag v0.12.1-candidate-1 updated (a029f2d7 -> 98679f15)
This is an automated email from the ASF dual-hosted git repository. zike pushed a change to annotated tag v0.12.1-candidate-1 in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git *** WARNING: tag v0.12.1-candidate-1 was modified! *** from a029f2d7 (commit) to 98679f15 (tag) tagging a029f2d7e392fa37511ac44c3276832705ded08b (commit) replaces v0.12.0 by Zike Yang on Thu Feb 29 02:20:01 2024 + - Log - Release v0.12.1-candidate-1 -BEGIN PGP SIGNATURE- iQJEBAABCgAuFiEE6ItqSN52cCOQDJEjT0AbyNP5+1UFAmXf6dEQHHppa2VAYXBh Y2hlLm9yZwAKCRBPQBvI0/n7VdZmD/4rrNIu0d0HDfWdljYcprV9FLvq0MxcJ94+ KJRE6rF3kQJ6xusKvSQigM4r0tb4mp8QSuAOcfboYWDMZLmSyodCdUMv56b+HZwJ LJ+joFwrtl2qzq0dmn4gW8QB9pnUDh9z0vkZpULYqX7N8XB/fzanfr1MXT0f27aW uPKFQ55gEhJpZuPIsGVxtCXgeLA3mKamLnqtGQTznvh9VORQ9mr/U+wnj/f6VzwF 6WtJINt0DRrjyxjNL+V73IXJgDHojVnSS02TfhM19bVEv2vzoR95LMOrcEFFWuxJ Liewwfw8Sc2JSqZjZWBDFcGda3SickDNA1KO1atzPVIIT57nuNioijveufQCPBQV 6XL9RTdT4e2UCtq2VqNO1Z4Fmok1SAx+6xgyK3osVfF0LkS+85xf4zo6ovRkaHzT 2uaOH5wh0QZBB/CQ+bLkr8eQ2T9+AQnlDb4MfEJVHIpXGwbumqmhkNSZmgmJw194 W1jx/Jn+jLPSgOcSWb7HElibxBpHiPkRZ/46vz3ROGdRa6laVMTI+G9I3hXvTjvQ XHOB38ylY+r3C/rDOac01zxmB4VbHemvOu42jv58QmHIX1sMZBNLNvnkt6FSzB+z 6qgYS1qGQvOPsBU9cYPYONH8ZqwUSo3U87Yz84OtOx3tN9ns0G03WenpXdzsxrZy ge+xibNGDA== =qTUq -END PGP SIGNATURE- --- No new revisions were added by this update. Summary of changes:
(pulsar-client-go) branch branch-0.12.0 updated: Release v0.12.1
This is an automated email from the ASF dual-hosted git repository. zike pushed a commit to branch branch-0.12.0 in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/branch-0.12.0 by this push: new a029f2d7 Release v0.12.1 a029f2d7 is described below commit a029f2d7e392fa37511ac44c3276832705ded08b Author: Zike Yang AuthorDate: Thu Feb 29 02:19:38 2024 + Release v0.12.1 --- VERSION| 2 +- stable.txt | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/VERSION b/VERSION index 725659e7..5f237359 100644 --- a/VERSION +++ b/VERSION @@ -1,3 +1,3 @@ // This version number refers to the currently released version number // Please fix the version when release. -v0.12.0 +v0.12.1 diff --git a/stable.txt b/stable.txt index e0916329..6027c033 100644 --- a/stable.txt +++ b/stable.txt @@ -1,3 +1,3 @@ // This version number refers to the current stable version, generally is `VERSION - 1`. // Please fix the version when release. -v0.12.0 +v0.12.1
(pulsar-client-go) 04/05: [Fix] Fix available permits in MessageReceived (#1181)
This is an automated email from the ASF dual-hosted git repository. zike pushed a commit to branch branch-0.12.0 in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git commit 9fdefe2bbd2ecdd7c9f853580e829cd66f22b78e Author: Jinjun Pan <75996911+pansz...@users.noreply.github.com> AuthorDate: Fri Feb 23 21:47:37 2024 +0800 [Fix] Fix available permits in MessageReceived (#1181) Fixes #1180 ### Motivation In the `MessageReceived`, the number of skipped messages should be increased to available permits to avoid skipped permits leading flow request not be sent. - Co-authored-by: panjinjun <1619-panjin...@users.noreply.git.sysop.bigo.sg> (cherry picked from commit 5d258272cb83444fe156dcbb57cbf8f2d475a50b) --- pulsar/consumer_partition.go | 12 +++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index 95b5bc09..3572a522 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -1093,7 +1093,10 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header pc.metrics.MessagesReceived.Add(float64(numMsgs)) pc.metrics.PrefetchedMessages.Add(float64(numMsgs)) - var bytesReceived int + var ( + bytesReceived int + skippedMessages int32 + ) for i := 0; i < numMsgs; i++ { smm, payload, err := reader.ReadMessage() if err != nil || payload == nil { @@ -1102,6 +1105,7 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header } if ackSet != nil && !ackSet.Test(uint(i)) { pc.log.Debugf("Ignoring message from %vth message, which has been acknowledged", i) + skippedMessages++ continue } @@ -1120,6 +1124,7 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header if pc.messageShouldBeDiscarded(trackingMsgID) { pc.AckID(trackingMsgID) + skippedMessages++ continue } @@ -1144,6 +1149,7 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header } if pc.ackGroupingTracker.isDuplicate(msgID) { + skippedMessages++ continue } @@ -1218,6 +1224,10 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header pc.markScaleIfNeed() } + if skippedMessages > 0 { + pc.availablePermits.add(skippedMessages) + } + // send messages to the dispatcher pc.queueCh <- messages return nil
(pulsar-client-go) 01/05: [Producer] respect context cancellation in Flush (#1165)
This is an automated email from the ASF dual-hosted git repository. zike pushed a commit to branch branch-0.12.0 in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git commit 436188c9014786a22db8eb532e7d7927c5f4e080 Author: Jayant AuthorDate: Thu Feb 1 21:52:52 2024 -0500 [Producer] respect context cancellation in Flush (#1165) ### Motivation The producer's `Flush` method does not respect context cancellation. If the caller's context get's cancelled, it will have to wait for the producer to finish flushing. ### Modifications This change adds a `FlushWithCtx` method which takes a context and selects on two channels. (cherry picked from commit 2a28e21c59d005515e118fed5bf8f333d6699e39) --- pulsar/consumer_test.go| 6 +++--- .../pulsartracing/producer_interceptor_test.go | 4 pulsar/producer.go | 7 +-- pulsar/producer_impl.go| 6 +- pulsar/producer_partition.go | 18 +--- pulsar/producer_test.go| 24 +++--- pulsar/reader_test.go | 6 +++--- 7 files changed, 47 insertions(+), 24 deletions(-) diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index df70b0dd..d66e2376 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -983,7 +983,7 @@ func TestConsumerBatchCumulativeAck(t *testing.T) { } wg.Wait() - err = producer.Flush() + err = producer.FlushWithCtx(context.Background()) assert.NoError(t, err) // send another batch @@ -1218,7 +1218,7 @@ func TestConsumerCompressionWithBatches(t *testing.T) { }, nil) } - producer.Flush() + producer.FlushWithCtx(context.Background()) for i := 0; i < N; i++ { msg, err := consumer.Receive(ctx) @@ -3932,7 +3932,7 @@ func runBatchIndexAckTest(t *testing.T, ackWithResponse bool, cumulative bool, o log.Printf("Sent to %v:%d:%d", id, id.BatchIdx(), id.BatchSize()) }) } - assert.Nil(t, producer.Flush()) + assert.Nil(t, producer.FlushWithCtx(context.Background())) msgIds := make([]MessageID, BatchingMaxSize) for i := 0; i < BatchingMaxSize; i++ { diff --git a/pulsar/internal/pulsartracing/producer_interceptor_test.go b/pulsar/internal/pulsartracing/producer_interceptor_test.go index 8d8e6965..1c2c712f 100644 --- a/pulsar/internal/pulsartracing/producer_interceptor_test.go +++ b/pulsar/internal/pulsartracing/producer_interceptor_test.go @@ -67,4 +67,8 @@ func (p *mockProducer) Flush() error { return nil } +func (p *mockProducer) FlushWithCtx(ctx context.Context) error { + return nil +} + func (p *mockProducer) Close() {} diff --git a/pulsar/producer.go b/pulsar/producer.go index 70d152c7..f8013a16 100644 --- a/pulsar/producer.go +++ b/pulsar/producer.go @@ -237,10 +237,13 @@ type Producer interface { // return the last sequence id published by this producer. LastSequenceID() int64 - // Flush all the messages buffered in the client and wait until all messages have been successfully - // persisted. + // Deprecated: Use `FlushWithCtx()` instead. Flush() error + // Flush all the messages buffered in the client and wait until all messageshave been successfully + // persisted. + FlushWithCtx(ctx context.Context) error + // Close the producer and releases resources allocated // No more writes will be accepted from this producer. Waits until all pending write request are persisted. In case // of errors, pending writes will not be retried. diff --git a/pulsar/producer_impl.go b/pulsar/producer_impl.go index 3c45b597..ca923108 100644 --- a/pulsar/producer_impl.go +++ b/pulsar/producer_impl.go @@ -334,11 +334,15 @@ func (p *producer) LastSequenceID() int64 { } func (p *producer) Flush() error { + return p.FlushWithCtx(context.Background()) +} + +func (p *producer) FlushWithCtx(ctx context.Context) error { p.RLock() defer p.RUnlock() for _, pp := range p.producers { - if err := pp.Flush(); err != nil { + if err := pp.FlushWithCtx(ctx); err != nil { return err } diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 1b79053e..fbcc5b97 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -1422,15 +1422,27 @@ func (p *partitionProducer) LastSequenceID() int64 { } func (p *partitionProducer) Flush() error { + return p.FlushWithCtx(context.Background()) +} + +func (p *partitionProducer) FlushWithCtx(ctx context.Context) error { flushReq := &flushRequest{ doneCh: make(
(pulsar-client-go) 05/05: [fix] Fix Infinite Loop in Reader's `HasNext` Function (#1182)
This is an automated email from the ASF dual-hosted git repository. zike pushed a commit to branch branch-0.12.0 in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git commit a5f06793bd4365012641a79825267a51d4ce43c9 Author: Zike Yang AuthorDate: Wed Feb 28 18:39:00 2024 +0800 [fix] Fix Infinite Loop in Reader's `HasNext` Function (#1182) Fixes #1171 ### Motivation If `getLastMessageId` continually fails, the reader.HasNext can get stuck in an infinite loop. Without any backoff, the reader would keep trying forever. ### Modifications - Implemented a backoff policy for `getLastMessageID`. - If HasNext fails, it now returns false. Should the reader.HasNext returned `false` in case of failure? Currently, the `HasNext` method doesn't report errors. However, failure is still possible. For instance, if `getLastMessageID` repeatedly fails and hits the retry limit. An option is to keep trying forever, but this would stall all user code. This isn't user-friendly, so I rejected this solution. Couldn't utilize the BackOffPolicy in the Reader Options The `HasNext` retry mechanism requires to use of `IsMaxBackoffReached` for the backoff. But it isn't exposed in the `BackOffPolicy` interface. Introducing a new method to the `BackOffPolicy` would introduce breaking changes for the user backoff implementation. So, I choose not to implement it. Before we do it, we need to refine the BackOffPolicy. (cherry picked from commit 88a8d85cf6d6a4f282a5b39a2140a7bb06ba0f3b) --- pulsar/client_impl.go| 24 + pulsar/consumer_partition.go | 53 +--- pulsar/reader.go | 1 + pulsar/reader_test.go| 64 4 files changed, 115 insertions(+), 27 deletions(-) diff --git a/pulsar/client_impl.go b/pulsar/client_impl.go index 7daf6f62..65aed3b9 100644 --- a/pulsar/client_impl.go +++ b/pulsar/client_impl.go @@ -40,14 +40,15 @@ const ( ) type client struct { - cnxPool internal.ConnectionPool - rpcClient internal.RPCClient - handlers internal.ClientHandlers - lookupService internal.LookupService - metrics *internal.Metrics - tcClient *transactionCoordinatorClient - memLimit internal.MemoryLimitController - closeOnce sync.Once + cnxPool internal.ConnectionPool + rpcClientinternal.RPCClient + handlers internal.ClientHandlers + lookupServiceinternal.LookupService + metrics *internal.Metrics + tcClient *transactionCoordinatorClient + memLimit internal.MemoryLimitController + closeOncesync.Once + operationTimeout time.Duration log log.Logger } @@ -161,9 +162,10 @@ func newClient(options ClientOptions) (Client, error) { c := &client{ cnxPool: internal.NewConnectionPool(tlsConfig, authProvider, connectionTimeout, keepAliveInterval, maxConnectionsPerHost, logger, metrics, connectionMaxIdleTime), - log: logger, - metrics: metrics, - memLimit: internal.NewMemoryLimitController(memLimitBytes, defaultMemoryLimitTriggerThreshold), + log: logger, + metrics: metrics, + memLimit: internal.NewMemoryLimitController(memLimitBytes, defaultMemoryLimitTriggerThreshold), + operationTimeout: operationTimeout, } serviceNameResolver := internal.NewPulsarServiceNameResolver(url) diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index 3572a522..162565b2 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -570,15 +570,41 @@ func (pc *partitionConsumer) internalUnsubscribe(unsub *unsubscribeRequest) { func (pc *partitionConsumer) getLastMessageID() (*trackingMessageID, error) { if state := pc.getConsumerState(); state == consumerClosed || state == consumerClosing { - pc.log.WithField("state", state).Error("Failed to redeliver closing or closed consumer") - return nil, errors.New("failed to redeliver closing or closed consumer") + pc.log.WithField("state", state).Error("Failed to getLastMessageID for the closing or closed consumer") + return nil, errors.New("failed to getLastMessageID for the closing or closed consumer") } - req := &getLastMsgIDRequest{doneCh: make(chan struct{})} - pc.eventsCh <- req + remainTime := pc.client.operationTimeout + var backoff internal.BackoffPolicy + if pc.options.backoffPolicy != nil { + backof
(pulsar-client-go) 03/05: [feat] Support partitioned topic reader (#1178)
This is an automated email from the ASF dual-hosted git repository. zike pushed a commit to branch branch-0.12.0 in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git commit 6ce5421b07e23ff252ba5aaa754e1a0a5bab4cc5 Author: Zike Yang AuthorDate: Fri Feb 23 15:49:08 2024 +0800 [feat] Support partitioned topic reader (#1178) Master Issue: #1177 ### Motivation Currently, there is an issue with the reader implementation. If the reader is creating, it won't get the topic metadata from the topic. The reader can only read messages from a single topic. If the topic is a partitioned topic, the reader won't know that and will try to create a non-partition topic with the same name. And it will lead to this issue: https://github.com/apache/pulsar/issues/22032 ### Modifications - Support partitioned topic reader (cherry picked from commit 3b9b1f8895d8924ec98db4612806b9871f1d135b) --- pulsar/consumer.go | 7 +++ pulsar/consumer_impl.go | 47 +- pulsar/consumer_partition.go | 36 ++ pulsar/reader.go | 1 + pulsar/reader_impl.go| 112 ++- pulsar/reader_test.go| 100 ++ 6 files changed, 227 insertions(+), 76 deletions(-) diff --git a/pulsar/consumer.go b/pulsar/consumer.go index 667bff66..fea94cf6 100644 --- a/pulsar/consumer.go +++ b/pulsar/consumer.go @@ -246,6 +246,13 @@ type ConsumerOptions struct { // SubscriptionMode specifies the subscription mode to be used when subscribing to a topic. // Default is `Durable` SubscriptionMode SubscriptionMode + + // StartMessageIDInclusive, if true, the consumer will start at the `StartMessageID`, included. + // Default is `false` and the consumer will start from the "next" message + StartMessageIDInclusive bool + + // startMessageID specifies the message id to start from. Currently, it's only used for the reader internally. + startMessageID *trackingMessageID } // Consumer is an interface that abstracts behavior of Pulsar's consumer diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index 75d839b4..0c31a1aa 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -384,7 +384,8 @@ func (c *consumer) internalTopicSubscribeToPartitions() error { metadata:metadata, subProperties: subProperties, replicateSubscriptionState: c.options.ReplicateSubscriptionState, - startMessageID: nil, + startMessageID: c.options.startMessageID, + startMessageIDInclusive: c.options.StartMessageIDInclusive, subscriptionMode: c.options.SubscriptionMode, readCompacted: c.options.ReadCompacted, interceptors: c.options.Interceptors, @@ -707,6 +708,50 @@ func (c *consumer) checkMsgIDPartition(msgID MessageID) error { return nil } +func (c *consumer) hasNext() bool { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() // Make sure all paths cancel the context to avoid context leak + + var wg sync.WaitGroup + wg.Add(len(c.consumers)) + + hasNext := make(chan bool) + for _, pc := range c.consumers { + pc := pc + go func() { + defer wg.Done() + if pc.hasNext() { + select { + case hasNext <- true: + case <-ctx.Done(): + } + } + }() + } + + go func() { + wg.Wait() + close(hasNext) // Close the channel after all goroutines have finished + }() + + // Wait for either a 'true' result or for all goroutines to finish + for hn := range hasNext { + if hn { + return true + } + } + + return false +} + +func (c *consumer) setLastDequeuedMsg(msgID MessageID) error { + if err := c.checkMsgIDPartition(msgID); err != nil { + return err + } + c.consumers[msgID.PartitionIdx()].lastDequeuedMsg = toTrackingMessageID(msgID) + return nil +} + var r = &random{ R: rand.New(rand.NewSource(time.Now().UnixNano())), } diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index fd6441c1..95b5bc09 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -174,6 +174,8 @@ type p
(pulsar-client-go) 02/05: [Fix] Fix Bytes Schema (#1173)
This is an automated email from the ASF dual-hosted git repository. zike pushed a commit to branch branch-0.12.0 in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git commit 68cd09a96e82a44e11fe92b050c9674143f1e5cc Author: Peter Hull <56369394+petermnh...@users.noreply.github.com> AuthorDate: Tue Feb 20 02:31:00 2024 + [Fix] Fix Bytes Schema (#1173) (cherry picked from commit c2ca7e81f0c609cd8fb7b13695664519e63e4501) --- pulsar/schema.go | 2 ++ pulsar/schema_test.go | 64 +++ pulsar/table_view_test.go | 7 ++ 3 files changed, 73 insertions(+) diff --git a/pulsar/schema.go b/pulsar/schema.go index fd9d412d..3427fb26 100644 --- a/pulsar/schema.go +++ b/pulsar/schema.go @@ -93,6 +93,8 @@ func NewSchema(schemaType SchemaType, schemaData []byte, properties map[string]s var schemaDef = string(schemaData) var s Schema switch schemaType { + case BYTES: + s = NewBytesSchema(properties) case STRING: s = NewStringSchema(properties) case JSON: diff --git a/pulsar/schema_test.go b/pulsar/schema_test.go index c2008f6d..34216c47 100644 --- a/pulsar/schema_test.go +++ b/pulsar/schema_test.go @@ -19,11 +19,14 @@ package pulsar import ( "context" + "fmt" "log" "testing" + "time" pb "github.com/apache/pulsar-client-go/integration-tests/pb" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) type testJSON struct { @@ -55,6 +58,67 @@ func createClient() Client { return client } +func TestBytesSchema(t *testing.T) { + client := createClient() + defer client.Close() + + topic := newTopicName() + + properties := make(map[string]string) + properties["pulsar"] = "hello" + producerSchemaBytes := NewBytesSchema(properties) + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topic, + Schema: producerSchemaBytes, + }) + assert.NoError(t, err) + + _, err = producer.Send(context.Background(), &ProducerMessage{ + Value: []byte(`{"key": "value"}`), + }) + require.NoError(t, err) + _, err = producer.Send(context.Background(), &ProducerMessage{ + Value: []byte(`something else`), + }) + require.NoError(t, err) + producer.Close() + + // Create consumer + consumerSchemaBytes := NewBytesSchema(nil) + assert.NotNil(t, consumerSchemaBytes) + consumer, err := client.Subscribe(ConsumerOptions{ + Topic: topic, + SubscriptionName:"sub-1", + Schema: consumerSchemaBytes, + SubscriptionInitialPosition: SubscriptionPositionEarliest, + }) + assert.Nil(t, err) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + + // Receive first message + var out1 []byte + msg1, err := consumer.Receive(ctx) + assert.NoError(t, err) + err = msg1.GetSchemaValue(&out1) + assert.NoError(t, err) + assert.Equal(t, []byte(`{"key": "value"}`), out1) + consumer.Ack(msg1) + require.NoError(t, err) + + // Receive second message + var out2 []byte + msg2, err := consumer.Receive(ctx) + fmt.Println(string(msg2.Payload())) + assert.NoError(t, err) + err = msg2.GetSchemaValue(&out2) + assert.NoError(t, err) + assert.Equal(t, []byte(`something else`), out2) + + defer consumer.Close() +} + func TestJsonSchema(t *testing.T) { client := createClient() defer client.Close() diff --git a/pulsar/table_view_test.go b/pulsar/table_view_test.go index 45b94411..2368e3d8 100644 --- a/pulsar/table_view_test.go +++ b/pulsar/table_view_test.go @@ -90,6 +90,13 @@ func TestTableViewSchemas(t *testing.T) { expValueOut interface{} valueCheckfunc(t *testing.T, got interface{}) // Overrides expValueOut for more complex checks }{ + { + name: "BytesSchema", + schema:NewBytesSchema(nil), + schemaType:[]byte(`any`), + producerValue: []byte(`hello pulsar`), + expValueOut: []byte(`hello pulsar`), + }, { name: "StringSchema", schema:NewStringSchema(nil),
(pulsar-client-go) branch branch-0.12.0 updated (0f0d5a86 -> a5f06793)
This is an automated email from the ASF dual-hosted git repository. zike pushed a change to branch branch-0.12.0 in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git from 0f0d5a86 Fix SIGSEGV with zstd compression enabled (#1164) new 436188c9 [Producer] respect context cancellation in Flush (#1165) new 68cd09a9 [Fix] Fix Bytes Schema (#1173) new 6ce5421b [feat] Support partitioned topic reader (#1178) new 9fdefe2b [Fix] Fix available permits in MessageReceived (#1181) new a5f06793 [fix] Fix Infinite Loop in Reader's `HasNext` Function (#1182) The 5 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: pulsar/client_impl.go | 24 +-- pulsar/consumer.go | 7 + pulsar/consumer_impl.go| 47 +- pulsar/consumer_partition.go | 83 +- pulsar/consumer_test.go| 6 +- .../pulsartracing/producer_interceptor_test.go | 4 + pulsar/producer.go | 7 +- pulsar/producer_impl.go| 6 +- pulsar/producer_partition.go | 18 ++- pulsar/producer_test.go| 24 +-- pulsar/reader.go | 2 + pulsar/reader_impl.go | 112 ++ pulsar/reader_test.go | 170 +++-- pulsar/schema.go | 2 + pulsar/schema_test.go | 64 pulsar/table_view_test.go | 7 + 16 files changed, 464 insertions(+), 119 deletions(-)
(pulsar-client-go) branch master updated: [Fix] Fix available permits in MessageReceived (#1181)
This is an automated email from the ASF dual-hosted git repository. zike pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new 5d258272 [Fix] Fix available permits in MessageReceived (#1181) 5d258272 is described below commit 5d258272cb83444fe156dcbb57cbf8f2d475a50b Author: Jinjun Pan <75996911+pansz...@users.noreply.github.com> AuthorDate: Fri Feb 23 21:47:37 2024 +0800 [Fix] Fix available permits in MessageReceived (#1181) Fixes #1180 ### Motivation In the `MessageReceived`, the number of skipped messages should be increased to available permits to avoid skipped permits leading flow request not be sent. - Co-authored-by: panjinjun <1619-panjin...@users.noreply.git.sysop.bigo.sg> --- pulsar/consumer_partition.go | 12 +++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index 95b5bc09..3572a522 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -1093,7 +1093,10 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header pc.metrics.MessagesReceived.Add(float64(numMsgs)) pc.metrics.PrefetchedMessages.Add(float64(numMsgs)) - var bytesReceived int + var ( + bytesReceived int + skippedMessages int32 + ) for i := 0; i < numMsgs; i++ { smm, payload, err := reader.ReadMessage() if err != nil || payload == nil { @@ -1102,6 +1105,7 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header } if ackSet != nil && !ackSet.Test(uint(i)) { pc.log.Debugf("Ignoring message from %vth message, which has been acknowledged", i) + skippedMessages++ continue } @@ -1120,6 +1124,7 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header if pc.messageShouldBeDiscarded(trackingMsgID) { pc.AckID(trackingMsgID) + skippedMessages++ continue } @@ -1144,6 +1149,7 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header } if pc.ackGroupingTracker.isDuplicate(msgID) { + skippedMessages++ continue } @@ -1218,6 +1224,10 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header pc.markScaleIfNeed() } + if skippedMessages > 0 { + pc.availablePermits.add(skippedMessages) + } + // send messages to the dispatcher pc.queueCh <- messages return nil
(pulsar-client-go) branch master updated: [feat] Support partitioned topic reader (#1178)
This is an automated email from the ASF dual-hosted git repository. zike pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new 3b9b1f88 [feat] Support partitioned topic reader (#1178) 3b9b1f88 is described below commit 3b9b1f8895d8924ec98db4612806b9871f1d135b Author: Zike Yang AuthorDate: Fri Feb 23 15:49:08 2024 +0800 [feat] Support partitioned topic reader (#1178) Master Issue: #1177 ### Motivation Currently, there is an issue with the reader implementation. If the reader is creating, it won't get the topic metadata from the topic. The reader can only read messages from a single topic. If the topic is a partitioned topic, the reader won't know that and will try to create a non-partition topic with the same name. And it will lead to this issue: https://github.com/apache/pulsar/issues/22032 ### Modifications - Support partitioned topic reader --- pulsar/consumer.go | 7 +++ pulsar/consumer_impl.go | 47 +- pulsar/consumer_partition.go | 36 ++ pulsar/reader.go | 1 + pulsar/reader_impl.go| 112 ++- pulsar/reader_test.go| 100 ++ 6 files changed, 227 insertions(+), 76 deletions(-) diff --git a/pulsar/consumer.go b/pulsar/consumer.go index 667bff66..fea94cf6 100644 --- a/pulsar/consumer.go +++ b/pulsar/consumer.go @@ -246,6 +246,13 @@ type ConsumerOptions struct { // SubscriptionMode specifies the subscription mode to be used when subscribing to a topic. // Default is `Durable` SubscriptionMode SubscriptionMode + + // StartMessageIDInclusive, if true, the consumer will start at the `StartMessageID`, included. + // Default is `false` and the consumer will start from the "next" message + StartMessageIDInclusive bool + + // startMessageID specifies the message id to start from. Currently, it's only used for the reader internally. + startMessageID *trackingMessageID } // Consumer is an interface that abstracts behavior of Pulsar's consumer diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index 75d839b4..0c31a1aa 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -384,7 +384,8 @@ func (c *consumer) internalTopicSubscribeToPartitions() error { metadata:metadata, subProperties: subProperties, replicateSubscriptionState: c.options.ReplicateSubscriptionState, - startMessageID: nil, + startMessageID: c.options.startMessageID, + startMessageIDInclusive: c.options.StartMessageIDInclusive, subscriptionMode: c.options.SubscriptionMode, readCompacted: c.options.ReadCompacted, interceptors: c.options.Interceptors, @@ -707,6 +708,50 @@ func (c *consumer) checkMsgIDPartition(msgID MessageID) error { return nil } +func (c *consumer) hasNext() bool { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() // Make sure all paths cancel the context to avoid context leak + + var wg sync.WaitGroup + wg.Add(len(c.consumers)) + + hasNext := make(chan bool) + for _, pc := range c.consumers { + pc := pc + go func() { + defer wg.Done() + if pc.hasNext() { + select { + case hasNext <- true: + case <-ctx.Done(): + } + } + }() + } + + go func() { + wg.Wait() + close(hasNext) // Close the channel after all goroutines have finished + }() + + // Wait for either a 'true' result or for all goroutines to finish + for hn := range hasNext { + if hn { + return true + } + } + + return false +} + +func (c *consumer) setLastDequeuedMsg(msgID MessageID) error { + if err := c.checkMsgIDPartition(msgID); err != nil { + return err + } + c.consumers[msgID.PartitionIdx()].lastDequeuedMsg = toTrackingMessageID(msgID) + return nil +} + var r = &random{ R: rand.New(rand.NewSource(time.Now().UnixNano())), } diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index fd6441c1..95b5bc09 100644 --- a/pulsar/consum
(pulsar-site) branch main updated: [fix] Update release docker image in release process (#745)
This is an automated email from the ASF dual-hosted git repository. zike pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/pulsar-site.git The following commit(s) were added to refs/heads/main by this push: new af2fdb6a147c [fix] Update release docker image in release process (#745) af2fdb6a147c is described below commit af2fdb6a147c2afa53f921c7a158a282cd8e1fea Author: Zike Yang AuthorDate: Tue Feb 20 21:25:14 2024 +0800 [fix] Update release docker image in release process (#745) PIP: https://github.com/apache/pulsar/pull/21872 The current release docker images process doesn't work for Pulsar above versions 3.0.0. Pulsar has added arm and amd arch supports for the docker image. If we use the original command to push the image, it will push only one arch. We recommend use tools like regctl to push images. For the latest tag of the pulsar image, this PR proposes to use the last feature release version or the patch release of the last feature release as the `latest` tag. - Co-authored-by: Penghui Li --- contribute/release-process.md | 18 +- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/contribute/release-process.md b/contribute/release-process.md index 12a25fb74700..1583bf56d270 100644 --- a/contribute/release-process.md +++ b/contribute/release-process.md @@ -319,20 +319,20 @@ Promote the Maven staging repository for release. Login to `https://repository.a ### Release Docker images -Copy the approved candidate docker images from your personal account to apachepulsar org. +Please ensure that the regctl tools have been properly installed. They can be obtained from the following link: https://github.com/regclient/regclient/blob/main/docs/install.md + +Copy the approved candidate Docker images from your personal account to the apachepulsar organization: ```bash -PULSAR_VERSION=2.x.x +PULSAR_VERSION=3.x.x OTHER_DOCKER_USER=otheruser -for image in pulsar pulsar-all pulsar-grafana pulsar-standalone; do -docker pull "${OTHER_DOCKER_USER}/$image:${PULSAR_VERSION}" && { - docker tag "${OTHER_DOCKER_USER}/$image:${PULSAR_VERSION}" "apachepulsar/$image:${PULSAR_VERSION}" - echo "Pushing apachepulsar/$image:${PULSAR_VERSION}" - docker push "apachepulsar/$image:${PULSAR_VERSION}" -} -done +CANDIDATE_TAG=3.x.x-80fb390 +regctl image copy ${OTHER_DOCKER_USER}/pulsar:${CANDIDATE_TAG} apachepulsar/pulsar:${PULSAR_VERSION} +regctl image copy ${OTHER_DOCKER_USER}/pulsar-all:${CANDIDATE_TAG} apachepulsar/pulsar-all:${PULSAR_VERSION} ``` +If this release is a feature release or a patch release of the last feature release, you should also push these images to the `latest` tag. + If you don't have the permission, you can ask someone with access to apachepulsar org to do that. ### Update project version
(pulsar-site) branch add-push-image created (now 8fbe1bce6d07)
This is an automated email from the ASF dual-hosted git repository. zike pushed a change to branch add-push-image in repository https://gitbox.apache.org/repos/asf/pulsar-site.git at 8fbe1bce6d07 Remove statement for the `lts` tag which has been proposed thorugh the PIP This branch includes the following new commits: new 8fbe1bce6d07 Remove statement for the `lts` tag which has been proposed thorugh the PIP The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference.
(pulsar-site) 01/01: Remove statement for the `lts` tag which has been proposed thorugh the PIP
This is an automated email from the ASF dual-hosted git repository. zike pushed a commit to branch add-push-image in repository https://gitbox.apache.org/repos/asf/pulsar-site.git commit 8fbe1bce6d078e5d09441d635e99741fc5ec8d65 Author: Zike Yang AuthorDate: Tue Feb 20 18:41:44 2024 +0800 Remove statement for the `lts` tag which has been proposed thorugh the PIP --- contribute/release-process.md | 2 -- 1 file changed, 2 deletions(-) diff --git a/contribute/release-process.md b/contribute/release-process.md index d1b45ac86ae3..cc8ad9fb8b21 100644 --- a/contribute/release-process.md +++ b/contribute/release-process.md @@ -334,8 +334,6 @@ regctl image copy ${OTHER_DOCKER_USER}/pulsar-all:${CANDIDATE_TAG} apachepulsar/ If this release is a feature release or a patch release of the last feature release, you should also push these images to the `latest` tag. -If this release is a LTS release or a patch release of the last LTS release, you should also push these images to the `lts` tag. - If you don't have the permission, you can ask someone with access to apachepulsar org to do that. ### Update project version
(pulsar-client-go) branch master updated: [Fix] Fix Bytes Schema (#1173)
This is an automated email from the ASF dual-hosted git repository. zike pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new c2ca7e81 [Fix] Fix Bytes Schema (#1173) c2ca7e81 is described below commit c2ca7e81f0c609cd8fb7b13695664519e63e4501 Author: Peter Hull <56369394+petermnh...@users.noreply.github.com> AuthorDate: Tue Feb 20 02:31:00 2024 + [Fix] Fix Bytes Schema (#1173) --- pulsar/schema.go | 2 ++ pulsar/schema_test.go | 64 +++ pulsar/table_view_test.go | 7 ++ 3 files changed, 73 insertions(+) diff --git a/pulsar/schema.go b/pulsar/schema.go index fd9d412d..3427fb26 100644 --- a/pulsar/schema.go +++ b/pulsar/schema.go @@ -93,6 +93,8 @@ func NewSchema(schemaType SchemaType, schemaData []byte, properties map[string]s var schemaDef = string(schemaData) var s Schema switch schemaType { + case BYTES: + s = NewBytesSchema(properties) case STRING: s = NewStringSchema(properties) case JSON: diff --git a/pulsar/schema_test.go b/pulsar/schema_test.go index c2008f6d..34216c47 100644 --- a/pulsar/schema_test.go +++ b/pulsar/schema_test.go @@ -19,11 +19,14 @@ package pulsar import ( "context" + "fmt" "log" "testing" + "time" pb "github.com/apache/pulsar-client-go/integration-tests/pb" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) type testJSON struct { @@ -55,6 +58,67 @@ func createClient() Client { return client } +func TestBytesSchema(t *testing.T) { + client := createClient() + defer client.Close() + + topic := newTopicName() + + properties := make(map[string]string) + properties["pulsar"] = "hello" + producerSchemaBytes := NewBytesSchema(properties) + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topic, + Schema: producerSchemaBytes, + }) + assert.NoError(t, err) + + _, err = producer.Send(context.Background(), &ProducerMessage{ + Value: []byte(`{"key": "value"}`), + }) + require.NoError(t, err) + _, err = producer.Send(context.Background(), &ProducerMessage{ + Value: []byte(`something else`), + }) + require.NoError(t, err) + producer.Close() + + // Create consumer + consumerSchemaBytes := NewBytesSchema(nil) + assert.NotNil(t, consumerSchemaBytes) + consumer, err := client.Subscribe(ConsumerOptions{ + Topic: topic, + SubscriptionName:"sub-1", + Schema: consumerSchemaBytes, + SubscriptionInitialPosition: SubscriptionPositionEarliest, + }) + assert.Nil(t, err) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + + // Receive first message + var out1 []byte + msg1, err := consumer.Receive(ctx) + assert.NoError(t, err) + err = msg1.GetSchemaValue(&out1) + assert.NoError(t, err) + assert.Equal(t, []byte(`{"key": "value"}`), out1) + consumer.Ack(msg1) + require.NoError(t, err) + + // Receive second message + var out2 []byte + msg2, err := consumer.Receive(ctx) + fmt.Println(string(msg2.Payload())) + assert.NoError(t, err) + err = msg2.GetSchemaValue(&out2) + assert.NoError(t, err) + assert.Equal(t, []byte(`something else`), out2) + + defer consumer.Close() +} + func TestJsonSchema(t *testing.T) { client := createClient() defer client.Close() diff --git a/pulsar/table_view_test.go b/pulsar/table_view_test.go index 45b94411..2368e3d8 100644 --- a/pulsar/table_view_test.go +++ b/pulsar/table_view_test.go @@ -90,6 +90,13 @@ func TestTableViewSchemas(t *testing.T) { expValueOut interface{} valueCheckfunc(t *testing.T, got interface{}) // Overrides expValueOut for more complex checks }{ + { + name: "BytesSchema", + schema:NewBytesSchema(nil), + schemaType:[]byte(`any`), + producerValue: []byte(`hello pulsar`), + expValueOut: []byte(`hello pulsar`), + }, { name: "StringSchema", schema:NewStringSchema(nil),
(pulsar-client-go) branch merlimat-patch-1 deleted (was b48c59a1)
This is an automated email from the ASF dual-hosted git repository. zike pushed a change to branch merlimat-patch-1 in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git was b48c59a1 Update codeql.yml The revisions that were on this branch are still contained in other references; therefore, this change does not discard any commits from the repository.
(pulsar-client-go) branch master updated: Added CodeQL static code scanner (#1169)
This is an automated email from the ASF dual-hosted git repository. zike pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new f476814a Added CodeQL static code scanner (#1169) f476814a is described below commit f476814a9e1bac484e9589d4fb3f299066f54ddc Author: Matteo Merli AuthorDate: Tue Feb 6 19:24:55 2024 -0800 Added CodeQL static code scanner (#1169) * Added CodeQL static code scanner * Update codeql.yml --- .github/workflows/codeql.yml | 90 1 file changed, 90 insertions(+) diff --git a/.github/workflows/codeql.yml b/.github/workflows/codeql.yml new file mode 100644 index ..869a19ad --- /dev/null +++ b/.github/workflows/codeql.yml @@ -0,0 +1,90 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +name: "CodeQL" + +on: + push: +branches: [ "master" ] + pull_request: +branches: [ "master" ] + schedule: +- cron: '43 13 * * 0' + +jobs: + analyze: +name: Analyze +# Runner size impacts CodeQL analysis time. To learn more, please see: +# - https://gh.io/recommended-hardware-resources-for-running-codeql +# - https://gh.io/supported-runners-and-hardware-resources +# - https://gh.io/using-larger-runners +# Consider using larger runners for possible analysis time improvements. +runs-on: ${{ (matrix.language == 'swift' && 'macos-latest') || 'ubuntu-latest' }} +timeout-minutes: ${{ (matrix.language == 'swift' && 120) || 360 }} +permissions: + # required for all workflows + security-events: write + + # only required for workflows in private repositories + actions: read + contents: read + +strategy: + fail-fast: false + matrix: +language: [ 'go' ] +# CodeQL supports [ 'c-cpp', 'csharp', 'go', 'java-kotlin', 'javascript-typescript', 'python', 'ruby', 'swift' ] +# Use only 'java-kotlin' to analyze code written in Java, Kotlin or both +# Use only 'javascript-typescript' to analyze code written in JavaScript, TypeScript or both +# Learn more about CodeQL language support at https://aka.ms/codeql-docs/language-support + +steps: +- name: Checkout repository + uses: actions/checkout@v4 + +# Initializes the CodeQL tools for scanning. +- name: Initialize CodeQL + uses: github/codeql-action/init@v3 + with: +languages: ${{ matrix.language }} +# If you wish to specify custom queries, you can do so here or in a config file. +# By default, queries listed here will override any specified in a config file. +# Prefix the list here with "+" to use these queries and those in the config file. + +# For more details on CodeQL's query packs, refer to: https://docs.github.com/en/code-security/code-scanning/automatically-scanning-your-code-for-vulnerabilities-and-errors/configuring-code-scanning#using-queries-in-ql-packs +# queries: security-extended,security-and-quality + + +# Autobuild attempts to build any compiled languages (C/C++, C#, Go, Java, or Swift). +# If this step fails, then you should remove it and run the build manually (see below) +- name: Autobuild + uses: github/codeql-action/autobuild@v3 + +# ℹ️ Command-line programs to run using the OS shell. +# 📚 See https://docs.github.com/en/actions/using-workflows/workflow-syntax-for-github-actions#jobsjob_idstepsrun + +# If the Autobuild fails above, remove it and uncomment the following three lines. +# modify them (or add more) to build your code if your project, please refer to the EXAMPLE below for guidance. + +# - run: | +# echo "Run, Build Application using script" +# ./location_of_script_within_repo/buildscript.sh + +- name: Perform CodeQL Analysis + uses: github/codeql-action/analyze@v3 + with: +category: "/language:${{matrix.language}}"
(pulsar-client-go) branch branch-0.12.0 updated: Fix SIGSEGV with zstd compression enabled (#1164)
This is an automated email from the ASF dual-hosted git repository. zike pushed a commit to branch branch-0.12.0 in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/branch-0.12.0 by this push: new 0f0d5a86 Fix SIGSEGV with zstd compression enabled (#1164) 0f0d5a86 is described below commit 0f0d5a86919495b846f6201d9df7b157c69c15b5 Author: Zike Yang AuthorDate: Fri Feb 2 01:24:50 2024 +0800 Fix SIGSEGV with zstd compression enabled (#1164) * Fix SIGSEGV with zstd compression enabled * Use sync.Pool to cache zstd ctx * Fix race in sequenceID assignment * Fix GetAndAdd (cherry picked from commit 877613503b70c4ee67a8408f31881d88fc086456) --- pulsar/internal/compression/zstd_cgo.go | 16 pulsar/internal/utils.go| 2 +- pulsar/producer_test.go | 28 3 files changed, 41 insertions(+), 5 deletions(-) diff --git a/pulsar/internal/compression/zstd_cgo.go b/pulsar/internal/compression/zstd_cgo.go index 25429e25..dde54ae2 100644 --- a/pulsar/internal/compression/zstd_cgo.go +++ b/pulsar/internal/compression/zstd_cgo.go @@ -25,19 +25,23 @@ package compression import ( + "sync" + "github.com/DataDog/zstd" log "github.com/sirupsen/logrus" ) type zstdCGoProvider struct { - ctx zstd.Ctx + ctxPool sync.Pool level Level zstdLevel int } func newCGoZStdProvider(level Level) Provider { z := &zstdCGoProvider{ - ctx: zstd.NewCtx(), + ctxPool: sync.Pool{New: func() any { + return zstd.NewCtx() + }}, } switch level { @@ -61,7 +65,9 @@ func (z *zstdCGoProvider) CompressMaxSize(originalSize int) int { } func (z *zstdCGoProvider) Compress(dst, src []byte) []byte { - out, err := z.ctx.CompressLevel(dst, src, z.zstdLevel) + ctx := z.ctxPool.Get().(zstd.Ctx) + defer z.ctxPool.Put(ctx) + out, err := ctx.CompressLevel(dst, src, z.zstdLevel) if err != nil { log.WithError(err).Fatal("Failed to compress") } @@ -70,7 +76,9 @@ func (z *zstdCGoProvider) Compress(dst, src []byte) []byte { } func (z *zstdCGoProvider) Decompress(dst, src []byte, originalSize int) ([]byte, error) { - return z.ctx.Decompress(dst, src) + ctx := z.ctxPool.Get().(zstd.Ctx) + defer z.ctxPool.Put(ctx) + return ctx.Decompress(dst, src) } func (z *zstdCGoProvider) Close() error { diff --git a/pulsar/internal/utils.go b/pulsar/internal/utils.go index 9378d9dc..2dc82101 100644 --- a/pulsar/internal/utils.go +++ b/pulsar/internal/utils.go @@ -40,7 +40,7 @@ func TimestampMillis(t time.Time) uint64 { // GetAndAdd perform atomic read and update func GetAndAdd(n *uint64, diff uint64) uint64 { for { - v := *n + v := atomic.LoadUint64(n) if atomic.CompareAndSwapUint64(n, v, v+diff) { return v } diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go index 0d74cdee..3b9ea7e8 100644 --- a/pulsar/producer_test.go +++ b/pulsar/producer_test.go @@ -2357,6 +2357,34 @@ func TestFailPendingMessageWithClose(t *testing.T) { assert.Equal(t, 0, partitionProducerImp.pendingQueue.Size()) } +func TestSendConcurrently(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: lookupURL, + }) + assert.NoError(t, err) + defer client.Close() + testProducer, err := client.CreateProducer(ProducerOptions{ + Topic:newTopicName(), + CompressionType: ZSTD, + CompressionLevel: Better, + DisableBatching: true, + }) + assert.NoError(t, err) + + var wg sync.WaitGroup + for i := 0; i < 100; i++ { + wg.Add(1) + go func() { + _, err := testProducer.Send(context.Background(), &ProducerMessage{ + Payload: make([]byte, 100), + }) + assert.NoError(t, err) + wg.Done() + }() + } + wg.Wait() +} + type pendingQueueWrapper struct { pendingQueue internal.BlockingQueue writtenBuffers *[]internal.Buffer
(pulsar-client-go) branch master updated: [Producer] respect context cancellation in Flush (#1165)
This is an automated email from the ASF dual-hosted git repository. zike pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new 2a28e21c [Producer] respect context cancellation in Flush (#1165) 2a28e21c is described below commit 2a28e21c59d005515e118fed5bf8f333d6699e39 Author: Jayant AuthorDate: Thu Feb 1 21:52:52 2024 -0500 [Producer] respect context cancellation in Flush (#1165) ### Motivation The producer's `Flush` method does not respect context cancellation. If the caller's context get's cancelled, it will have to wait for the producer to finish flushing. ### Modifications This change adds a `FlushWithCtx` method which takes a context and selects on two channels. --- pulsar/consumer_test.go| 6 +++--- .../pulsartracing/producer_interceptor_test.go | 4 pulsar/producer.go | 7 +-- pulsar/producer_impl.go| 6 +- pulsar/producer_partition.go | 18 +--- pulsar/producer_test.go| 24 +++--- pulsar/reader_test.go | 6 +++--- 7 files changed, 47 insertions(+), 24 deletions(-) diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index df70b0dd..d66e2376 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -983,7 +983,7 @@ func TestConsumerBatchCumulativeAck(t *testing.T) { } wg.Wait() - err = producer.Flush() + err = producer.FlushWithCtx(context.Background()) assert.NoError(t, err) // send another batch @@ -1218,7 +1218,7 @@ func TestConsumerCompressionWithBatches(t *testing.T) { }, nil) } - producer.Flush() + producer.FlushWithCtx(context.Background()) for i := 0; i < N; i++ { msg, err := consumer.Receive(ctx) @@ -3932,7 +3932,7 @@ func runBatchIndexAckTest(t *testing.T, ackWithResponse bool, cumulative bool, o log.Printf("Sent to %v:%d:%d", id, id.BatchIdx(), id.BatchSize()) }) } - assert.Nil(t, producer.Flush()) + assert.Nil(t, producer.FlushWithCtx(context.Background())) msgIds := make([]MessageID, BatchingMaxSize) for i := 0; i < BatchingMaxSize; i++ { diff --git a/pulsar/internal/pulsartracing/producer_interceptor_test.go b/pulsar/internal/pulsartracing/producer_interceptor_test.go index 8d8e6965..1c2c712f 100644 --- a/pulsar/internal/pulsartracing/producer_interceptor_test.go +++ b/pulsar/internal/pulsartracing/producer_interceptor_test.go @@ -67,4 +67,8 @@ func (p *mockProducer) Flush() error { return nil } +func (p *mockProducer) FlushWithCtx(ctx context.Context) error { + return nil +} + func (p *mockProducer) Close() {} diff --git a/pulsar/producer.go b/pulsar/producer.go index 70d152c7..f8013a16 100644 --- a/pulsar/producer.go +++ b/pulsar/producer.go @@ -237,10 +237,13 @@ type Producer interface { // return the last sequence id published by this producer. LastSequenceID() int64 - // Flush all the messages buffered in the client and wait until all messages have been successfully - // persisted. + // Deprecated: Use `FlushWithCtx()` instead. Flush() error + // Flush all the messages buffered in the client and wait until all messageshave been successfully + // persisted. + FlushWithCtx(ctx context.Context) error + // Close the producer and releases resources allocated // No more writes will be accepted from this producer. Waits until all pending write request are persisted. In case // of errors, pending writes will not be retried. diff --git a/pulsar/producer_impl.go b/pulsar/producer_impl.go index 3c45b597..ca923108 100644 --- a/pulsar/producer_impl.go +++ b/pulsar/producer_impl.go @@ -334,11 +334,15 @@ func (p *producer) LastSequenceID() int64 { } func (p *producer) Flush() error { + return p.FlushWithCtx(context.Background()) +} + +func (p *producer) FlushWithCtx(ctx context.Context) error { p.RLock() defer p.RUnlock() for _, pp := range p.producers { - if err := pp.Flush(); err != nil { + if err := pp.FlushWithCtx(ctx); err != nil { return err } diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 1b79053e..fbcc5b97 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -1422,15 +1422,27 @@ func (p *partitionProducer) LastSequenceID() int64 { } func (p *partitionProducer) Flush() error { + return p.FlushWithCtx(context.Background()) +} + +func (p *partitionProducer) FlushWit
(pulsar-client-go) branch master updated (b0487429 -> 1ebc162a)
This is an automated email from the ASF dual-hosted git repository. zike pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git from b0487429 Fix SIGSEGV with zstd compression enabled add 1ebc162a Revert "Fix SIGSEGV with zstd compression enabled" No new revisions were added by this update. Summary of changes: pulsar/internal/compression/zstd_cgo.go | 5 - pulsar/producer_test.go | 28 2 files changed, 33 deletions(-)
(pulsar-client-go) branch master updated (5768f009 -> b0487429)
This is an automated email from the ASF dual-hosted git repository. zike pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git from 5768f009 Add 0.12.0 change log (#1153) add b0487429 Fix SIGSEGV with zstd compression enabled No new revisions were added by this update. Summary of changes: pulsar/internal/compression/zstd_cgo.go | 5 + pulsar/producer_test.go | 28 2 files changed, 33 insertions(+)
(pulsar-client-go) branch master updated: Add 0.12.0 change log (#1153)
This is an automated email from the ASF dual-hosted git repository. zike pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/master by this push: new 5768f009 Add 0.12.0 change log (#1153) 5768f009 is described below commit 5768f009c3a28cf58453acf7f67dcdeb6c7ff5d3 Author: Zike Yang AuthorDate: Mon Jan 29 09:39:10 2024 +0800 Add 0.12.0 change log (#1153) * Add 0.12.0 change log --- CHANGELOG.md | 79 VERSION | 2 +- stable.txt | 2 +- 3 files changed, 81 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d7afe2c3..7a4c81cb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,85 @@ All notable changes to this project will be documented in this file. +[0.12.0] 2024-01-10 + +## What's Changed +* Improved the performance of schema and schema cache by @gunli in https://github.com/apache/pulsar-client-go/pull/1033 +* Fixed return when registerSendOrAckOp() failed by @gunli in https://github.com/apache/pulsar-client-go/pull/1045 +* Fixed the incorrect link in the release process by @RobertIndie in https://github.com/apache/pulsar-client-go/pull/1050 +* Fixed Producer by checking if message is nil by @gunli in https://github.com/apache/pulsar-client-go/pull/1047 +* Added 0.11.0 change log by @RobertIndie in https://github.com/apache/pulsar-client-go/pull/1048 +* Fixed 0.11.0 change log by @RobertIndie in https://github.com/apache/pulsar-client-go/pull/1054 +* Fixed issue 877 where ctx in partitionProducer.Send() was not performing as expected by @Gleiphir2769 in https://github.com/apache/pulsar-client-go/pull/1053 +* Fixed Producer by stopping block request even if Value and Payload are both set by @gunli in https://github.com/apache/pulsar-client-go/pull/1052 +* Improved Producer by simplifying the flush logic by @gunli in https://github.com/apache/pulsar-client-go/pull/1049 +* Fixed issue 1051: inaccurate producer memory limit in chunking and schema by @Gleiphir2769 in https://github.com/apache/pulsar-client-go/pull/1055 +* Fixed issue by sending Close Command on Producer/Consumer create timeout by @michaeljmarshall in https://github.com/apache/pulsar-client-go/pull/1061 +* Fixed issue 1057: producer flush operation is not guaranteed to flush all messages by @Gleiphir2769 in https://github.com/apache/pulsar-client-go/pull/1058 +* Fixed issue 1064: panic when trying to flush in DisableBatching=true by @Gleiphir2769 in https://github.com/apache/pulsar-client-go/pull/1065 +* Fixed transaction acknowledgement and send logic for chunk message by @liangyepianzhou in https://github.com/apache/pulsar-client-go/pull/1069 +* Fixed issue by closing consumer resources if creation fails by @michaeljmarshall in https://github.com/apache/pulsar-client-go/pull/1070 +* Fixed issue where client reconnected every authenticationRefreshCheckSeconds when using TLS authentication by @jffp113 in https://github.com/apache/pulsar-client-go/pull/1062 +* Corrected the SendAsync() description by @Gleiphir2769 in https://github.com/apache/pulsar-client-go/pull/1066 +* CI: replaced license header checker and formatter by @tisonkun in https://github.com/apache/pulsar-client-go/pull/1077 +* Chore: allowed rebase and merge by @tisonkun in https://github.com/apache/pulsar-client-go/pull/1080 +* Adopted pulsar-admin-go sources by @tisonkun in https://github.com/apache/pulsar-client-go/pull/1079 +* Reverted: allowed rebase and merge by @tisonkun in https://github.com/apache/pulsar-client-go/pull/1081 +* Fixed producer by failing all messages that are pending requests when closing like Java by @graysonzeng in https://github.com/apache/pulsar-client-go/pull/1059 +* Supported load config from env by @tuteng in https://github.com/apache/pulsar-client-go/pull/1089 +* Fixed issue where multiple calls to client.Close causes panic by @crossoverJie in https://github.com/apache/pulsar-client-go/pull/1046 +* Improved client by implementing GetLastMSgID for Reader by @liangyepianzhou in https://github.com/apache/pulsar-client-go/pull/1087 +* Fixed comment for ConnectionMaxIdleTime by @massakam in https://github.com/apache/pulsar-client-go/pull/1091 +* Issue 1094: connectionTimeout respects net.Dialer default timeout by @zzzming in https://github.com/apache/pulsar-client-go/pull/1095 +* Supported OAuth2 with scope field by @labuladong in https://github.com/apache/pulsar-client-go/pull/1097 +* Fixed issue where DisableReplication flag does not work by @massakam in https://github.com/apache/pulsar-client-go/pull/1100 +* Double-checked before consumer reconnect by @zccold in https://github.com/apache/pulsar-client-go/pull/1084 +* Fixed schema error by @leizhiyuan in https://github.com/apache/pulsar-client-go/pull/823 +* PR-1071-1: renamed pendingItem.Complete() to pendingItem.done() by @gun
(pulsar-client-go) annotated tag v0.12.0 updated (853c108e -> 56452072)
This is an automated email from the ASF dual-hosted git repository. zike pushed a change to annotated tag v0.12.0 in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git *** WARNING: tag v0.12.0 was modified! *** from 853c108e (commit) to 56452072 (tag) tagging 853c108eec6fa10bda801f945ee258d8615bf8b9 (commit) replaces v0.12.0-candidate-1 by Zike Yang on Mon Jan 29 01:35:22 2024 + - Log - Release v0.12.0 -BEGIN PGP SIGNATURE- iQJEBAABCgAuFiEE6ItqSN52cCOQDJEjT0AbyNP5+1UFAmW3ANoQHHppa2VAYXBh Y2hlLm9yZwAKCRBPQBvI0/n7VXqVD/4rhi3fbfEM4G3FQs+BhkdweO/cudKrU8tz XJ/0LXP9hYLRs/Q8AT/Y2LWA/GSbEfL2cNiMJpqhyl6MLFIHJISI8SDBDcYu+TlK Q/ITv5aktavrOGaEnwK1A1FwprEjiDyoSHkGZuwIFiITfsHKQKXx9rgqX/0d//vL 2fB3JJeXzOJq7F09g4g5yNxN+plAxAfol4uSAgcM5Fkvj/K4irz3TzakpK+56sdu io0kHOlLH+JSUHINJ4+7GGG+az1814cUgFI5Jj95kTODGtwkBAIGOVnQcFH6+OXr Ge9cBy8SN7e22AjMNM2L59VSy90vHG0hoKQemUH1ruG+TkxR6CNKpGBm0fZLk5h3 i4C6gFBcf9BEoOzckQS6T4tUYF79z/lGvy6Sk8f2pr39C0vtDqTzCFbUxbuIgu6N 83Jhv+r1FdWHD9xPVuKIH6jHRuYSDiX1zRpTQp5zMZBQhr8ioClrhx3sPjOJMtrf 4cDXXqJZtYCDIL/3Z1OIdgXjEGDUHv5x0QAQAaelj50IWXrAPrXIPLZtZH2Kt9jQ v1JguAUcNn4zedk+JITP4t/zPbSxxk8v/dcDBgAroDWKsJq0teDe1FHYfeJr7vK0 IKWWvUvhdrDCFmuifXKVPc0y9/SB/OYHkRt0o5P6hN2YHvgLX5fH0QHQDuFM6yJP DbFOwx1BQA== =e3Uh -END PGP SIGNATURE- --- No new revisions were added by this update. Summary of changes:
svn commit: r66732 - in /dev/pulsar/pulsar-client-go-0.12.0-candidate-3: ./ apache-pulsar-client-go-0.12.0-src.tar.gz apache-pulsar-client-go-0.12.0-src.tar.gz.asc apache-pulsar-client-go-0.12.0-src.t
Author: zike Date: Mon Jan 22 07:31:42 2024 New Revision: 66732 Log: Staging artifacts and signature for Pulsar Client Go release 0.12.0-candidate-3 Added: dev/pulsar/pulsar-client-go-0.12.0-candidate-3/ dev/pulsar/pulsar-client-go-0.12.0-candidate-3/apache-pulsar-client-go-0.12.0-src.tar.gz (with props) dev/pulsar/pulsar-client-go-0.12.0-candidate-3/apache-pulsar-client-go-0.12.0-src.tar.gz.asc dev/pulsar/pulsar-client-go-0.12.0-candidate-3/apache-pulsar-client-go-0.12.0-src.tar.gz.sha512 Added: dev/pulsar/pulsar-client-go-0.12.0-candidate-3/apache-pulsar-client-go-0.12.0-src.tar.gz == Binary file - no diff available. Propchange: dev/pulsar/pulsar-client-go-0.12.0-candidate-3/apache-pulsar-client-go-0.12.0-src.tar.gz -- svn:mime-type = application/octet-stream Added: dev/pulsar/pulsar-client-go-0.12.0-candidate-3/apache-pulsar-client-go-0.12.0-src.tar.gz.asc == --- dev/pulsar/pulsar-client-go-0.12.0-candidate-3/apache-pulsar-client-go-0.12.0-src.tar.gz.asc (added) +++ dev/pulsar/pulsar-client-go-0.12.0-candidate-3/apache-pulsar-client-go-0.12.0-src.tar.gz.asc Mon Jan 22 07:31:42 2024 @@ -0,0 +1,16 @@ +-BEGIN PGP SIGNATURE- + +iQIzBAABCgAdFiEE6ItqSN52cCOQDJEjT0AbyNP5+1UFAmWuGGYACgkQT0AbyNP5 ++1UE6g/6AqLP1Vb/5xP0xRKnhhJ+uMu+b2czMhaWwfklHVo1T9XtjSOnnhT/xmBo +y9wP94Kk7O4HFLUF7S13PvpQQRCbbJuL1ojA96wr/w358XUXmQeUuqsZXrvrzJ96 +PtNyqQ9UCcftPFsyIHsCwbcevFSIqk4hM8G/cUWbW7HEmdmZrFQRpw4PIwjweE5S +f984f4KbDO01emOddt2BSA/EcCnA8NwETWWs3G1HLqmEeYWb10EuhY4ZtuKYZcGY +gNeQ11jstt8/X8bX/f3GN40eT2OS8GVWDQ7yFJTBIurCsZqUQHtedVVLVT2MSH2d +4sfnLu/Wd6Lh2l71Idw1TMqVlw3A1MLu6TQTWwEvfleLBJEDvXPztW/vjlaJPv2g +aNCoapfOilVUXCcrS2INKOmteyc28OWndSKqd0jdxI7PKJZEvAjOXtKwuBg92eZH +EzQLudtJINxQoZBnXbkQhKzQbGq12eE/iTrLmhShKPri/wro7EzhuitROiIV9Cvz +Arp6hYyqw/ePVwaZoGabFvgHlRiIKE+va5fh15WYzHi+OE8tNzkYt3zVajrL0EjO +yRyOqhP3S2WeAsEwIa/7DpGlACFl73wrIgRIzNU+VDQhNH7YOEzr/wak1Jzo+FHv +yoiPkZeWyyOWbWxl7qksCCcouYB9Abi+1gSPzxMDW1oU8GfgFG0= +=vw1F +-END PGP SIGNATURE- Added: dev/pulsar/pulsar-client-go-0.12.0-candidate-3/apache-pulsar-client-go-0.12.0-src.tar.gz.sha512 == --- dev/pulsar/pulsar-client-go-0.12.0-candidate-3/apache-pulsar-client-go-0.12.0-src.tar.gz.sha512 (added) +++ dev/pulsar/pulsar-client-go-0.12.0-candidate-3/apache-pulsar-client-go-0.12.0-src.tar.gz.sha512 Mon Jan 22 07:31:42 2024 @@ -0,0 +1 @@ +66c37c4439549a02d7e3d21ecc02c4a2f44bad2232fbf15fb04c42f02fc4caad758b7a1b1663d0df47414af99daa4e3a19d33c29c0decbc43ac4d17e46a126bc apache-pulsar-client-go-0.12.0-src.tar.gz
(pulsar-client-go) annotated tag v0.12.0-candidate-3 updated (853c108e -> cb59cad7)
This is an automated email from the ASF dual-hosted git repository. zike pushed a change to annotated tag v0.12.0-candidate-3 in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git *** WARNING: tag v0.12.0-candidate-3 was modified! *** from 853c108e (commit) to cb59cad7 (tag) tagging 853c108eec6fa10bda801f945ee258d8615bf8b9 (commit) replaces v0.12.0-candidate-1 by Zike Yang on Mon Jan 22 07:23:49 2024 + - Log - Release v0.12.0-candidate-3 -BEGIN PGP SIGNATURE- iQJEBAABCgAuFiEE6ItqSN52cCOQDJEjT0AbyNP5+1UFAmWuGAUQHHppa2VAYXBh Y2hlLm9yZwAKCRBPQBvI0/n7VQoVD/9DebVhs4kAIs5bKzRsJd3CPcWQZS2zXdLW HbJWsZ0CHDz7lWkZX3VJiCun4hSn+lcpeNxcmqkf/AeXaVzN/M6Qwl9uvC473UOr FTs7caX1mpuvqyHXm/+h2XVv2saDKjt+Eu8BJ9FNAD286NC2weUKiCvyZmedBkd8 sfqj7yaUjdIyCLLkUtYKHii/kma99BTrO1Gudbx34FW72PS5SkR74TVOAexrxmPH AuY7urh8L69uMv5KMJOvOrajg69z+9YfJ1VEo1Qft0z4CdCplLvUbX1YQtsuVxhn ZREOqwOGgmMkDxU6H117kJV4JrkibUpa73I+AjnpPmJ3LXH6oFNQbSIargDCFrfd efTrqOgywASCTvl/712F2FCRocrSiKj68zwt9kCbZ7tUXTrxb9ZL5cobpwlbliWm kvvEsCg7rk3SBrrPvaIDAe84wG3mIFiVyMykUaw4t2Xo4E/8jCFoBDsKv3lKEMix ODYwyXMvAVFIt2KOHAq01roXGg6tCt9pX/vllScFNj1E461FlRmN11So2GFdpwj9 rvyivsKqDvZVu9hvp3kfgWf87Wlh0STjXHFq5Zdx+v0B3rQoxEpiqgnHc3/aKlXY n/rhAEd9yQIEhNj+DGCW5j2yc0qyBBnJ+Y9OXk7+gx2IcGn41sHDH3MLcrUldurY TFj++N+y+Q== =5cHf -END PGP SIGNATURE- --- No new revisions were added by this update. Summary of changes:
(pulsar-client-go) branch branch-0.12.0 updated: Release 0.12.0
This is an automated email from the ASF dual-hosted git repository. zike pushed a commit to branch branch-0.12.0 in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git The following commit(s) were added to refs/heads/branch-0.12.0 by this push: new 853c108e Release 0.12.0 853c108e is described below commit 853c108eec6fa10bda801f945ee258d8615bf8b9 Author: Zike Yang AuthorDate: Mon Jan 22 07:19:23 2024 + Release 0.12.0 Signed-off-by: Zike Yang --- VERSION| 2 +- stable.txt | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/VERSION b/VERSION index dbf0637b..725659e7 100644 --- a/VERSION +++ b/VERSION @@ -1,3 +1,3 @@ // This version number refers to the currently released version number // Please fix the version when release. -v0.11.1 +v0.12.0 diff --git a/stable.txt b/stable.txt index 7e38472c..e0916329 100644 --- a/stable.txt +++ b/stable.txt @@ -1,3 +1,3 @@ // This version number refers to the current stable version, generally is `VERSION - 1`. // Please fix the version when release. -v0.11.1 +v0.12.0
(pulsar-client-cpp) branch main updated: Fix `StartMessageIdInclusive` not work when reader reads from latest msg id (#386)
This is an automated email from the ASF dual-hosted git repository. zike pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git The following commit(s) were added to refs/heads/main by this push: new a1cf401 Fix `StartMessageIdInclusive` not work when reader reads from latest msg id (#386) a1cf401 is described below commit a1cf401ec0dd423871537559050396a2b001065d Author: Zike Yang AuthorDate: Mon Jan 15 15:11:23 2024 +0800 Fix `StartMessageIdInclusive` not work when reader reads from latest msg id (#386) Fixes #385 ### Motivation The reader with `StartMessageIdInclusive` enabled should be able to reads messages from the latest message ID. ### Modifications - If `StartMessageIdInclusive` is enabled, the reader will seek and read the latest message in the topic. --- lib/ConsumerImpl.cc | 30 +++--- tests/ReaderTest.cc | 32 2 files changed, 55 insertions(+), 7 deletions(-) diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc index 5216218..dbd3b65 100644 --- a/lib/ConsumerImpl.cc +++ b/lib/ConsumerImpl.cc @@ -1508,18 +1508,34 @@ void ConsumerImpl::hasMessageAvailableAsync(HasMessageAvailableCallback callback if (messageId == MessageId::latest()) { lock.unlock(); -getLastMessageIdAsync([callback](Result result, const GetLastMessageIdResponse& response) { +auto self = get_shared_this_ptr(); +getLastMessageIdAsync([self, callback](Result result, const GetLastMessageIdResponse& response) { if (result != ResultOk) { callback(result, {}); return; } -if (response.hasMarkDeletePosition() && response.getLastMessageId().entryId() >= 0) { -// We only care about comparing ledger ids and entry ids as mark delete position doesn't have -// other ids such as batch index -callback(ResultOk, compareLedgerAndEntryId(response.getMarkDeletePosition(), - response.getLastMessageId()) < 0); +auto handleResponse = [self, response, callback] { +if (response.hasMarkDeletePosition() && response.getLastMessageId().entryId() >= 0) { +// We only care about comparing ledger ids and entry ids as mark delete position doesn't +// have other ids such as batch index +auto compareResult = compareLedgerAndEntryId(response.getMarkDeletePosition(), + response.getLastMessageId()); +callback(ResultOk, self->config_.isStartMessageIdInclusive() ? compareResult <= 0 + : compareResult < 0); +} else { +callback(ResultOk, false); +} +}; +if (self->config_.isStartMessageIdInclusive()) { +self->seekAsync(response.getLastMessageId(), [callback, handleResponse](Result result) { +if (result != ResultOk) { +callback(result, {}); +return; +} +handleResponse(); +}); } else { -callback(ResultOk, false); +handleResponse(); } }); } else { diff --git a/tests/ReaderTest.cc b/tests/ReaderTest.cc index ac2fa23..723972d 100644 --- a/tests/ReaderTest.cc +++ b/tests/ReaderTest.cc @@ -752,4 +752,36 @@ TEST(ReaderSeekTest, testSeekForMessageId) { producer.close(); } +TEST(ReaderSeekTest, testStartAtLatestMessageId) { +Client client(serviceUrl); + +const std::string topic = "test-seek-latest-message-id-" + std::to_string(time(nullptr)); + +Producer producer; +ASSERT_EQ(ResultOk, client.createProducer(topic, producer)); + +MessageId id; +ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent("msg").build(), id)); + +Reader readerExclusive; +ASSERT_EQ(ResultOk, + client.createReader(topic, MessageId::latest(), ReaderConfiguration(), readerExclusive)); + +Reader readerInclusive; +ASSERT_EQ(ResultOk, + client.createReader(topic, MessageId::latest(), + ReaderConfiguration().setStartMessageIdInclusive(true), readerInclusive)); + +Message msg; +bool hasMsgAvaliable = false; +readerInclusive.hasMessageAvailable(hasMsgAvaliable); +ASSERT_TRUE(hasMsgAvaliable); +ASSERT_EQ(ResultOk, readerInclusive.readNext(msg, 3000)); +ASSERT_EQ(ResultTimeout, readerExclusive.readNext(msg, 3000)); + +readerExclusive.close(); +readerInclusive.clo